from zope.interface import implements
from twisted.trial import unittest
from twisted.internet import defer, reactor
from twisted.python.failure import Failure
from foolscap.api import fireEventually
from allmydata import hashtree, uri
from allmydata.immutable import encode, upload, download
from allmydata.util import hashutil
from allmydata.util.assertutil import _assert
from allmydata.util.consumer import MemoryConsumer
from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader, \
     NotEnoughSharesError, IStorageBroker, UploadUnhappinessError
from allmydata.monitor import Monitor
import common_util as testutil

class LostPeerError(Exception):
    pass

def flip_bit(good): # flips the last bit
    return good[:-1] + chr(ord(good[-1]) ^ 0x01)
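# (illustrative: flip_bit("\x00") == "\x01" -- only the low bit of the final
# byte changes, which is enough to trip the integrity checks exercised below)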

class FakeStorageBroker:
    implements(IStorageBroker)

class FakeBucketReaderWriterProxy:
    implements(IStorageBucketWriter, IStorageBucketReader)
    # these are used for both reading and writing
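    # the 'mode' string selects which failure to simulate; the modes used
    # below are "good", "lost-early", "lost", "bad block", "bad blockhash",
    # "bad sharehash", "missing sharehash", "bad crypttext hashroot",
    # "bad crypttext hash", and "bad uri_extension"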
    def __init__(self, mode="good", peerid="peer"):
        self.mode = mode
        self.blocks = {}
        self.plaintext_hashes = []
        self.crypttext_hashes = []
        self.block_hashes = None
        self.share_hashes = None
        self.closed = False
        self.peerid = peerid

    def get_peerid(self):
        return self.peerid

    def _start(self):
        if self.mode == "lost-early":
            f = Failure(LostPeerError("I went away early"))
            return fireEventually(f)
        return defer.succeed(self)

    def put_header(self):
        return self._start()

    def put_block(self, segmentnum, data):
        if self.mode == "lost-early":
            f = Failure(LostPeerError("I went away early"))
            return fireEventually(f)
        def _try():
            assert not self.closed
            assert segmentnum not in self.blocks
            if self.mode == "lost" and segmentnum >= 1:
                raise LostPeerError("I'm going away now")
            self.blocks[segmentnum] = data
        return defer.maybeDeferred(_try)

    def put_plaintext_hashes(self, hashes):
        def _try():
            assert not self.closed
            assert not self.plaintext_hashes
            self.plaintext_hashes = hashes
        return defer.maybeDeferred(_try)

    def put_crypttext_hashes(self, hashes):
        def _try():
            assert not self.closed
            assert not self.crypttext_hashes
            self.crypttext_hashes = hashes
        return defer.maybeDeferred(_try)

    def put_block_hashes(self, blockhashes):
        def _try():
            assert not self.closed
            assert self.block_hashes is None
            self.block_hashes = blockhashes
        return defer.maybeDeferred(_try)

    def put_share_hashes(self, sharehashes):
        def _try():
            assert not self.closed
            assert self.share_hashes is None
            self.share_hashes = sharehashes
        return defer.maybeDeferred(_try)

    def put_uri_extension(self, uri_extension):
        def _try():
            assert not self.closed
            self.uri_extension = uri_extension
        return defer.maybeDeferred(_try)

    def close(self):
        def _try():
            assert not self.closed
            self.closed = True
        return defer.maybeDeferred(_try)

    def abort(self):
        return defer.succeed(None)

    def get_block_data(self, blocknum, blocksize, size):
        d = self._start()
        def _try(unused=None):
            assert isinstance(blocknum, (int, long))
            if self.mode == "bad block":
                return flip_bit(self.blocks[blocknum])
            return self.blocks[blocknum]
        d.addCallback(_try)
        return d

    def get_plaintext_hashes(self):
        d = self._start()
        def _try(unused=None):
            hashes = self.plaintext_hashes[:]
            return hashes
        d.addCallback(_try)
        return d

    def get_crypttext_hashes(self):
        d = self._start()
        def _try(unused=None):
            hashes = self.crypttext_hashes[:]
            if self.mode == "bad crypttext hashroot":
                hashes[0] = flip_bit(hashes[0])
            if self.mode == "bad crypttext hash":
                hashes[1] = flip_bit(hashes[1])
            return hashes
        d.addCallback(_try)
        return d

    def get_block_hashes(self, at_least_these=()):
        d = self._start()
        def _try(unused=None):
            if self.mode == "bad blockhash":
                hashes = self.block_hashes[:]
                hashes[1] = flip_bit(hashes[1])
                return hashes
            return self.block_hashes
        d.addCallback(_try)
        return d

    def get_share_hashes(self, at_least_these=()):
        d = self._start()
        def _try(unused=None):
            if self.mode == "bad sharehash":
                hashes = self.share_hashes[:]
                hashes[1] = (hashes[1][0], flip_bit(hashes[1][1]))
                return hashes
            if self.mode == "missing sharehash":
                # one sneaky attack would be to pretend we don't know our own
                # sharehash, which could manage to frame someone else.
                # download.py is supposed to guard against this case.
                return []
            return self.share_hashes
        d.addCallback(_try)
        return d

    def get_uri_extension(self):
        d = self._start()
        def _try(unused=None):
            if self.mode == "bad uri_extension":
                return flip_bit(self.uri_extension)
            return self.uri_extension
        d.addCallback(_try)
        return d


def make_data(length):
    data = "happy happy joy joy" * 100
    assert length <= len(data)
    return data[:length]

class ValidatedExtendedURIProxy(unittest.TestCase):
    timeout = 240 # It takes longer than 120 seconds on Francois's arm box.
    K = 4
    M = 10
    SIZE = 200
    SEGSIZE = 72
    _TMP = SIZE%SEGSIZE
    if _TMP == 0:
        _TMP = SEGSIZE
    if _TMP % K != 0:
        _TMP += (K - (_TMP % K))
    TAIL_SEGSIZE = _TMP
    _TMP = SIZE / SEGSIZE
    if SIZE % SEGSIZE != 0:
        _TMP += 1
    NUM_SEGMENTS = _TMP
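    # worked out for these constants: 200 % 72 = 56, which is already a
    # multiple of K=4, so TAIL_SEGSIZE = 56 (the tail segment is rounded up
    # to a multiple of K so it can be split evenly into K shares); and
    # 200 / 72 = 2 with a remainder, so NUM_SEGMENTS = 3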
    mindict = { 'segment_size': SEGSIZE,
                'crypttext_root_hash': '0'*hashutil.CRYPTO_VAL_SIZE,
                'share_root_hash': '1'*hashutil.CRYPTO_VAL_SIZE }
    optional_consistent = { 'crypttext_hash': '2'*hashutil.CRYPTO_VAL_SIZE,
                            'codec_name': "crs",
                            'codec_params': "%d-%d-%d" % (SEGSIZE, K, M),
                            'tail_codec_params': "%d-%d-%d" % (TAIL_SEGSIZE, K, M),
                            'num_segments': NUM_SEGMENTS,
                            'size': SIZE,
                            'needed_shares': K,
                            'total_shares': M,
                            'plaintext_hash': "anything",
                            'plaintext_root_hash': "anything", }
    # optional_inconsistent = { 'crypttext_hash': ('2'*(hashutil.CRYPTO_VAL_SIZE-1), "", 77),
    optional_inconsistent = { 'crypttext_hash': (77,),
                              'codec_name': ("digital fountain", ""),
                              'codec_params': ("%d-%d-%d" % (SEGSIZE, K-1, M),
                                               "%d-%d-%d" % (SEGSIZE-1, K, M),
                                               "%d-%d-%d" % (SEGSIZE, K, M-1)),
                              'tail_codec_params': ("%d-%d-%d" % (TAIL_SEGSIZE, K-1, M),
                                               "%d-%d-%d" % (TAIL_SEGSIZE-1, K, M),
                                               "%d-%d-%d" % (TAIL_SEGSIZE, K, M-1)),
                              'num_segments': (NUM_SEGMENTS-1,),
                              'size': (SIZE-1,),
                              'needed_shares': (K-1,),
                              'total_shares': (M-1,), }

    def _test(self, uebdict):
        uebstring = uri.pack_extension(uebdict)
        uebhash = hashutil.uri_extension_hash(uebstring)
        fb = FakeBucketReaderWriterProxy()
        fb.put_uri_extension(uebstring)
        verifycap = uri.CHKFileVerifierURI(storage_index='x'*16, uri_extension_hash=uebhash, needed_shares=self.K, total_shares=self.M, size=self.SIZE)
        vup = download.ValidatedExtendedURIProxy(fb, verifycap)
        return vup.start()

    def _test_accept(self, uebdict):
        return self._test(uebdict)

    def _should_fail(self, res, expected_failures):
        if isinstance(res, Failure):
            res.trap(*expected_failures)
        else:
            self.fail("was supposed to raise %s, not get '%s'" % (expected_failures, res))

    def _test_reject(self, uebdict):
        d = self._test(uebdict)
        d.addBoth(self._should_fail, (KeyError, download.BadURIExtension))
        return d

    def test_accept_minimal(self):
        return self._test_accept(self.mindict)

    def test_reject_insufficient(self):
        dl = []
        for k in self.mindict.iterkeys():
            insuffdict = self.mindict.copy()
            del insuffdict[k]
            d = self._test_reject(insuffdict)
            dl.append(d)
        return defer.DeferredList(dl)

    def test_accept_optional(self):
        dl = []
        for k in self.optional_consistent.iterkeys():
            mydict = self.mindict.copy()
            mydict[k] = self.optional_consistent[k]
            d = self._test_accept(mydict)
            dl.append(d)
        return defer.DeferredList(dl)

    def test_reject_optional(self):
        dl = []
        for k in self.optional_inconsistent.iterkeys():
            for v in self.optional_inconsistent[k]:
                mydict = self.mindict.copy()
                mydict[k] = v
                d = self._test_reject(mydict)
                dl.append(d)
        return defer.DeferredList(dl)

class Encode(unittest.TestCase):
    timeout = 2400 # It takes longer than 240 seconds on Zandr's ARM box.

    def do_encode(self, max_segment_size, datalen, NUM_SHARES, NUM_SEGMENTS,
                  expected_block_hashes, expected_share_hashes):
        data = make_data(datalen)
        # force use of multiple segments
        e = encode.Encoder()
        u = upload.Data(data, convergence="some convergence string")
        u.max_segment_size = max_segment_size
        u.encoding_param_k = 25
        u.encoding_param_happy = 75
        u.encoding_param_n = 100
        eu = upload.EncryptAnUploadable(u)
        d = e.set_encrypted_uploadable(eu)

        all_shareholders = []
        def _ready(res):
            k,happy,n = e.get_param("share_counts")
            _assert(n == NUM_SHARES) # else we'll be completely confused
            numsegs = e.get_param("num_segments")
            _assert(numsegs == NUM_SEGMENTS, numsegs, NUM_SEGMENTS)
            segsize = e.get_param("segment_size")
            _assert( (NUM_SEGMENTS-1)*segsize < len(data) <= NUM_SEGMENTS*segsize,
                     NUM_SEGMENTS, segsize,
                     (NUM_SEGMENTS-1)*segsize, len(data), NUM_SEGMENTS*segsize)

            shareholders = {}
            servermap = {}
            for shnum in range(NUM_SHARES):
                peer = FakeBucketReaderWriterProxy()
                shareholders[shnum] = peer
                servermap.setdefault(shnum, set()).add(peer.get_peerid())
                all_shareholders.append(peer)
            e.set_shareholders(shareholders, servermap)
            return e.start()
        d.addCallback(_ready)

        def _check(res):
            verifycap = res
            self.failUnless(isinstance(verifycap.uri_extension_hash, str))
            self.failUnlessEqual(len(verifycap.uri_extension_hash), 32)
            for i,peer in enumerate(all_shareholders):
                self.failUnless(peer.closed)
                self.failUnlessEqual(len(peer.blocks), NUM_SEGMENTS)
                # each peer gets a full tree of block hashes. For 3 or 4
                # segments, that's 7 hashes. For 5 segments it's 15 hashes.
                self.failUnlessEqual(len(peer.block_hashes),
                                     expected_block_hashes)
                for h in peer.block_hashes:
                    self.failUnlessEqual(len(h), 32)
                # each peer also gets their necessary chain of share hashes.
                # For 100 shares (rounded up to 128 leaves), that's 8 hashes
                self.failUnlessEqual(len(peer.share_hashes),
                                     expected_share_hashes)
                for (hashnum, h) in peer.share_hashes:
                    self.failUnless(isinstance(hashnum, int))
                    self.failUnlessEqual(len(h), 32)
        d.addCallback(_check)

        return d

    # a series of 3*3 tests to check out edge conditions. One axis is how the
    # plaintext is divided into segments: kn+(-1,0,1). Another way to express
    # that is that n%k == -1 or 0 or 1. For example, for 25-byte segments, we
    # might test 74 bytes, 75 bytes, and 76 bytes.

    # on the other axis is how many leaves in the block hash tree we wind up
    # with, relative to a power of 2, so 2^a+(-1,0,1). Each segment turns
    # into a single leaf. So we'd like to check out, e.g., 3 segments, 4
    # segments, and 5 segments.

    # that results in the following series of data lengths:
    #  3 segs: 74, 75, 51
    #  4 segs: 99, 100, 76
    #  5 segs: 124, 125, 101

    # all tests encode to 100 shares, which means the share hash tree will
    # have 128 leaves, which means that buckets will be given an 8-long share
    # hash chain

    # all 3-segment files will have a 4-leaf blockhashtree, and thus expect
    # to get 7 blockhashes. 4-segment files will also get 4-leaf block hash
    # trees and 7 blockhashes. 5-segment files will get 8-leaf block hash
    # trees, which get 15 blockhashes.
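    # (illustrative arithmetic: the block hash tree pads its leaf count up to
    # a power of two, and a full binary tree over 2^a leaves holds 2*2^a - 1
    # hashes, so 3 or 4 segments -> 4 leaves -> 7 hashes, and 5 segments ->
    # 8 leaves -> 15 hashes; likewise 100 shares are padded to 128 leaves,
    # yielding the 8-entry share hash chain asserted in _check above)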

    def test_send_74(self):
        # 3 segments (25, 25, 24)
        return self.do_encode(25, 74, 100, 3, 7, 8)
    def test_send_75(self):
        # 3 segments (25, 25, 25)
        return self.do_encode(25, 75, 100, 3, 7, 8)
    def test_send_51(self):
        # 3 segments (25, 25, 1)
        return self.do_encode(25, 51, 100, 3, 7, 8)

    def test_send_76(self):
        # encode a 76 byte file (in 4 segments: 25,25,25,1) to 100 shares
        return self.do_encode(25, 76, 100, 4, 7, 8)
    def test_send_99(self):
        # 4 segments: 25,25,25,24
        return self.do_encode(25, 99, 100, 4, 7, 8)
    def test_send_100(self):
        # 4 segments: 25,25,25,25
        return self.do_encode(25, 100, 100, 4, 7, 8)

    def test_send_124(self):
        # 5 segments: 25, 25, 25, 25, 24
        return self.do_encode(25, 124, 100, 5, 15, 8)
    def test_send_125(self):
        # 5 segments: 25, 25, 25, 25, 25
        return self.do_encode(25, 125, 100, 5, 15, 8)
    def test_send_101(self):
        # 5 segments: 25, 25, 25, 25, 1
        return self.do_encode(25, 101, 100, 5, 15, 8)

class PausingConsumer(MemoryConsumer):
    def __init__(self):
        MemoryConsumer.__init__(self)
        self.size = 0
        self.writes = 0
    def write(self, data):
        self.size += len(data)
        self.writes += 1
        if self.writes <= 2:
            # we happen to use 4 segments, and want to avoid pausing on the
            # last one (since then the _unpause timer will still be running)
            self.producer.pauseProducing()
            reactor.callLater(0.1, self._unpause)
        return MemoryConsumer.write(self, data)
    def _unpause(self):
        self.producer.resumeProducing()

class PausingAndStoppingConsumer(PausingConsumer):
    def write(self, data):
        self.producer.pauseProducing()
        reactor.callLater(0.5, self._stop)
    def _stop(self):
        self.producer.stopProducing()

class StoppingConsumer(PausingConsumer):
    def write(self, data):
        self.producer.stopProducing()

class Roundtrip(unittest.TestCase, testutil.ShouldFailMixin):
    timeout = 2400 # It takes longer than 240 seconds on Zandr's ARM box.
    def send_and_recover(self, k_and_happy_and_n=(25,75,100),
                         AVAILABLE_SHARES=None,
                         datalen=76,
                         max_segment_size=25,
                         bucket_modes={},
                         recover_mode="recover",
                         consumer=None,
                         ):
        if AVAILABLE_SHARES is None:
            AVAILABLE_SHARES = k_and_happy_and_n[2]
        data = make_data(datalen)
        d = self.send(k_and_happy_and_n, AVAILABLE_SHARES,
                      max_segment_size, bucket_modes, data)
        # that fires with (verifycap, key, shareholders)
        d.addCallback(self.recover, AVAILABLE_SHARES, recover_mode,
                      consumer=consumer)
        # that fires with newdata
        def _downloaded((newdata, fd)):
            self.failUnless(newdata == data, str((len(newdata), len(data))))
            return fd
        d.addCallback(_downloaded)
        return d

    def send(self, k_and_happy_and_n, AVAILABLE_SHARES, max_segment_size,
             bucket_modes, data):
        k, happy, n = k_and_happy_and_n
        NUM_SHARES = k_and_happy_and_n[2]
        if AVAILABLE_SHARES is None:
            AVAILABLE_SHARES = NUM_SHARES
        e = encode.Encoder()
        u = upload.Data(data, convergence="some convergence string")
        # force use of multiple segments by using a low max_segment_size
        u.max_segment_size = max_segment_size
        u.encoding_param_k = k
        u.encoding_param_happy = happy
        u.encoding_param_n = n
        eu = upload.EncryptAnUploadable(u)
        d = e.set_encrypted_uploadable(eu)

        shareholders = {}
        def _ready(res):
            k,happy,n = e.get_param("share_counts")
            assert n == NUM_SHARES # else we'll be completely confused
            servermap = {}
            for shnum in range(NUM_SHARES):
                mode = bucket_modes.get(shnum, "good")
                peer = FakeBucketReaderWriterProxy(mode, "peer%d" % shnum)
                shareholders[shnum] = peer
                servermap.setdefault(shnum, set()).add(peer.get_peerid())
            e.set_shareholders(shareholders, servermap)
            return e.start()
        d.addCallback(_ready)
        def _sent(res):
            d1 = u.get_encryption_key()
            d1.addCallback(lambda key: (res, key, shareholders))
            return d1
        d.addCallback(_sent)
        return d

    def recover(self, (res, key, shareholders), AVAILABLE_SHARES,
                recover_mode, consumer=None):
        verifycap = res

        if "corrupt_key" in recover_mode:
            # we corrupt the key, so that the decrypted data is corrupted and
            # will fail the plaintext hash check. Since we're manually
            # attaching shareholders, the fact that the storage index is also
            # corrupted doesn't matter.
            key = flip_bit(key)

        u = uri.CHKFileURI(key=key,
                           uri_extension_hash=verifycap.uri_extension_hash,
                           needed_shares=verifycap.needed_shares,
                           total_shares=verifycap.total_shares,
                           size=verifycap.size)

        sb = FakeStorageBroker()
        if not consumer:
            consumer = MemoryConsumer()
        innertarget = download.ConsumerAdapter(consumer)
        target = download.DecryptingTarget(innertarget, u.key)
        fd = download.CiphertextDownloader(sb, u.get_verify_cap(), target, monitor=Monitor())

        # we manually cycle the CiphertextDownloader through a number of steps that
        # would normally be sequenced by a Deferred chain in
        # CiphertextDownloader.start(), to give us more control over the process.
        # In particular, by bypassing _get_all_shareholders, we skip
        # permuted-peerlist selection.
        for shnum, bucket in shareholders.items():
            if shnum < AVAILABLE_SHARES and bucket.closed:
                fd.add_share_bucket(shnum, bucket)
        fd._got_all_shareholders(None)

        # Make it possible to obtain uri_extension from the shareholders.
        # Arrange for shareholders[0] to be the first, so we can selectively
        # corrupt the data it returns.
        uri_extension_sources = shareholders.values()
        uri_extension_sources.remove(shareholders[0])
        uri_extension_sources.insert(0, shareholders[0])

        d = defer.succeed(None)

        # have the CiphertextDownloader retrieve a copy of uri_extension itself
        d.addCallback(fd._obtain_uri_extension)

        if "corrupt_crypttext_hashes" in recover_mode:
            # replace everybody's crypttext hash trees with a different one
            # (computed over a different file), then modify our uri_extension
            # to reflect the new crypttext hash tree root
            def _corrupt_crypttext_hashes(unused):
                assert isinstance(fd._vup, download.ValidatedExtendedURIProxy), fd._vup
                assert fd._vup.crypttext_root_hash, fd._vup
                badhash = hashutil.tagged_hash("bogus", "data")
                bad_crypttext_hashes = [badhash] * fd._vup.num_segments
                badtree = hashtree.HashTree(bad_crypttext_hashes)
                for bucket in shareholders.values():
                    bucket.crypttext_hashes = list(badtree)
                fd._crypttext_hash_tree = hashtree.IncompleteHashTree(fd._vup.num_segments)
                fd._crypttext_hash_tree.set_hashes({0: badtree[0]})
                return fd._vup
            d.addCallback(_corrupt_crypttext_hashes)

        # also have the CiphertextDownloader ask for hash trees
        d.addCallback(fd._get_crypttext_hash_tree)

        d.addCallback(fd._download_all_segments)
        d.addCallback(fd._done)
        def _done(t):
            newdata = "".join(consumer.chunks)
            return (newdata, fd)
        d.addCallback(_done)
        return d

    def test_not_enough_shares(self):
        d = self.send_and_recover((4,8,10), AVAILABLE_SHARES=2)
        def _done(res):
            self.failUnless(isinstance(res, Failure))
            self.failUnless(res.check(NotEnoughSharesError))
        d.addBoth(_done)
        return d

    def test_one_share_per_peer(self):
        return self.send_and_recover()

    def test_74(self):
        return self.send_and_recover(datalen=74)
    def test_75(self):
        return self.send_and_recover(datalen=75)
    def test_51(self):
        return self.send_and_recover(datalen=51)

    def test_99(self):
        return self.send_and_recover(datalen=99)
    def test_100(self):
        return self.send_and_recover(datalen=100)
    def test_76(self):
        return self.send_and_recover(datalen=76)

    def test_124(self):
        return self.send_and_recover(datalen=124)
    def test_125(self):
        return self.send_and_recover(datalen=125)
    def test_101(self):
        return self.send_and_recover(datalen=101)

    def test_pause(self):
        # use a download target that does pauseProducing/resumeProducing a
        # few times, then finishes
        c = PausingConsumer()
        d = self.send_and_recover(consumer=c)
        return d

    def test_pause_then_stop(self):
        # use a download target that pauses, then stops.
        c = PausingAndStoppingConsumer()
        d = self.shouldFail(download.DownloadStopped, "test_pause_then_stop",
                            "our Consumer called stopProducing()",
                            self.send_and_recover, consumer=c)
        return d

    def test_stop(self):
        # use a download target that does an immediate stop (ticket #473)
        c = StoppingConsumer()
        d = self.shouldFail(download.DownloadStopped, "test_stop",
                            "our Consumer called stopProducing()",
                            self.send_and_recover, consumer=c)
        return d

    # the following tests all use 4-out-of-10 encoding
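    # with k=4 out of n=10, a download needs any 4 intact shares: corrupting
    # 6 servers leaves exactly 4 good ones and recovery still succeeds, while
    # corrupting 7 leaves only 3 and the download fails with
    # NotEnoughSharesError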

    def test_bad_blocks(self):
        # the first 6 servers have bad blocks, which will be caught by the
        # blockhashes
        modemap = dict([(i, "bad block")
                        for i in range(6)]
                       + [(i, "good")
                          for i in range(6, 10)])
        return self.send_and_recover((4,8,10), bucket_modes=modemap)

    def test_bad_blocks_failure(self):
        # the first 7 servers have bad blocks, which will be caught by the
        # blockhashes, and the download will fail
        modemap = dict([(i, "bad block")
                        for i in range(7)]
                       + [(i, "good")
                          for i in range(7, 10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
        def _done(res):
            self.failUnless(isinstance(res, Failure), res)
            self.failUnless(res.check(NotEnoughSharesError), res)
        d.addBoth(_done)
        return d

    def test_bad_blockhashes(self):
        # the first 6 servers have bad block hashes, so the blockhash tree
        # will not validate
        modemap = dict([(i, "bad blockhash")
                        for i in range(6)]
                       + [(i, "good")
                          for i in range(6, 10)])
        return self.send_and_recover((4,8,10), bucket_modes=modemap)

    def test_bad_blockhashes_failure(self):
        # the first 7 servers have bad block hashes, so the blockhash tree
        # will not validate, and the download will fail
        modemap = dict([(i, "bad blockhash")
                        for i in range(7)]
                       + [(i, "good")
                          for i in range(7, 10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
        def _done(res):
            self.failUnless(isinstance(res, Failure))
            self.failUnless(res.check(NotEnoughSharesError), res)
        d.addBoth(_done)
        return d

    def test_bad_sharehashes(self):
        # the first 6 servers have bad share hashes, so the sharehash tree
        # will not validate
        modemap = dict([(i, "bad sharehash")
                        for i in range(6)]
                       + [(i, "good")
                          for i in range(6, 10)])
        return self.send_and_recover((4,8,10), bucket_modes=modemap)

    def assertFetchFailureIn(self, fd, where):
        expected = {"uri_extension": 0,
                    "crypttext_hash_tree": 0,
                    }
        if where is not None:
            expected[where] += 1
        self.failUnlessEqual(fd._fetch_failures, expected)

    def test_good(self):
        # just to make sure the test harness works when we aren't
        # intentionally causing failures
        modemap = dict([(i, "good") for i in range(0, 10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
        d.addCallback(self.assertFetchFailureIn, None)
        return d

    def test_bad_uri_extension(self):
        # the first server has a bad uri_extension block, so we will fail
        # over to a different server.
        modemap = dict([(i, "bad uri_extension") for i in range(1)] +
                       [(i, "good") for i in range(1, 10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
        d.addCallback(self.assertFetchFailureIn, "uri_extension")
        return d

    def test_bad_crypttext_hashroot(self):
        # the first server has a bad crypttext hashroot, so we will fail
        # over to a different server.
        modemap = dict([(i, "bad crypttext hashroot") for i in range(1)] +
                       [(i, "good") for i in range(1, 10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
        d.addCallback(self.assertFetchFailureIn, "crypttext_hash_tree")
        return d

    def test_bad_crypttext_hashes(self):
        # the first server has a bad crypttext hash block, so we will fail
        # over to a different server.
        modemap = dict([(i, "bad crypttext hash") for i in range(1)] +
                       [(i, "good") for i in range(1, 10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
        d.addCallback(self.assertFetchFailureIn, "crypttext_hash_tree")
        return d

    def test_bad_crypttext_hashes_failure(self):
        # to test that the crypttext merkle tree is really being applied, we
        # sneak into the download process and corrupt two things: we replace
        # everybody's crypttext hashtree with a bad version (computed over
        # bogus data), and we modify the supposedly-validated uri_extension
        # block to match the new crypttext hashtree root. The download
        # process should notice that the crypttext coming out of FEC doesn't
        # match the tree, and fail.

        modemap = dict([(i, "good") for i in range(0, 10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap,
                                  recover_mode=("corrupt_crypttext_hashes"))
        def _done(res):
            self.failUnless(isinstance(res, Failure))
            self.failUnless(res.check(hashtree.BadHashError), res)
        d.addBoth(_done)
        return d

    def OFF_test_bad_plaintext(self):
        # faking a decryption failure is easier: just corrupt the key
        modemap = dict([(i, "good") for i in range(0, 10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap,
                                  recover_mode=("corrupt_key"))
        def _done(res):
            self.failUnless(isinstance(res, Failure))
            self.failUnless(res.check(hashtree.BadHashError), res)
        d.addBoth(_done)
        return d

    def test_bad_sharehashes_failure(self):
        # all ten servers have bad share hashes, so the sharehash tree
        # will not validate, and the download will fail
        modemap = dict([(i, "bad sharehash")
                        for i in range(10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
        def _done(res):
            self.failUnless(isinstance(res, Failure))
            self.failUnless(res.check(NotEnoughSharesError))
        d.addBoth(_done)
        return d

    def test_missing_sharehashes(self):
        # the first 6 servers are missing their sharehashes, so the
        # sharehash tree will not validate
        modemap = dict([(i, "missing sharehash")
                        for i in range(6)]
                       + [(i, "good")
                          for i in range(6, 10)])
        return self.send_and_recover((4,8,10), bucket_modes=modemap)

    def test_missing_sharehashes_failure(self):
        # all servers are missing their sharehashes, so the sharehash tree
        # will not validate, and the download will fail
        modemap = dict([(i, "missing sharehash")
                        for i in range(10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
        def _done(res):
            self.failUnless(isinstance(res, Failure), res)
            self.failUnless(res.check(NotEnoughSharesError), res)
        d.addBoth(_done)
        return d

    def test_lost_one_shareholder(self):
        # we have enough shareholders when we start, but one segment in we
        # lose one of them. The upload should still succeed, as long as we
        # still have 'servers_of_happiness' peers left.
        modemap = dict([(i, "good") for i in range(9)] +
                       [(i, "lost") for i in range(9, 10)])
        return self.send_and_recover((4,8,10), bucket_modes=modemap)

    def test_lost_one_shareholder_early(self):
        # we have enough shareholders when we choose peers, but just before
        # we send the 'start' message, we lose one of them. The upload should
        # still succeed, as long as we still have 'servers_of_happiness' peers
        # left.
        modemap = dict([(i, "good") for i in range(9)] +
                       [(i, "lost-early") for i in range(9, 10)])
        return self.send_and_recover((4,8,10), bucket_modes=modemap)

    def test_lost_many_shareholders(self):
        # we have enough shareholders when we start, but one segment in we
        # lose all but one of them. The upload should fail.
        modemap = dict([(i, "good") for i in range(1)] +
                       [(i, "lost") for i in range(1, 10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
        def _done(res):
            self.failUnless(isinstance(res, Failure))
            self.failUnless(res.check(UploadUnhappinessError), res)
        d.addBoth(_done)
        return d

    def test_lost_all_shareholders(self):
        # we have enough shareholders when we start, but one segment in we
        # lose all of them. The upload should fail.
        modemap = dict([(i, "lost") for i in range(10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
        def _done(res):
            self.failUnless(isinstance(res, Failure))
            self.failUnless(res.check(UploadUnhappinessError))
        d.addBoth(_done)
        return d