from zope.interface import implements
from twisted.trial import unittest
from twisted.internet import defer, reactor
from twisted.internet.interfaces import IConsumer
from twisted.python.failure import Failure
from foolscap import eventual
from allmydata import hashtree, uri
from allmydata.immutable import encode, upload, download
from allmydata.util import hashutil
from allmydata.util.assertutil import _assert
from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader, NotEnoughSharesError
import common_util as testutil

class LostPeerError(Exception):
    pass

def flip_bit(good): # flips the last bit
    return good[:-1] + chr(ord(good[-1]) ^ 0x01)
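# e.g. flip_bit("ab") == "ac", since ord("b") is 0x62 and 0x62 ^ 0x01 == 0x63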

class FakeClient:
    def log(self, *args, **kwargs):
        pass

class FakeBucketReaderWriterProxy:
    implements(IStorageBucketWriter, IStorageBucketReader)
    # these are used for both reading and writing
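    # Recognized 'mode' values, as exercised by the methods below: "good",
    # "lost-early", "lost", "bad block", "bad crypttext hashroot",
    # "bad crypttext hash", "bad blockhash", "bad sharehash",
    # "missing sharehash", and "bad uri_extension".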
    def __init__(self, mode="good"):
        self.mode = mode
        self.blocks = {}
        self.plaintext_hashes = []
        self.crypttext_hashes = []
        self.block_hashes = None
        self.share_hashes = None
        self.closed = False

    def get_peerid(self):
        return "peerid"

    def _start(self):
        if self.mode == "lost-early":
            f = Failure(LostPeerError("I went away early"))
            return eventual.fireEventually(f)
        return defer.succeed(self)

    def put_header(self):
        return self._start()

    def put_block(self, segmentnum, data):
        if self.mode == "lost-early":
            f = Failure(LostPeerError("I went away early"))
            return eventual.fireEventually(f)
        def _try():
            assert not self.closed
            assert segmentnum not in self.blocks
            if self.mode == "lost" and segmentnum >= 1:
                raise LostPeerError("I'm going away now")
            self.blocks[segmentnum] = data
        return defer.maybeDeferred(_try)

    def put_plaintext_hashes(self, hashes):
        def _try():
            assert not self.closed
            assert not self.plaintext_hashes
            self.plaintext_hashes = hashes
        return defer.maybeDeferred(_try)

    def put_crypttext_hashes(self, hashes):
        def _try():
            assert not self.closed
            assert not self.crypttext_hashes
            self.crypttext_hashes = hashes
        return defer.maybeDeferred(_try)

    def put_block_hashes(self, blockhashes):
        def _try():
            assert not self.closed
            assert self.block_hashes is None
            self.block_hashes = blockhashes
        return defer.maybeDeferred(_try)

    def put_share_hashes(self, sharehashes):
        def _try():
            assert not self.closed
            assert self.share_hashes is None
            self.share_hashes = sharehashes
        return defer.maybeDeferred(_try)

    def put_uri_extension(self, uri_extension):
        def _try():
            assert not self.closed
            self.uri_extension = uri_extension
        return defer.maybeDeferred(_try)

    def close(self):
        def _try():
            assert not self.closed
            self.closed = True
        return defer.maybeDeferred(_try)

    def abort(self):
        return defer.succeed(None)

    def get_block_data(self, blocknum, blocksize, size):
        d = self._start()
        def _try(unused=None):
            assert isinstance(blocknum, (int, long))
            if self.mode == "bad block":
                return flip_bit(self.blocks[blocknum])
            return self.blocks[blocknum]
        d.addCallback(_try)
        return d

    def get_plaintext_hashes(self):
        d = self._start()
        def _try(unused=None):
            hashes = self.plaintext_hashes[:]
            return hashes
        d.addCallback(_try)
        return d

    def get_crypttext_hashes(self):
        d = self._start()
        def _try(unused=None):
            hashes = self.crypttext_hashes[:]
            if self.mode == "bad crypttext hashroot":
                hashes[0] = flip_bit(hashes[0])
            if self.mode == "bad crypttext hash":
                hashes[1] = flip_bit(hashes[1])
            return hashes
        d.addCallback(_try)
        return d

    def get_block_hashes(self, at_least_these=()):
        d = self._start()
        def _try(unused=None):
            if self.mode == "bad blockhash":
                hashes = self.block_hashes[:]
                hashes[1] = flip_bit(hashes[1])
                return hashes
            return self.block_hashes
        d.addCallback(_try)
        return d

    def get_share_hashes(self, at_least_these=()):
        d = self._start()
        def _try(unused=None):
            if self.mode == "bad sharehash":
                hashes = self.share_hashes[:]
                hashes[1] = (hashes[1][0], flip_bit(hashes[1][1]))
                return hashes
            if self.mode == "missing sharehash":
                # one sneaky attack would be to pretend we don't know our own
                # sharehash, which could manage to frame someone else.
                # download.py is supposed to guard against this case.
                return []
            return self.share_hashes
        d.addCallback(_try)
        return d

    def get_uri_extension(self):
        d = self._start()
        def _try(unused=None):
            if self.mode == "bad uri_extension":
                return flip_bit(self.uri_extension)
            return self.uri_extension
        d.addCallback(_try)
        return d


def make_data(length):
    data = "happy happy joy joy" * 100
    assert length <= len(data)
    return data[:length]

class ValidatedExtendedURIProxy(unittest.TestCase):
    K = 4
    M = 10
    SIZE = 200
    SEGSIZE = 72
    _TMP = SIZE%SEGSIZE
    if _TMP == 0:
        _TMP = SEGSIZE
    if _TMP % K != 0:
        _TMP += (K - (_TMP % K))
    TAIL_SEGSIZE = _TMP
    _TMP = SIZE / SEGSIZE
    if SIZE % SEGSIZE != 0:
        _TMP += 1
    NUM_SEGMENTS = _TMP
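    # With the values above, this works out to TAIL_SEGSIZE = 56 (200 % 72
    # is 56, already a multiple of K=4) and NUM_SEGMENTS = 3 (two full
    # 72-byte segments plus the tail).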
    mindict = { 'segment_size': SEGSIZE,
                'crypttext_root_hash': '0'*hashutil.CRYPTO_VAL_SIZE,
                'share_root_hash': '1'*hashutil.CRYPTO_VAL_SIZE }
    optional_consistent = { 'crypttext_hash': '2'*hashutil.CRYPTO_VAL_SIZE,
                            'codec_name': "crs",
                            'codec_params': "%d-%d-%d" % (SEGSIZE, K, M),
                            'tail_codec_params': "%d-%d-%d" % (TAIL_SEGSIZE, K, M),
                            'num_segments': NUM_SEGMENTS,
                            'size': SIZE,
                            'needed_shares': K,
                            'total_shares': M,
                            'plaintext_hash': "anything",
                            'plaintext_root_hash': "anything", }
    # optional_inconsistent = { 'crypttext_hash': ('2'*(hashutil.CRYPTO_VAL_SIZE-1), "", 77),
    optional_inconsistent = { 'crypttext_hash': (77,),
                              'codec_name': ("digital fountain", ""),
                              'codec_params': ("%d-%d-%d" % (SEGSIZE, K-1, M),
                                               "%d-%d-%d" % (SEGSIZE-1, K, M),
                                               "%d-%d-%d" % (SEGSIZE, K, M-1)),
                              'tail_codec_params': ("%d-%d-%d" % (TAIL_SEGSIZE, K-1, M),
                                                    "%d-%d-%d" % (TAIL_SEGSIZE-1, K, M),
                                                    "%d-%d-%d" % (TAIL_SEGSIZE, K, M-1)),
                              'num_segments': (NUM_SEGMENTS-1,),
                              'size': (SIZE-1,),
                              'needed_shares': (K-1,),
                              'total_shares': (M-1,), }

    def _test(self, uebdict):
        uebstring = uri.pack_extension(uebdict)
        uebhash = hashutil.uri_extension_hash(uebstring)
        fb = FakeBucketReaderWriterProxy()
        fb.put_uri_extension(uebstring)
        verifycap = uri.CHKFileVerifierURI(storage_index='x'*16,
                                           uri_extension_hash=uebhash,
                                           needed_shares=self.K,
                                           total_shares=self.M,
                                           size=self.SIZE)
        vup = download.ValidatedExtendedURIProxy(fb, verifycap)
        return vup.start()

    def _test_accept(self, uebdict):
        return self._test(uebdict)

    def _should_fail(self, res, expected_failures):
        if isinstance(res, Failure):
            res.trap(*expected_failures)
        else:
            self.fail("was supposed to raise %s, not get '%s'" %
                      (expected_failures, res))

    def _test_reject(self, uebdict):
        d = self._test(uebdict)
        d.addBoth(self._should_fail, (KeyError, download.BadURIExtension))
        return d

    def test_accept_minimal(self):
        return self._test_accept(self.mindict)

    def test_reject_insufficient(self):
        dl = []
        for k in self.mindict.iterkeys():
            insuffdict = self.mindict.copy()
            del insuffdict[k]
            d = self._test_reject(insuffdict)
            dl.append(d)
        return defer.DeferredList(dl)

    def test_accept_optional(self):
        dl = []
        for k in self.optional_consistent.iterkeys():
            mydict = self.mindict.copy()
            mydict[k] = self.optional_consistent[k]
            d = self._test_accept(mydict)
            dl.append(d)
        return defer.DeferredList(dl)

    def test_reject_optional(self):
        dl = []
        for k in self.optional_inconsistent.iterkeys():
            for v in self.optional_inconsistent[k]:
                mydict = self.mindict.copy()
                mydict[k] = v
                d = self._test_reject(mydict)
                dl.append(d)
        return defer.DeferredList(dl)

class Encode(unittest.TestCase):

    def do_encode(self, max_segment_size, datalen, NUM_SHARES, NUM_SEGMENTS,
                  expected_block_hashes, expected_share_hashes):
        data = make_data(datalen)
        e = encode.Encoder()
        u = upload.Data(data, convergence="some convergence string")
        # force use of multiple segments by using a low max_segment_size
        u.max_segment_size = max_segment_size
        u.encoding_param_k = 25
        u.encoding_param_happy = 75
        u.encoding_param_n = 100
        eu = upload.EncryptAnUploadable(u)
        d = e.set_encrypted_uploadable(eu)

        all_shareholders = []
        def _ready(res):
            k,happy,n = e.get_param("share_counts")
            _assert(n == NUM_SHARES) # else we'll be completely confused
            numsegs = e.get_param("num_segments")
            _assert(numsegs == NUM_SEGMENTS, numsegs, NUM_SEGMENTS)
            segsize = e.get_param("segment_size")
            _assert( (NUM_SEGMENTS-1)*segsize < len(data) <= NUM_SEGMENTS*segsize,
                     NUM_SEGMENTS, segsize,
                     (NUM_SEGMENTS-1)*segsize, len(data), NUM_SEGMENTS*segsize)

            shareholders = {}
            for shnum in range(NUM_SHARES):
                peer = FakeBucketReaderWriterProxy()
                shareholders[shnum] = peer
                all_shareholders.append(peer)
            e.set_shareholders(shareholders)
            return e.start()
        d.addCallback(_ready)

        def _check(res):
            (uri_extension_hash, required_shares, num_shares, file_size) = res
            self.failUnless(isinstance(uri_extension_hash, str))
            self.failUnlessEqual(len(uri_extension_hash), 32)
            for i,peer in enumerate(all_shareholders):
                self.failUnless(peer.closed)
                self.failUnlessEqual(len(peer.blocks), NUM_SEGMENTS)
                # each peer gets a full tree of block hashes. For 3 or 4
                # segments, that's 7 hashes. For 5 segments it's 15 hashes.
                self.failUnlessEqual(len(peer.block_hashes),
                                     expected_block_hashes)
                for h in peer.block_hashes:
                    self.failUnlessEqual(len(h), 32)
                # each peer also gets their necessary chain of share hashes.
                # For 100 shares (rounded up to 128 leaves), that's 8 hashes
                self.failUnlessEqual(len(peer.share_hashes),
                                     expected_share_hashes)
                for (hashnum, h) in peer.share_hashes:
                    self.failUnless(isinstance(hashnum, int))
                    self.failUnlessEqual(len(h), 32)
        d.addCallback(_check)

        return d

    # a series of 3*3 tests to check out edge conditions. One axis is how the
    # plaintext is divided into segments: the data length is one less than,
    # exactly, or one more than a multiple of the segment size. For example,
    # with 25-byte segments we test 74 bytes, 75 bytes, and 76 bytes.

    # the other axis is how many leaves the block hash tree winds up with,
    # relative to a power of 2, so 2^a+(-1,0,1). Each segment turns into a
    # single leaf. So we'd like to check out, e.g., 3 segments, 4 segments,
    # and 5 segments.

    # that results in the following series of data lengths:
    #  3 segs: 74, 75, 51
    #  4 segs: 99, 100, 76
    #  5 segs: 124, 125, 101

    # all tests encode to 100 shares, which means the share hash tree will
    # have 128 leaves, which means that buckets will be given an 8-long share
    # hash chain

    # all 3-segment files will have a 4-leaf blockhashtree, and thus expect
    # to get 7 blockhashes. 4-segment files will also get 4-leaf block hash
    # trees and 7 blockhashes. 5-segment files will get 8-leaf block hash
    # trees, which get 15 blockhashes.

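    # A quick cross-check of those counts (an illustrative sketch; the tests
    # below do not call this helper): a full binary hash tree over N leaves,
    # with N rounded up to the next power of two, contains 2*N - 1 hashes.
    # The 8-long share hash chain likewise appears to be one hash per tree
    # level (log2(128) == 7) plus the root.
    @staticmethod
    def _expected_block_hash_count(num_segments):
        leaves = 1
        while leaves < num_segments:
            leaves *= 2             # round segment count up to a power of 2
        return 2*leaves - 1         # 3 or 4 segments -> 7, 5 segments -> 15
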
    def test_send_74(self):
        # 3 segments (25, 25, 24)
        return self.do_encode(25, 74, 100, 3, 7, 8)
    def test_send_75(self):
        # 3 segments (25, 25, 25)
        return self.do_encode(25, 75, 100, 3, 7, 8)
    def test_send_51(self):
        # 3 segments (25, 25, 1)
        return self.do_encode(25, 51, 100, 3, 7, 8)

    def test_send_76(self):
        # encode a 76 byte file (in 4 segments: 25,25,25,1) to 100 shares
        return self.do_encode(25, 76, 100, 4, 7, 8)
    def test_send_99(self):
        # 4 segments: 25,25,25,24
        return self.do_encode(25, 99, 100, 4, 7, 8)
    def test_send_100(self):
        # 4 segments: 25,25,25,25
        return self.do_encode(25, 100, 100, 4, 7, 8)

    def test_send_124(self):
        # 5 segments: 25, 25, 25, 25, 24
        return self.do_encode(25, 124, 100, 5, 15, 8)
    def test_send_125(self):
        # 5 segments: 25, 25, 25, 25, 25
        return self.do_encode(25, 125, 100, 5, 15, 8)
    def test_send_101(self):
        # 5 segments: 25, 25, 25, 25, 1
        return self.do_encode(25, 101, 100, 5, 15, 8)

class PausingTarget(download.Data):
    implements(IConsumer)
    def __init__(self):
        download.Data.__init__(self)
        self.size = 0
        self.writes = 0
    def write(self, data):
        self.size += len(data)
        self.writes += 1
        if self.writes <= 2:
            # we happen to use 4 segments, and want to avoid pausing on the
            # last one (since then the _unpause timer will still be running)
            self.producer.pauseProducing()
            reactor.callLater(0.1, self._unpause)
        return download.Data.write(self, data)
    def _unpause(self):
        self.producer.resumeProducing()
    def registerProducer(self, producer, streaming):
        self.producer = producer
    def unregisterProducer(self):
        self.producer = None

class PausingAndStoppingTarget(PausingTarget):
    def write(self, data):
        self.producer.pauseProducing()
        reactor.callLater(0.5, self._stop)
    def _stop(self):
        self.producer.stopProducing()

class StoppingTarget(PausingTarget):
    def write(self, data):
        self.producer.stopProducing()

class Roundtrip(unittest.TestCase, testutil.ShouldFailMixin):
    def send_and_recover(self, k_and_happy_and_n=(25,75,100),
                         AVAILABLE_SHARES=None,
                         datalen=76,
                         max_segment_size=25,
                         bucket_modes={},
                         recover_mode="recover",
                         target=None,
                         ):
        if AVAILABLE_SHARES is None:
            AVAILABLE_SHARES = k_and_happy_and_n[2]
        data = make_data(datalen)
        d = self.send(k_and_happy_and_n, AVAILABLE_SHARES,
                      max_segment_size, bucket_modes, data)
        # that fires with (uri_extension_hash, e, shareholders)
        d.addCallback(self.recover, AVAILABLE_SHARES, recover_mode,
                      target=target)
        # that fires with newdata
        def _downloaded((newdata, fd)):
            self.failUnless(newdata == data)
            return fd
        d.addCallback(_downloaded)
        return d

    def send(self, k_and_happy_and_n, AVAILABLE_SHARES, max_segment_size,
             bucket_modes, data):
        k, happy, n = k_and_happy_and_n
        NUM_SHARES = k_and_happy_and_n[2]
        if AVAILABLE_SHARES is None:
            AVAILABLE_SHARES = NUM_SHARES
        e = encode.Encoder()
        u = upload.Data(data, convergence="some convergence string")
        # force use of multiple segments by using a low max_segment_size
        u.max_segment_size = max_segment_size
        u.encoding_param_k = k
        u.encoding_param_happy = happy
        u.encoding_param_n = n
        eu = upload.EncryptAnUploadable(u)
        d = e.set_encrypted_uploadable(eu)

        shareholders = {}
        def _ready(res):
            k,happy,n = e.get_param("share_counts")
            assert n == NUM_SHARES # else we'll be completely confused
            all_peers = []
            for shnum in range(NUM_SHARES):
                mode = bucket_modes.get(shnum, "good")
                peer = FakeBucketReaderWriterProxy(mode)
                shareholders[shnum] = peer
            e.set_shareholders(shareholders)
            return e.start()
        d.addCallback(_ready)
        def _sent(res):
            d1 = u.get_encryption_key()
            d1.addCallback(lambda key: (res, key, shareholders))
            return d1
        d.addCallback(_sent)
        return d

    def recover(self, (res, key, shareholders), AVAILABLE_SHARES,
                recover_mode, target=None):
        (uri_extension_hash, required_shares, num_shares, file_size) = res

        if "corrupt_key" in recover_mode:
            # we corrupt the key, so that the decrypted data is corrupted and
            # will fail the plaintext hash check. Since we're manually
            # attaching shareholders, the fact that the storage index is also
            # corrupted doesn't matter.
            key = flip_bit(key)

        u = uri.CHKFileURI(key=key,
                           uri_extension_hash=uri_extension_hash,
                           needed_shares=required_shares,
                           total_shares=num_shares,
                           size=file_size)

        client = FakeClient()
        if not target:
            target = download.Data()
        fd = download.FileDownloader(client, u, target)

        # we manually cycle the FileDownloader through a number of steps that
        # would normally be sequenced by a Deferred chain in
        # FileDownloader.start(), to give us more control over the process.
        # In particular, by bypassing _get_all_shareholders, we skip
        # permuted-peerlist selection.
        for shnum, bucket in shareholders.items():
            if shnum < AVAILABLE_SHARES and bucket.closed:
                fd.add_share_bucket(shnum, bucket)
        fd._got_all_shareholders(None)

        # Make it possible to obtain uri_extension from the shareholders.
        # Arrange for shareholders[0] to be the first, so we can selectively
        # corrupt the data it returns.
        uri_extension_sources = shareholders.values()
        uri_extension_sources.remove(shareholders[0])
        uri_extension_sources.insert(0, shareholders[0])

        d = defer.succeed(None)

        # have the FileDownloader retrieve a copy of uri_extension itself
        d.addCallback(fd._obtain_uri_extension)

        if "corrupt_crypttext_hashes" in recover_mode:
            # replace everybody's crypttext hash trees with a different one
            # (computed over a different file), then modify our uri_extension
            # to reflect the new crypttext hash tree root
            def _corrupt_crypttext_hashes(unused):
                assert isinstance(fd._vup, download.ValidatedExtendedURIProxy), fd._vup
                assert fd._vup.crypttext_root_hash, fd._vup
                badhash = hashutil.tagged_hash("bogus", "data")
                bad_crypttext_hashes = [badhash] * fd._vup.num_segments
                badtree = hashtree.HashTree(bad_crypttext_hashes)
                for bucket in shareholders.values():
                    bucket.crypttext_hashes = list(badtree)
                fd._crypttext_hash_tree = hashtree.IncompleteHashTree(fd._vup.num_segments)
                fd._crypttext_hash_tree.set_hashes({0: badtree[0]})
                return fd._vup
            d.addCallback(_corrupt_crypttext_hashes)

        # also have the FileDownloader ask for hash trees
        d.addCallback(fd._get_crypttext_hash_tree)

        d.addCallback(fd._download_all_segments)
        d.addCallback(fd._done)
        def _done(newdata):
            return (newdata, fd)
        d.addCallback(_done)
        return d

    def test_not_enough_shares(self):
        d = self.send_and_recover((4,8,10), AVAILABLE_SHARES=2)
        def _done(res):
            self.failUnless(isinstance(res, Failure))
            self.failUnless(res.check(NotEnoughSharesError))
        d.addBoth(_done)
        return d

    def test_one_share_per_peer(self):
        return self.send_and_recover()

    def test_74(self):
        return self.send_and_recover(datalen=74)
    def test_75(self):
        return self.send_and_recover(datalen=75)
    def test_51(self):
        return self.send_and_recover(datalen=51)

    def test_99(self):
        return self.send_and_recover(datalen=99)
    def test_100(self):
        return self.send_and_recover(datalen=100)
    def test_76(self):
        return self.send_and_recover(datalen=76)

    def test_124(self):
        return self.send_and_recover(datalen=124)
    def test_125(self):
        return self.send_and_recover(datalen=125)
    def test_101(self):
        return self.send_and_recover(datalen=101)

    def test_pause(self):
        # use a DownloadTarget that does pauseProducing/resumeProducing a few
        # times, then finishes
        t = PausingTarget()
        d = self.send_and_recover(target=t)
        return d

    def test_pause_then_stop(self):
        # use a DownloadTarget that pauses, then stops.
        t = PausingAndStoppingTarget()
        d = self.shouldFail(download.DownloadStopped, "test_pause_then_stop",
                            "our Consumer called stopProducing()",
                            self.send_and_recover, target=t)
        return d

    def test_stop(self):
        # use a DownloadTarget that does an immediate stop (ticket #473)
        t = StoppingTarget()
        d = self.shouldFail(download.DownloadStopped, "test_stop",
                            "our Consumer called stopProducing()",
                            self.send_and_recover, target=t)
        return d

    # the following tests all use 4-out-of-10 encoding

    def test_bad_blocks(self):
        # the first 6 servers have bad blocks, which will be caught by the
        # blockhashes
        modemap = dict([(i, "bad block")
                        for i in range(6)]
                       + [(i, "good")
                          for i in range(6, 10)])
        return self.send_and_recover((4,8,10), bucket_modes=modemap)

    def test_bad_blocks_failure(self):
        # the first 7 servers have bad blocks, which will be caught by the
        # blockhashes, and the download will fail
        modemap = dict([(i, "bad block")
                        for i in range(7)]
                       + [(i, "good")
                          for i in range(7, 10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
        def _done(res):
            self.failUnless(isinstance(res, Failure), res)
            self.failUnless(res.check(NotEnoughSharesError), res)
        d.addBoth(_done)
        return d

    def test_bad_blockhashes(self):
        # the first 6 servers have bad block hashes, so the blockhash tree
        # will not validate
        modemap = dict([(i, "bad blockhash")
                        for i in range(6)]
                       + [(i, "good")
                          for i in range(6, 10)])
        return self.send_and_recover((4,8,10), bucket_modes=modemap)

    def test_bad_blockhashes_failure(self):
        # the first 7 servers have bad block hashes, so the blockhash tree
        # will not validate, and the download will fail
        modemap = dict([(i, "bad blockhash")
                        for i in range(7)]
                       + [(i, "good")
                          for i in range(7, 10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
        def _done(res):
            self.failUnless(isinstance(res, Failure))
            self.failUnless(res.check(NotEnoughSharesError), res)
        d.addBoth(_done)
        return d

    def test_bad_sharehashes(self):
        # the first 6 servers have bad share hashes, so the sharehash tree
        # will not validate
        modemap = dict([(i, "bad sharehash")
                        for i in range(6)]
                       + [(i, "good")
                          for i in range(6, 10)])
        return self.send_and_recover((4,8,10), bucket_modes=modemap)

    def assertFetchFailureIn(self, fd, where):
        expected = {"uri_extension": 0,
                    "crypttext_hash_tree": 0,
                    }
        if where is not None:
            expected[where] += 1
        self.failUnlessEqual(fd._fetch_failures, expected)

    def test_good(self):
        # just to make sure the test harness works when we aren't
        # intentionally causing failures
        modemap = dict([(i, "good") for i in range(0, 10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
        d.addCallback(self.assertFetchFailureIn, None)
        return d

    def test_bad_uri_extension(self):
        # the first server has a bad uri_extension block, so we will fail
        # over to a different server.
        modemap = dict([(i, "bad uri_extension") for i in range(1)] +
                       [(i, "good") for i in range(1, 10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
        d.addCallback(self.assertFetchFailureIn, "uri_extension")
        return d

    def test_bad_crypttext_hashroot(self):
        # the first server has a bad crypttext hashroot, so we will fail
        # over to a different server.
        modemap = dict([(i, "bad crypttext hashroot") for i in range(1)] +
                       [(i, "good") for i in range(1, 10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
        d.addCallback(self.assertFetchFailureIn, "crypttext_hash_tree")
        return d

    def test_bad_crypttext_hashes(self):
        # the first server has a bad crypttext hash block, so we will fail
        # over to a different server.
        modemap = dict([(i, "bad crypttext hash") for i in range(1)] +
                       [(i, "good") for i in range(1, 10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
        d.addCallback(self.assertFetchFailureIn, "crypttext_hash_tree")
        return d

    def test_bad_crypttext_hashes_failure(self):
        # to test that the crypttext merkle tree is really being applied, we
        # sneak into the download process and corrupt two things: we replace
        # everybody's crypttext hashtree with a bad version (computed over
        # bogus data), and we modify the supposedly-validated uri_extension
        # block to match the new crypttext hashtree root. The download
        # process should notice that the crypttext coming out of FEC doesn't
        # match the tree, and fail.

        modemap = dict([(i, "good") for i in range(0, 10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap,
                                  recover_mode=("corrupt_crypttext_hashes"))
        def _done(res):
            self.failUnless(isinstance(res, Failure))
            self.failUnless(res.check(hashtree.BadHashError), res)
        d.addBoth(_done)
        return d

    def OFF_test_bad_plaintext(self):
        # faking a decryption failure is easier: just corrupt the key
        modemap = dict([(i, "good") for i in range(0, 10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap,
                                  recover_mode=("corrupt_key"))
        def _done(res):
            self.failUnless(isinstance(res, Failure))
            self.failUnless(res.check(hashtree.BadHashError), res)
        d.addBoth(_done)
        return d

    def test_bad_sharehashes_failure(self):
        # all ten servers have bad share hashes, so the sharehash tree
        # will not validate, and the download will fail
        modemap = dict([(i, "bad sharehash")
                        for i in range(10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
        def _done(res):
            self.failUnless(isinstance(res, Failure))
            self.failUnless(res.check(NotEnoughSharesError))
        d.addBoth(_done)
        return d

    def test_missing_sharehashes(self):
        # the first 6 servers are missing their sharehashes, so the
        # sharehash tree will not validate
        modemap = dict([(i, "missing sharehash")
                        for i in range(6)]
                       + [(i, "good")
                          for i in range(6, 10)])
        return self.send_and_recover((4,8,10), bucket_modes=modemap)

    def test_missing_sharehashes_failure(self):
        # all servers are missing their sharehashes, so the sharehash tree
        # will not validate, and the download will fail
        modemap = dict([(i, "missing sharehash")
                        for i in range(10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
        def _done(res):
            self.failUnless(isinstance(res, Failure), res)
            self.failUnless(res.check(NotEnoughSharesError), res)
        d.addBoth(_done)
        return d

    def test_lost_one_shareholder(self):
        # we have enough shareholders when we start, but one segment in we
        # lose one of them. The upload should still succeed, as long as we
        # still have 'shares_of_happiness' peers left.
        modemap = dict([(i, "good") for i in range(9)] +
                       [(i, "lost") for i in range(9, 10)])
        return self.send_and_recover((4,8,10), bucket_modes=modemap)

    def test_lost_one_shareholder_early(self):
        # we have enough shareholders when we choose peers, but just before
        # we send the 'start' message, we lose one of them. The upload should
        # still succeed, as long as we still have 'shares_of_happiness' peers
        # left.
        modemap = dict([(i, "good") for i in range(9)] +
                       [(i, "lost-early") for i in range(9, 10)])
        return self.send_and_recover((4,8,10), bucket_modes=modemap)

    def test_lost_many_shareholders(self):
        # we have enough shareholders when we start, but one segment in we
        # lose all but one of them. The upload should fail.
        modemap = dict([(i, "good") for i in range(1)] +
                       [(i, "lost") for i in range(1, 10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
        def _done(res):
            self.failUnless(isinstance(res, Failure))
            self.failUnless(res.check(NotEnoughSharesError), res)
        d.addBoth(_done)
        return d

    def test_lost_all_shareholders(self):
        # we have enough shareholders when we start, but one segment in we
        # lose all of them. The upload should fail.
        modemap = dict([(i, "lost") for i in range(10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
        def _done(res):
            self.failUnless(isinstance(res, Failure))
            self.failUnless(res.check(NotEnoughSharesError))
        d.addBoth(_done)
        return d