from zope.interface import implements
from twisted.trial import unittest
from twisted.internet import defer, reactor
from twisted.python.failure import Failure
from foolscap.api import fireEventually
from allmydata import hashtree, uri
from allmydata.immutable import encode, upload, download
from allmydata.util import hashutil
from allmydata.util.assertutil import _assert
from allmydata.util.consumer import MemoryConsumer
from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader, \
     NotEnoughSharesError, IStorageBroker
from allmydata.monitor import Monitor
import common_util as testutil

class LostPeerError(Exception):
    pass

def flip_bit(good): # flips the last bit
    return good[:-1] + chr(ord(good[-1]) ^ 0x01)
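# e.g. flip_bit("ab") == "a" + chr(ord("b") ^ 0x01) == "ac": only the low bit
# of the final byte changes, which is enough to break any hash check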

class FakeStorageBroker:
    implements(IStorageBroker)

class FakeBucketReaderWriterProxy:
    implements(IStorageBucketWriter, IStorageBucketReader)
    # these are used for both reading and writing
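    # recognized modes: "good", "lost-early", "lost", "bad block",
    # "bad crypttext hashroot", "bad crypttext hash", "bad blockhash",
    # "bad sharehash", "missing sharehash", "bad uri_extension"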
    def __init__(self, mode="good"):
        self.mode = mode
        self.blocks = {}
        self.plaintext_hashes = []
        self.crypttext_hashes = []
        self.block_hashes = None
        self.share_hashes = None
        self.closed = False

    def get_peerid(self):
        return "peerid"

    def _start(self):
        if self.mode == "lost-early":
            f = Failure(LostPeerError("I went away early"))
            return fireEventually(f)
        return defer.succeed(self)

    def put_header(self):
        return self._start()

    def put_block(self, segmentnum, data):
        if self.mode == "lost-early":
            f = Failure(LostPeerError("I went away early"))
            return fireEventually(f)
        def _try():
            assert not self.closed
            assert segmentnum not in self.blocks
            if self.mode == "lost" and segmentnum >= 1:
                raise LostPeerError("I'm going away now")
            self.blocks[segmentnum] = data
        return defer.maybeDeferred(_try)

    def put_plaintext_hashes(self, hashes):
        def _try():
            assert not self.closed
            assert not self.plaintext_hashes
            self.plaintext_hashes = hashes
        return defer.maybeDeferred(_try)

    def put_crypttext_hashes(self, hashes):
        def _try():
            assert not self.closed
            assert not self.crypttext_hashes
            self.crypttext_hashes = hashes
        return defer.maybeDeferred(_try)

    def put_block_hashes(self, blockhashes):
        def _try():
            assert not self.closed
            assert self.block_hashes is None
            self.block_hashes = blockhashes
        return defer.maybeDeferred(_try)

    def put_share_hashes(self, sharehashes):
        def _try():
            assert not self.closed
            assert self.share_hashes is None
            self.share_hashes = sharehashes
        return defer.maybeDeferred(_try)

    def put_uri_extension(self, uri_extension):
        def _try():
            assert not self.closed
            self.uri_extension = uri_extension
        return defer.maybeDeferred(_try)

    def close(self):
        def _try():
            assert not self.closed
            self.closed = True
        return defer.maybeDeferred(_try)

    def abort(self):
        return defer.succeed(None)

    def get_block_data(self, blocknum, blocksize, size):
        d = self._start()
        def _try(unused=None):
            assert isinstance(blocknum, (int, long))
            if self.mode == "bad block":
                return flip_bit(self.blocks[blocknum])
            return self.blocks[blocknum]
        d.addCallback(_try)
        return d

    def get_plaintext_hashes(self):
        d = self._start()
        def _try(unused=None):
            hashes = self.plaintext_hashes[:]
            return hashes
        d.addCallback(_try)
        return d

    def get_crypttext_hashes(self):
        d = self._start()
        def _try(unused=None):
            hashes = self.crypttext_hashes[:]
            if self.mode == "bad crypttext hashroot":
                hashes[0] = flip_bit(hashes[0])
            if self.mode == "bad crypttext hash":
                hashes[1] = flip_bit(hashes[1])
            return hashes
        d.addCallback(_try)
        return d

    def get_block_hashes(self, at_least_these=()):
        d = self._start()
        def _try(unused=None):
            if self.mode == "bad blockhash":
                hashes = self.block_hashes[:]
                hashes[1] = flip_bit(hashes[1])
                return hashes
            return self.block_hashes
        d.addCallback(_try)
        return d

    def get_share_hashes(self, at_least_these=()):
        d = self._start()
        def _try(unused=None):
            if self.mode == "bad sharehash":
                hashes = self.share_hashes[:]
                hashes[1] = (hashes[1][0], flip_bit(hashes[1][1]))
                return hashes
            if self.mode == "missing sharehash":
                # one sneaky attack would be to pretend we don't know our own
                # sharehash, which could manage to frame someone else.
                # download.py is supposed to guard against this case.
                return []
            return self.share_hashes
        d.addCallback(_try)
        return d

    def get_uri_extension(self):
        d = self._start()
        def _try(unused=None):
            if self.mode == "bad uri_extension":
                return flip_bit(self.uri_extension)
            return self.uri_extension
        d.addCallback(_try)
        return d


def make_data(length):
    data = "happy happy joy joy" * 100
    assert length <= len(data)
    return data[:length]
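# e.g. make_data(5) == "happy"; lengths of up to 19*100 == 1900 bytes are
# available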

class ValidatedExtendedURIProxy(unittest.TestCase):
    timeout = 240 # It takes longer than 120 seconds on Francois's arm box.
    K = 4
    M = 10
    SIZE = 200
    SEGSIZE = 72
    # the tail segment is the remainder, padded up to a multiple of K
    _TMP = SIZE%SEGSIZE
    if _TMP == 0:
        _TMP = SEGSIZE
    if _TMP % K != 0:
        _TMP += (K - (_TMP % K))
    TAIL_SEGSIZE = _TMP
    # number of segments, rounding up
    _TMP = SIZE / SEGSIZE
    if SIZE % SEGSIZE != 0:
        _TMP += 1
    NUM_SEGMENTS = _TMP
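    # with the constants above: 200 % 72 == 56, which is already a multiple
    # of K == 4, so TAIL_SEGSIZE == 56, and 200/72 rounds up to
    # NUM_SEGMENTS == 3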
    mindict = { 'segment_size': SEGSIZE,
                'crypttext_root_hash': '0'*hashutil.CRYPTO_VAL_SIZE,
                'share_root_hash': '1'*hashutil.CRYPTO_VAL_SIZE }
    optional_consistent = { 'crypttext_hash': '2'*hashutil.CRYPTO_VAL_SIZE,
                            'codec_name': "crs",
                            'codec_params': "%d-%d-%d" % (SEGSIZE, K, M),
                            'tail_codec_params': "%d-%d-%d" % (TAIL_SEGSIZE, K, M),
                            'num_segments': NUM_SEGMENTS,
                            'size': SIZE,
                            'needed_shares': K,
                            'total_shares': M,
                            'plaintext_hash': "anything",
                            'plaintext_root_hash': "anything", }
    # optional_inconsistent = { 'crypttext_hash': ('2'*(hashutil.CRYPTO_VAL_SIZE-1), "", 77),
    optional_inconsistent = { 'crypttext_hash': (77,),
                              'codec_name': ("digital fountain", ""),
                              'codec_params': ("%d-%d-%d" % (SEGSIZE, K-1, M),
                                               "%d-%d-%d" % (SEGSIZE-1, K, M),
                                               "%d-%d-%d" % (SEGSIZE, K, M-1)),
                              'tail_codec_params': ("%d-%d-%d" % (TAIL_SEGSIZE, K-1, M),
                                                    "%d-%d-%d" % (TAIL_SEGSIZE-1, K, M),
                                                    "%d-%d-%d" % (TAIL_SEGSIZE, K, M-1)),
                              'num_segments': (NUM_SEGMENTS-1,),
                              'size': (SIZE-1,),
                              'needed_shares': (K-1,),
                              'total_shares': (M-1,), }

    def _test(self, uebdict):
        uebstring = uri.pack_extension(uebdict)
        uebhash = hashutil.uri_extension_hash(uebstring)
        fb = FakeBucketReaderWriterProxy()
        fb.put_uri_extension(uebstring)
        verifycap = uri.CHKFileVerifierURI(storage_index='x'*16,
                                           uri_extension_hash=uebhash,
                                           needed_shares=self.K,
                                           total_shares=self.M,
                                           size=self.SIZE)
        vup = download.ValidatedExtendedURIProxy(fb, verifycap)
        return vup.start()

    def _test_accept(self, uebdict):
        return self._test(uebdict)

    def _should_fail(self, res, expected_failures):
        if isinstance(res, Failure):
            res.trap(*expected_failures)
        else:
            self.fail("was supposed to raise %s, not get '%s'" % (expected_failures, res))

    def _test_reject(self, uebdict):
        d = self._test(uebdict)
        d.addBoth(self._should_fail, (KeyError, download.BadURIExtension))
        return d

    def test_accept_minimal(self):
        return self._test_accept(self.mindict)

    def test_reject_insufficient(self):
        dl = []
        for k in self.mindict.iterkeys():
            insuffdict = self.mindict.copy()
            del insuffdict[k]
            d = self._test_reject(insuffdict)
            dl.append(d)
        return defer.DeferredList(dl)

    def test_accept_optional(self):
        dl = []
        for k in self.optional_consistent.iterkeys():
            mydict = self.mindict.copy()
            mydict[k] = self.optional_consistent[k]
            d = self._test_accept(mydict)
            dl.append(d)
        return defer.DeferredList(dl)

    def test_reject_optional(self):
        dl = []
        for k in self.optional_inconsistent.iterkeys():
            for v in self.optional_inconsistent[k]:
                mydict = self.mindict.copy()
                mydict[k] = v
                d = self._test_reject(mydict)
                dl.append(d)
        return defer.DeferredList(dl)

class Encode(unittest.TestCase):
    timeout = 2400 # It takes longer than 240 seconds on Zandr's ARM box.

    def do_encode(self, max_segment_size, datalen, NUM_SHARES, NUM_SEGMENTS,
                  expected_block_hashes, expected_share_hashes):
        data = make_data(datalen)
        # force use of multiple segments
        e = encode.Encoder()
        u = upload.Data(data, convergence="some convergence string")
        u.max_segment_size = max_segment_size
        u.encoding_param_k = 25
        u.encoding_param_happy = 75
        u.encoding_param_n = 100
        eu = upload.EncryptAnUploadable(u)
        d = e.set_encrypted_uploadable(eu)

        all_shareholders = []
        def _ready(res):
            k,happy,n = e.get_param("share_counts")
            _assert(n == NUM_SHARES) # else we'll be completely confused
            numsegs = e.get_param("num_segments")
            _assert(numsegs == NUM_SEGMENTS, numsegs, NUM_SEGMENTS)
            segsize = e.get_param("segment_size")
            _assert( (NUM_SEGMENTS-1)*segsize < len(data) <= NUM_SEGMENTS*segsize,
                     NUM_SEGMENTS, segsize,
                     (NUM_SEGMENTS-1)*segsize, len(data), NUM_SEGMENTS*segsize)

            shareholders = {}
            servermap = {}
            for shnum in range(NUM_SHARES):
                peer = FakeBucketReaderWriterProxy()
                shareholders[shnum] = peer
                servermap[shnum] = str(shnum)
                all_shareholders.append(peer)
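            # the new form of set_shareholders takes both the shnum->bucket
            # mapping and a shnum->peerid servermap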
            e.set_shareholders(shareholders, servermap)
            return e.start()
        d.addCallback(_ready)

        def _check(res):
            verifycap = res
            self.failUnless(isinstance(verifycap.uri_extension_hash, str))
            self.failUnlessEqual(len(verifycap.uri_extension_hash), 32)
            for i,peer in enumerate(all_shareholders):
                self.failUnless(peer.closed)
                self.failUnlessEqual(len(peer.blocks), NUM_SEGMENTS)
                # each peer gets a full tree of block hashes. For 3 or 4
                # segments, that's 7 hashes. For 5 segments it's 15 hashes.
                self.failUnlessEqual(len(peer.block_hashes),
                                     expected_block_hashes)
                for h in peer.block_hashes:
                    self.failUnlessEqual(len(h), 32)
                # each peer also gets their necessary chain of share hashes.
                # For 100 shares (rounded up to 128 leaves), that's 8 hashes
                self.failUnlessEqual(len(peer.share_hashes),
                                     expected_share_hashes)
                for (hashnum, h) in peer.share_hashes:
                    self.failUnless(isinstance(hashnum, int))
                    self.failUnlessEqual(len(h), 32)
        d.addCallback(_check)

        return d

    # a series of 3*3 tests to check out edge conditions. One axis is how the
    # plaintext is divided into segments: kn+(-1,0,1). Another way to express
    # that is that n%k == -1 or 0 or 1. For example, for 25-byte segments, we
    # might test 74 bytes, 75 bytes, and 76 bytes.

    # on the other axis is how many leaves in the block hash tree we wind up
    # with, relative to a power of 2, so 2^a+(-1,0,1). Each segment turns
    # into a single leaf. So we'd like to check out, e.g., 3 segments, 4
    # segments, and 5 segments.

    # that results in the following series of data lengths:
    #  3 segs: 74, 75, 51
    #  4 segs: 99, 100, 76
    #  5 segs: 124, 125, 101

    # all tests encode to 100 shares, which means the share hash tree will
    # have 128 leaves, which means that buckets will be given an 8-long share
    # hash chain

    # all 3-segment files will have a 4-leaf blockhashtree, and thus expect
    # to get 7 blockhashes. 4-segment files will also get 4-leaf block hash
    # trees and 7 blockhashes. 5-segment files will get 8-leaf block hash
    # trees, which get 15 blockhashes.
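    # a sketch of that arithmetic (a hypothetical helper, illustrative only,
    # not used by these tests): a Merkle tree over N leaves is padded up to
    # the next power of two P and holds 2*P-1 hashes in total, so
    #
    #   def expected_hash_count(num_leaves):
    #       p = 1
    #       while p < num_leaves:
    #           p *= 2
    #       return 2*p - 1
    #
    # gives expected_hash_count(3) == expected_hash_count(4) == 7 and
    # expected_hash_count(5) == 15. The share hash *chain* is smaller: with
    # 100 shares padded to 128 leaves, each share needs only its 7 uncle
    # hashes plus the root, hence the 8-long chain.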

    def test_send_74(self):
        # 3 segments (25, 25, 24)
        return self.do_encode(25, 74, 100, 3, 7, 8)
    def test_send_75(self):
        # 3 segments (25, 25, 25)
        return self.do_encode(25, 75, 100, 3, 7, 8)
    def test_send_51(self):
        # 3 segments (25, 25, 1)
        return self.do_encode(25, 51, 100, 3, 7, 8)

    def test_send_76(self):
        # encode a 76 byte file (in 4 segments: 25,25,25,1) to 100 shares
        return self.do_encode(25, 76, 100, 4, 7, 8)
    def test_send_99(self):
        # 4 segments: 25,25,25,24
        return self.do_encode(25, 99, 100, 4, 7, 8)
    def test_send_100(self):
        # 4 segments: 25,25,25,25
        return self.do_encode(25, 100, 100, 4, 7, 8)

    def test_send_124(self):
        # 5 segments: 25, 25, 25, 25, 24
        return self.do_encode(25, 124, 100, 5, 15, 8)
    def test_send_125(self):
        # 5 segments: 25, 25, 25, 25, 25
        return self.do_encode(25, 125, 100, 5, 15, 8)
    def test_send_101(self):
        # 5 segments: 25, 25, 25, 25, 1
        return self.do_encode(25, 101, 100, 5, 15, 8)

class PausingConsumer(MemoryConsumer):
    def __init__(self):
        MemoryConsumer.__init__(self)
        self.size = 0
        self.writes = 0
    def write(self, data):
        self.size += len(data)
        self.writes += 1
        if self.writes <= 2:
            # we happen to use 4 segments, and want to avoid pausing on the
            # last one (since then the _unpause timer will still be running)
            self.producer.pauseProducing()
            reactor.callLater(0.1, self._unpause)
        return MemoryConsumer.write(self, data)
    def _unpause(self):
        self.producer.resumeProducing()

class PausingAndStoppingConsumer(PausingConsumer):
    def write(self, data):
        self.producer.pauseProducing()
        reactor.callLater(0.5, self._stop)
    def _stop(self):
        self.producer.stopProducing()

class StoppingConsumer(PausingConsumer):
    def write(self, data):
        self.producer.stopProducing()

class Roundtrip(unittest.TestCase, testutil.ShouldFailMixin):
    timeout = 2400 # It takes longer than 240 seconds on Zandr's ARM box.
    def send_and_recover(self, k_and_happy_and_n=(25,75,100),
                         AVAILABLE_SHARES=None,
                         datalen=76,
                         max_segment_size=25,
                         bucket_modes={},
                         recover_mode="recover",
                         consumer=None,
                         ):
        if AVAILABLE_SHARES is None:
            AVAILABLE_SHARES = k_and_happy_and_n[2]
        data = make_data(datalen)
        d = self.send(k_and_happy_and_n, AVAILABLE_SHARES,
                      max_segment_size, bucket_modes, data)
        # that fires with (verifycap, key, shareholders)
        d.addCallback(self.recover, AVAILABLE_SHARES, recover_mode,
                      consumer=consumer)
        # that fires with (newdata, fd)
        def _downloaded((newdata, fd)):
            self.failUnless(newdata == data, str((len(newdata), len(data))))
            return fd
        d.addCallback(_downloaded)
        return d

    def send(self, k_and_happy_and_n, AVAILABLE_SHARES, max_segment_size,
             bucket_modes, data):
        k, happy, n = k_and_happy_and_n
        NUM_SHARES = k_and_happy_and_n[2]
        if AVAILABLE_SHARES is None:
            AVAILABLE_SHARES = NUM_SHARES
        e = encode.Encoder()
        u = upload.Data(data, convergence="some convergence string")
        # force use of multiple segments by using a low max_segment_size
        u.max_segment_size = max_segment_size
        u.encoding_param_k = k
        u.encoding_param_happy = happy
        u.encoding_param_n = n
        eu = upload.EncryptAnUploadable(u)
        d = e.set_encrypted_uploadable(eu)

        shareholders = {}
        def _ready(res):
            k,happy,n = e.get_param("share_counts")
            assert n == NUM_SHARES # else we'll be completely confused
            servermap = {}
            for shnum in range(NUM_SHARES):
                mode = bucket_modes.get(shnum, "good")
                peer = FakeBucketReaderWriterProxy(mode)
                shareholders[shnum] = peer
                servermap[shnum] = str(shnum)
            e.set_shareholders(shareholders, servermap)
            return e.start()
        d.addCallback(_ready)
        def _sent(res):
            d1 = u.get_encryption_key()
            d1.addCallback(lambda key: (res, key, shareholders))
            return d1
        d.addCallback(_sent)
        return d

    def recover(self, (res, key, shareholders), AVAILABLE_SHARES,
                recover_mode, consumer=None):
        verifycap = res

        if "corrupt_key" in recover_mode:
            # we corrupt the key, so that the decrypted data is corrupted and
            # will fail the plaintext hash check. Since we're manually
            # attaching shareholders, the fact that the storage index is also
            # corrupted doesn't matter.
            key = flip_bit(key)

        u = uri.CHKFileURI(key=key,
                           uri_extension_hash=verifycap.uri_extension_hash,
                           needed_shares=verifycap.needed_shares,
                           total_shares=verifycap.total_shares,
                           size=verifycap.size)

        sb = FakeStorageBroker()
        if not consumer:
            consumer = MemoryConsumer()
        innertarget = download.ConsumerAdapter(consumer)
        target = download.DecryptingTarget(innertarget, u.key)
        fd = download.CiphertextDownloader(sb, u.get_verify_cap(), target,
                                           monitor=Monitor())

        # we manually cycle the CiphertextDownloader through a number of
        # steps that would normally be sequenced by a Deferred chain in
        # CiphertextDownloader.start(), to give us more control over the
        # process. In particular, by bypassing _get_all_shareholders, we
        # skip permuted-peerlist selection.
        for shnum, bucket in shareholders.items():
            if shnum < AVAILABLE_SHARES and bucket.closed:
                fd.add_share_bucket(shnum, bucket)
        fd._got_all_shareholders(None)

        # Make it possible to obtain uri_extension from the shareholders.
        # Arrange for shareholders[0] to be the first, so we can selectively
        # corrupt the data it returns.
        uri_extension_sources = shareholders.values()
        uri_extension_sources.remove(shareholders[0])
        uri_extension_sources.insert(0, shareholders[0])

        d = defer.succeed(None)

        # have the CiphertextDownloader retrieve a copy of uri_extension
        # itself
        d.addCallback(fd._obtain_uri_extension)

        if "corrupt_crypttext_hashes" in recover_mode:
            # replace everybody's crypttext hash trees with a different one
            # (computed over a different file), then modify our uri_extension
            # to reflect the new crypttext hash tree root
            def _corrupt_crypttext_hashes(unused):
                assert isinstance(fd._vup, download.ValidatedExtendedURIProxy), fd._vup
                assert fd._vup.crypttext_root_hash, fd._vup
                badhash = hashutil.tagged_hash("bogus", "data")
                bad_crypttext_hashes = [badhash] * fd._vup.num_segments
                badtree = hashtree.HashTree(bad_crypttext_hashes)
                for bucket in shareholders.values():
                    bucket.crypttext_hashes = list(badtree)
                fd._crypttext_hash_tree = hashtree.IncompleteHashTree(fd._vup.num_segments)
                fd._crypttext_hash_tree.set_hashes({0: badtree[0]})
                return fd._vup
            d.addCallback(_corrupt_crypttext_hashes)

        # also have the CiphertextDownloader ask for hash trees
        d.addCallback(fd._get_crypttext_hash_tree)

        d.addCallback(fd._download_all_segments)
        d.addCallback(fd._done)
        def _done(t):
            newdata = "".join(consumer.chunks)
            return (newdata, fd)
        d.addCallback(_done)
        return d

    def test_not_enough_shares(self):
        d = self.send_and_recover((4,8,10), AVAILABLE_SHARES=2)
        def _done(res):
            self.failUnless(isinstance(res, Failure))
            self.failUnless(res.check(NotEnoughSharesError))
        d.addBoth(_done)
        return d

    def test_one_share_per_peer(self):
        return self.send_and_recover()

    def test_74(self):
        return self.send_and_recover(datalen=74)
    def test_75(self):
        return self.send_and_recover(datalen=75)
    def test_51(self):
        return self.send_and_recover(datalen=51)

    def test_99(self):
        return self.send_and_recover(datalen=99)
    def test_100(self):
        return self.send_and_recover(datalen=100)
    def test_76(self):
        return self.send_and_recover(datalen=76)

    def test_124(self):
        return self.send_and_recover(datalen=124)
    def test_125(self):
        return self.send_and_recover(datalen=125)
    def test_101(self):
        return self.send_and_recover(datalen=101)

    def test_pause(self):
        # use a download target that does pauseProducing/resumeProducing a
        # few times, then finishes
        c = PausingConsumer()
        d = self.send_and_recover(consumer=c)
        return d

    def test_pause_then_stop(self):
        # use a download target that pauses, then stops.
        c = PausingAndStoppingConsumer()
        d = self.shouldFail(download.DownloadStopped, "test_pause_then_stop",
                            "our Consumer called stopProducing()",
                            self.send_and_recover, consumer=c)
        return d

    def test_stop(self):
        # use a download target that does an immediate stop (ticket #473)
        c = StoppingConsumer()
        d = self.shouldFail(download.DownloadStopped, "test_stop",
                            "our Consumer called stopProducing()",
                            self.send_and_recover, consumer=c)
        return d

    # the following tests all use 4-out-of-10 encoding

    def test_bad_blocks(self):
        # the first 6 servers have bad blocks, which will be caught by the
        # blockhashes
        modemap = dict([(i, "bad block")
                        for i in range(6)]
                       + [(i, "good")
                          for i in range(6, 10)])
        return self.send_and_recover((4,8,10), bucket_modes=modemap)

    def test_bad_blocks_failure(self):
        # the first 7 servers have bad blocks, which will be caught by the
        # blockhashes, and the download will fail
        modemap = dict([(i, "bad block")
                        for i in range(7)]
                       + [(i, "good")
                          for i in range(7, 10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
        def _done(res):
            self.failUnless(isinstance(res, Failure), res)
            self.failUnless(res.check(NotEnoughSharesError), res)
        d.addBoth(_done)
        return d

    def test_bad_blockhashes(self):
        # the first 6 servers have bad block hashes, so the blockhash tree
        # will not validate
        modemap = dict([(i, "bad blockhash")
                        for i in range(6)]
                       + [(i, "good")
                          for i in range(6, 10)])
        return self.send_and_recover((4,8,10), bucket_modes=modemap)

    def test_bad_blockhashes_failure(self):
        # the first 7 servers have bad block hashes, so the blockhash tree
        # will not validate, and the download will fail
        modemap = dict([(i, "bad blockhash")
                        for i in range(7)]
                       + [(i, "good")
                          for i in range(7, 10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
        def _done(res):
            self.failUnless(isinstance(res, Failure))
            self.failUnless(res.check(NotEnoughSharesError), res)
        d.addBoth(_done)
        return d

    def test_bad_sharehashes(self):
        # the first 6 servers have bad share hashes, so the sharehash tree
        # will not validate
        modemap = dict([(i, "bad sharehash")
                        for i in range(6)]
                       + [(i, "good")
                          for i in range(6, 10)])
        return self.send_and_recover((4,8,10), bucket_modes=modemap)

    def assertFetchFailureIn(self, fd, where):
        expected = {"uri_extension": 0,
                    "crypttext_hash_tree": 0,
                    }
        if where is not None:
            expected[where] += 1
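        # e.g. where="uri_extension" expects
        # {"uri_extension": 1, "crypttext_hash_tree": 0}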
        self.failUnlessEqual(fd._fetch_failures, expected)

    def test_good(self):
        # just to make sure the test harness works when we aren't
        # intentionally causing failures
        modemap = dict([(i, "good") for i in range(0, 10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
        d.addCallback(self.assertFetchFailureIn, None)
        return d

    def test_bad_uri_extension(self):
        # the first server has a bad uri_extension block, so we will fail
        # over to a different server.
        modemap = dict([(i, "bad uri_extension") for i in range(1)] +
                       [(i, "good") for i in range(1, 10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
        d.addCallback(self.assertFetchFailureIn, "uri_extension")
        return d

    def test_bad_crypttext_hashroot(self):
        # the first server has a bad crypttext hashroot, so we will fail
        # over to a different server.
        modemap = dict([(i, "bad crypttext hashroot") for i in range(1)] +
                       [(i, "good") for i in range(1, 10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
        d.addCallback(self.assertFetchFailureIn, "crypttext_hash_tree")
        return d

    def test_bad_crypttext_hashes(self):
        # the first server has a bad crypttext hash block, so we will fail
        # over to a different server.
        modemap = dict([(i, "bad crypttext hash") for i in range(1)] +
                       [(i, "good") for i in range(1, 10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
        d.addCallback(self.assertFetchFailureIn, "crypttext_hash_tree")
        return d

    def test_bad_crypttext_hashes_failure(self):
        # to test that the crypttext merkle tree is really being applied, we
        # sneak into the download process and corrupt two things: we replace
        # everybody's crypttext hashtree with a bad version (computed over
        # bogus data), and we modify the supposedly-validated uri_extension
        # block to match the new crypttext hashtree root. The download
        # process should notice that the crypttext coming out of FEC doesn't
        # match the tree, and fail.

        modemap = dict([(i, "good") for i in range(0, 10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap,
                                  recover_mode=("corrupt_crypttext_hashes"))
        def _done(res):
            self.failUnless(isinstance(res, Failure))
            self.failUnless(res.check(hashtree.BadHashError), res)
        d.addBoth(_done)
        return d

    def OFF_test_bad_plaintext(self):
        # faking a decryption failure is easier: just corrupt the key
        modemap = dict([(i, "good") for i in range(0, 10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap,
                                  recover_mode=("corrupt_key"))
        def _done(res):
            self.failUnless(isinstance(res, Failure))
            self.failUnless(res.check(hashtree.BadHashError), res)
        d.addBoth(_done)
        return d

    def test_bad_sharehashes_failure(self):
        # all ten servers have bad share hashes, so the sharehash tree
        # will not validate, and the download will fail
        modemap = dict([(i, "bad sharehash")
                        for i in range(10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
        def _done(res):
            self.failUnless(isinstance(res, Failure))
            self.failUnless(res.check(NotEnoughSharesError))
        d.addBoth(_done)
        return d

    def test_missing_sharehashes(self):
        # the first 6 servers are missing their sharehashes, so the
        # sharehash tree will not validate
        modemap = dict([(i, "missing sharehash")
                        for i in range(6)]
                       + [(i, "good")
                          for i in range(6, 10)])
        return self.send_and_recover((4,8,10), bucket_modes=modemap)

    def test_missing_sharehashes_failure(self):
        # all servers are missing their sharehashes, so the sharehash tree
        # will not validate, and the download will fail
        modemap = dict([(i, "missing sharehash")
                        for i in range(10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
        def _done(res):
            self.failUnless(isinstance(res, Failure), res)
            self.failUnless(res.check(NotEnoughSharesError), res)
        d.addBoth(_done)
        return d

    def test_lost_one_shareholder(self):
        # we have enough shareholders when we start, but one segment in we
        # lose one of them. The upload should still succeed, as long as we
        # still have 'shares_of_happiness' peers left.
        modemap = dict([(i, "good") for i in range(9)] +
                       [(i, "lost") for i in range(9, 10)])
        return self.send_and_recover((4,8,10), bucket_modes=modemap)

    def test_lost_one_shareholder_early(self):
        # we have enough shareholders when we choose peers, but just before
        # we send the 'start' message, we lose one of them. The upload should
        # still succeed, as long as we still have 'shares_of_happiness' peers
        # left.
        modemap = dict([(i, "good") for i in range(9)] +
                       [(i, "lost-early") for i in range(9, 10)])
        return self.send_and_recover((4,8,10), bucket_modes=modemap)

    def test_lost_many_shareholders(self):
        # we have enough shareholders when we start, but one segment in we
        # lose all but one of them. The upload should fail.
        modemap = dict([(i, "good") for i in range(1)] +
                       [(i, "lost") for i in range(1, 10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
        def _done(res):
            self.failUnless(isinstance(res, Failure))
            self.failUnless(res.check(NotEnoughSharesError), res)
        d.addBoth(_done)
        return d

    def test_lost_all_shareholders(self):
        # we have enough shareholders when we start, but one segment in we
        # lose all of them. The upload should fail.
        modemap = dict([(i, "lost") for i in range(10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
        def _done(res):
            self.failUnless(isinstance(res, Failure))
            self.failUnless(res.check(NotEnoughSharesError))
        d.addBoth(_done)
        return d