from zope.interface import implements
from twisted.trial import unittest
from twisted.internet import defer, reactor
from twisted.internet.interfaces import IConsumer
from twisted.python.failure import Failure
from foolscap import eventual
from allmydata import hashtree, uri
from allmydata.immutable import encode, upload, download
from allmydata.util import hashutil
from allmydata.util.assertutil import _assert
from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader, NotEnoughSharesError
import common_util as testutil

class LostPeerError(Exception):
    pass

def flip_bit(good): # flips the last bit
    return good[:-1] + chr(ord(good[-1]) ^ 0x01)
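# For example (an illustrative note, not used by the tests): flip_bit("foo")
# returns "fon", since ord('o') == 0x6F and 0x6F ^ 0x01 == 0x6E == ord('n').
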
class FakeClient:
    def log(self, *args, **kwargs):
        pass

class FakeBucketReaderWriterProxy:
    implements(IStorageBucketWriter, IStorageBucketReader)
    # these are used for both reading and writing
    def __init__(self, mode="good"):
        self.mode = mode
        self.blocks = {}
        self.plaintext_hashes = []
        self.crypttext_hashes = []
        self.block_hashes = None
        self.share_hashes = None
        self.closed = False

    def get_peerid(self):
        return "peerid"

    def _start(self):
        if self.mode == "lost-early":
            f = Failure(LostPeerError("I went away early"))
            return eventual.fireEventually(f)
        return defer.succeed(self)

    def put_block(self, segmentnum, data):
        if self.mode == "lost-early":
            f = Failure(LostPeerError("I went away early"))
            return eventual.fireEventually(f)
        def _try():
            assert not self.closed
            assert segmentnum not in self.blocks
            if self.mode == "lost" and segmentnum >= 1:
                raise LostPeerError("I'm going away now")
            self.blocks[segmentnum] = data
        return defer.maybeDeferred(_try)

    def put_plaintext_hashes(self, hashes):
        def _try():
            assert not self.closed
            assert not self.plaintext_hashes
            self.plaintext_hashes = hashes
        return defer.maybeDeferred(_try)

    def put_crypttext_hashes(self, hashes):
        def _try():
            assert not self.closed
            assert not self.crypttext_hashes
            self.crypttext_hashes = hashes
        return defer.maybeDeferred(_try)

    def put_block_hashes(self, blockhashes):
        def _try():
            assert not self.closed
            assert self.block_hashes is None
            self.block_hashes = blockhashes
        return defer.maybeDeferred(_try)

    def put_share_hashes(self, sharehashes):
        def _try():
            assert not self.closed
            assert self.share_hashes is None
            self.share_hashes = sharehashes
        return defer.maybeDeferred(_try)

    def put_uri_extension(self, uri_extension):
        def _try():
            assert not self.closed
            self.uri_extension = uri_extension
        return defer.maybeDeferred(_try)

    def close(self):
        def _try():
            assert not self.closed
            self.closed = True
        return defer.maybeDeferred(_try)

    def abort(self):
        return defer.succeed(None)

    def get_block_data(self, blocknum, blocksize, size):
        d = self._start()
        def _try(unused=None):
            assert isinstance(blocknum, (int, long))
            if self.mode == "bad block":
                return flip_bit(self.blocks[blocknum])
            return self.blocks[blocknum]
        d.addCallback(_try)
        return d

    def get_plaintext_hashes(self):
        d = self._start()
        def _try(unused=None):
            hashes = self.plaintext_hashes[:]
            return hashes
        d.addCallback(_try)
        return d

    def get_crypttext_hashes(self):
        d = self._start()
        def _try(unused=None):
            hashes = self.crypttext_hashes[:]
            if self.mode == "bad crypttext hashroot":
                hashes[0] = flip_bit(hashes[0])
            if self.mode == "bad crypttext hash":
                hashes[1] = flip_bit(hashes[1])
            return hashes
        d.addCallback(_try)
        return d

    def get_block_hashes(self, at_least_these=()):
        d = self._start()
        def _try(unused=None):
            if self.mode == "bad blockhash":
                hashes = self.block_hashes[:]
                hashes[1] = flip_bit(hashes[1])
                return hashes
            return self.block_hashes
        d.addCallback(_try)
        return d

    def get_share_hashes(self, at_least_these=()):
        d = self._start()
        def _try(unused=None):
            if self.mode == "bad sharehash":
                hashes = self.share_hashes[:]
                hashes[1] = (hashes[1][0], flip_bit(hashes[1][1]))
                return hashes
            if self.mode == "missing sharehash":
                # one sneaky attack would be to pretend we don't know our own
                # sharehash, which could manage to frame someone else.
                # download.py is supposed to guard against this case.
                return []
            return self.share_hashes
        d.addCallback(_try)
        return d

    def get_uri_extension(self):
        d = self._start()
        def _try(unused=None):
            if self.mode == "bad uri_extension":
                return flip_bit(self.uri_extension)
            return self.uri_extension
        d.addCallback(_try)
        return d

def make_data(length):
    data = "happy happy joy joy" * 100
    assert length <= len(data)
    return data[:length]

class ValidatedExtendedURIProxy(unittest.TestCase):
    K = 4
    M = 10
    SIZE = 200
    SEGSIZE = 72
    _TMP = SIZE % SEGSIZE
    if _TMP == 0:
        _TMP = SEGSIZE
    if _TMP % K != 0:
        _TMP += (K - (_TMP % K))
    TAIL_SEGSIZE = _TMP
    _TMP = SIZE / SEGSIZE
    if SIZE % SEGSIZE != 0:
        _TMP += 1
    NUM_SEGMENTS = _TMP
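    # Worked example of the computation above (comment only): with SIZE=200
    # and SEGSIZE=72, SIZE % SEGSIZE == 56, which is already a multiple of
    # K=4, so TAIL_SEGSIZE == 56; SIZE/SEGSIZE rounds up from 2, giving
    # NUM_SEGMENTS == 3.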
    mindict = { 'segment_size': SEGSIZE,
                'crypttext_root_hash': '0'*hashutil.CRYPTO_VAL_SIZE,
                'share_root_hash': '1'*hashutil.CRYPTO_VAL_SIZE }
    optional_consistent = { 'crypttext_hash': '2'*hashutil.CRYPTO_VAL_SIZE,
                            'codec_name': "crs",
                            'codec_params': "%d-%d-%d" % (SEGSIZE, K, M),
                            'tail_codec_params': "%d-%d-%d" % (TAIL_SEGSIZE, K, M),
                            'num_segments': NUM_SEGMENTS,
                            'size': SIZE,
                            'needed_shares': K,
                            'total_shares': M,
                            'plaintext_hash': "anything",
                            'plaintext_root_hash': "anything", }
    # optional_inconsistent = { 'crypttext_hash': ('2'*(hashutil.CRYPTO_VAL_SIZE-1), "", 77),
    optional_inconsistent = { 'crypttext_hash': (77,),
                              'codec_name': ("digital fountain", ""),
                              'codec_params': ("%d-%d-%d" % (SEGSIZE, K-1, M),
                                               "%d-%d-%d" % (SEGSIZE-1, K, M),
                                               "%d-%d-%d" % (SEGSIZE, K, M-1)),
                              'tail_codec_params': ("%d-%d-%d" % (TAIL_SEGSIZE, K-1, M),
                                                    "%d-%d-%d" % (TAIL_SEGSIZE-1, K, M),
                                                    "%d-%d-%d" % (TAIL_SEGSIZE, K, M-1)),
                              'num_segments': (NUM_SEGMENTS-1,),
                              'size': (SIZE-1,),
                              'needed_shares': (K-1,),
                              'total_shares': (M-1,), }

    def _test(self, uebdict):
        uebstring = uri.pack_extension(uebdict)
        uebhash = hashutil.uri_extension_hash(uebstring)
        fb = FakeBucketReaderWriterProxy()
        fb.put_uri_extension(uebstring)
        verifycap = uri.CHKFileVerifierURI(storage_index='x'*16, uri_extension_hash=uebhash, needed_shares=self.K, total_shares=self.M, size=self.SIZE)
        vup = download.ValidatedExtendedURIProxy(fb, verifycap)
        return vup.start()

    def _test_accept(self, uebdict):
        return self._test(uebdict)

    def _should_fail(self, res, expected_failures):
        if isinstance(res, Failure):
            res.trap(*expected_failures)
        else:
            self.fail("was supposed to raise %s, not get '%s'" % (expected_failures, res))

    def _test_reject(self, uebdict):
        d = self._test(uebdict)
        d.addBoth(self._should_fail, (KeyError, download.BadURIExtension))
        return d

    def test_accept_minimal(self):
        return self._test_accept(self.mindict)

    def test_reject_insufficient(self):
        dl = []
        for k in self.mindict.iterkeys():
            insuffdict = self.mindict.copy()
            del insuffdict[k]
            d = self._test_reject(insuffdict)
            dl.append(d)
        return defer.DeferredList(dl)

    def test_accept_optional(self):
        dl = []
        for k in self.optional_consistent.iterkeys():
            mydict = self.mindict.copy()
            mydict[k] = self.optional_consistent[k]
            d = self._test_accept(mydict)
            dl.append(d)
        return defer.DeferredList(dl)

    def test_reject_optional(self):
        dl = []
        for k in self.optional_inconsistent.iterkeys():
            for v in self.optional_inconsistent[k]:
                mydict = self.mindict.copy()
                mydict[k] = v
                d = self._test_reject(mydict)
                dl.append(d)
        return defer.DeferredList(dl)

class Encode(unittest.TestCase):

    def do_encode(self, max_segment_size, datalen, NUM_SHARES, NUM_SEGMENTS,
                  expected_block_hashes, expected_share_hashes):
        data = make_data(datalen)
        # force use of multiple segments
        e = encode.Encoder()
        u = upload.Data(data, convergence="some convergence string")
        u.max_segment_size = max_segment_size
        u.encoding_param_k = 25
        u.encoding_param_happy = 75
        u.encoding_param_n = 100
        eu = upload.EncryptAnUploadable(u)
        d = e.set_encrypted_uploadable(eu)

        all_shareholders = []
        def _ready(res):
            k,happy,n = e.get_param("share_counts")
            _assert(n == NUM_SHARES) # else we'll be completely confused
            numsegs = e.get_param("num_segments")
            _assert(numsegs == NUM_SEGMENTS, numsegs, NUM_SEGMENTS)
            segsize = e.get_param("segment_size")
            _assert( (NUM_SEGMENTS-1)*segsize < len(data) <= NUM_SEGMENTS*segsize,
                     NUM_SEGMENTS, segsize,
                     (NUM_SEGMENTS-1)*segsize, len(data), NUM_SEGMENTS*segsize)

            shareholders = {}
            for shnum in range(NUM_SHARES):
                peer = FakeBucketReaderWriterProxy()
                shareholders[shnum] = peer
                all_shareholders.append(peer)
            e.set_shareholders(shareholders)
            return e.start()
        d.addCallback(_ready)

        def _check(res):
            (uri_extension_hash, required_shares, num_shares, file_size) = res
            self.failUnless(isinstance(uri_extension_hash, str))
            self.failUnlessEqual(len(uri_extension_hash), 32)
            for i,peer in enumerate(all_shareholders):
                self.failUnless(peer.closed)
                self.failUnlessEqual(len(peer.blocks), NUM_SEGMENTS)
                # each peer gets a full tree of block hashes. For 3 or 4
                # segments, that's 7 hashes. For 5 segments it's 15 hashes.
                self.failUnlessEqual(len(peer.block_hashes),
                                     expected_block_hashes)
                for h in peer.block_hashes:
                    self.failUnlessEqual(len(h), 32)
                # each peer also gets their necessary chain of share hashes.
                # For 100 shares (rounded up to 128 leaves), that's 8 hashes
                self.failUnlessEqual(len(peer.share_hashes),
                                     expected_share_hashes)
                for (hashnum, h) in peer.share_hashes:
                    self.failUnless(isinstance(hashnum, int))
                    self.failUnlessEqual(len(h), 32)
        d.addCallback(_check)

        return d

    # a series of 3*3 tests to check out edge conditions. One axis is how the
    # plaintext is divided into segments: kn+(-1,0,1). Another way to express
    # that is that n%k == -1 or 0 or 1. For example, for 25-byte segments, we
    # might test 74 bytes, 75 bytes, and 76 bytes.

    # on the other axis is how many leaves in the block hash tree we wind up
    # with, relative to a power of 2, so 2^a+(-1,0,1). Each segment turns
    # into a single leaf. So we'd like to check out, e.g., 3 segments, 4
    # segments, and 5 segments.

    # that results in the following series of data lengths:
    #  3 segs: 74, 75, 51
    #  4 segs: 99, 100, 76
    #  5 segs: 124, 125, 101

    # all tests encode to 100 shares, which means the share hash tree will
    # have 128 leaves, which means that buckets will be given an 8-long share
    # hash chain.

    # all 3-segment files will have a 4-leaf blockhashtree, and thus expect
    # to get 7 blockhashes. 4-segment files will also get 4-leaf block hash
    # trees and 7 blockhashes. 5-segment files will get 8-leaf block hash
    # trees, which get 15 blockhashes.
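    # To make the hash-tree arithmetic above concrete, here is a small
    # illustrative helper (an addition for clarity; the tests themselves
    # don't call it): a Merkle tree over N leaves, with N rounded up to the
    # next power of two, contains 2*N - 1 nodes, so 3 or 4 segments yield 7
    # block hashes, and 5 segments yield 15.
    @staticmethod
    def _block_hash_count(num_segments):
        leaves = 1
        while leaves < num_segments:
            leaves *= 2 # round the leaf count up to a power of two
        return 2*leaves - 1 # total nodes in a full binary tree
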
    def test_send_74(self):
        # 3 segments (25, 25, 24)
        return self.do_encode(25, 74, 100, 3, 7, 8)
    def test_send_75(self):
        # 3 segments (25, 25, 25)
        return self.do_encode(25, 75, 100, 3, 7, 8)
    def test_send_51(self):
        # 3 segments (25, 25, 1)
        return self.do_encode(25, 51, 100, 3, 7, 8)

    def test_send_76(self):
        # encode a 76 byte file (in 4 segments: 25,25,25,1) to 100 shares
        return self.do_encode(25, 76, 100, 4, 7, 8)
    def test_send_99(self):
        # 4 segments: 25,25,25,24
        return self.do_encode(25, 99, 100, 4, 7, 8)
    def test_send_100(self):
        # 4 segments: 25,25,25,25
        return self.do_encode(25, 100, 100, 4, 7, 8)

    def test_send_124(self):
        # 5 segments: 25, 25, 25, 25, 24
        return self.do_encode(25, 124, 100, 5, 15, 8)
    def test_send_125(self):
        # 5 segments: 25, 25, 25, 25, 25
        return self.do_encode(25, 125, 100, 5, 15, 8)
    def test_send_101(self):
        # 5 segments: 25, 25, 25, 25, 1
        return self.do_encode(25, 101, 100, 5, 15, 8)

class PausingTarget(download.Data):
    implements(IConsumer)
    def __init__(self):
        download.Data.__init__(self)
        self.size = 0
        self.writes = 0
    def write(self, data):
        self.size += len(data)
        self.writes += 1
        if self.writes <= 2:
            # we happen to use 4 segments, and want to avoid pausing on the
            # last one (since then the _unpause timer will still be running)
            self.producer.pauseProducing()
            reactor.callLater(0.1, self._unpause)
        return download.Data.write(self, data)
    def _unpause(self):
        self.producer.resumeProducing()
    def registerProducer(self, producer, streaming):
        self.producer = producer
    def unregisterProducer(self):
        self.producer = None

class PausingAndStoppingTarget(PausingTarget):
    def write(self, data):
        self.producer.pauseProducing()
        reactor.callLater(0.5, self._stop)
    def _stop(self):
        self.producer.stopProducing()

class StoppingTarget(PausingTarget):
    def write(self, data):
        self.producer.stopProducing()

class Roundtrip(unittest.TestCase, testutil.ShouldFailMixin):
    def send_and_recover(self, k_and_happy_and_n=(25,75,100),
                         AVAILABLE_SHARES=None,
                         datalen=76,
                         max_segment_size=25,
                         bucket_modes={},
                         recover_mode="recover",
                         target=None):
        if AVAILABLE_SHARES is None:
            AVAILABLE_SHARES = k_and_happy_and_n[2]
        data = make_data(datalen)
        d = self.send(k_and_happy_and_n, AVAILABLE_SHARES,
                      max_segment_size, bucket_modes, data)
        # that fires with (res, key, shareholders)
        d.addCallback(self.recover, AVAILABLE_SHARES, recover_mode,
                      target=target)
        # that fires with newdata
        def _downloaded((newdata, fd)):
            self.failUnless(newdata == data)
            return fd
        d.addCallback(_downloaded)
        return d

    def send(self, k_and_happy_and_n, AVAILABLE_SHARES, max_segment_size,
             bucket_modes, data):
        k, happy, n = k_and_happy_and_n
        NUM_SHARES = k_and_happy_and_n[2]
        if AVAILABLE_SHARES is None:
            AVAILABLE_SHARES = NUM_SHARES
        e = encode.Encoder()
        u = upload.Data(data, convergence="some convergence string")
        # force use of multiple segments by using a low max_segment_size
        u.max_segment_size = max_segment_size
        u.encoding_param_k = k
        u.encoding_param_happy = happy
        u.encoding_param_n = n
        eu = upload.EncryptAnUploadable(u)
        d = e.set_encrypted_uploadable(eu)

        shareholders = {}
        def _ready(res):
            k,happy,n = e.get_param("share_counts")
            assert n == NUM_SHARES # else we'll be completely confused
            for shnum in range(NUM_SHARES):
                mode = bucket_modes.get(shnum, "good")
                peer = FakeBucketReaderWriterProxy(mode)
                shareholders[shnum] = peer
            e.set_shareholders(shareholders)
            return e.start()
        d.addCallback(_ready)
        def _sent(res):
            d1 = u.get_encryption_key()
            d1.addCallback(lambda key: (res, key, shareholders))
            return d1
        d.addCallback(_sent)
        return d

    def recover(self, (res, key, shareholders), AVAILABLE_SHARES,
                recover_mode, target=None):
        (uri_extension_hash, required_shares, num_shares, file_size) = res

        if "corrupt_key" in recover_mode:
            # we corrupt the key, so that the decrypted data is corrupted and
            # will fail the plaintext hash check. Since we're manually
            # attaching shareholders, the fact that the storage index is also
            # corrupted doesn't matter.
            key = flip_bit(key)

        u = uri.CHKFileURI(key=key,
                           uri_extension_hash=uri_extension_hash,
                           needed_shares=required_shares,
                           total_shares=num_shares,
                           size=file_size)

        client = FakeClient()
        if not target:
            target = download.Data()
        fd = download.FileDownloader(client, u, target)

        # we manually cycle the FileDownloader through a number of steps that
        # would normally be sequenced by a Deferred chain in
        # FileDownloader.start(), to give us more control over the process.
        # In particular, by bypassing _get_all_shareholders, we skip
        # permuted-peerlist selection.
        for shnum, bucket in shareholders.items():
            if shnum < AVAILABLE_SHARES and bucket.closed:
                fd.add_share_bucket(shnum, bucket)
        fd._got_all_shareholders(None)

        # Make it possible to obtain uri_extension from the shareholders.
        # Arrange for shareholders[0] to be the first, so we can selectively
        # corrupt the data it returns.
        uri_extension_sources = shareholders.values()
        uri_extension_sources.remove(shareholders[0])
        uri_extension_sources.insert(0, shareholders[0])

        d = defer.succeed(None)

        # have the FileDownloader retrieve a copy of uri_extension itself
        d.addCallback(fd._obtain_uri_extension)

        if "corrupt_crypttext_hashes" in recover_mode:
            # replace everybody's crypttext hash trees with a different one
            # (computed over a different file), then modify our uri_extension
            # to reflect the new crypttext hash tree root
            def _corrupt_crypttext_hashes(unused):
                assert isinstance(fd._vup, download.ValidatedExtendedURIProxy), fd._vup
                assert fd._vup.crypttext_root_hash, fd._vup
                badhash = hashutil.tagged_hash("bogus", "data")
                bad_crypttext_hashes = [badhash] * fd._vup.num_segments
                badtree = hashtree.HashTree(bad_crypttext_hashes)
                for bucket in shareholders.values():
                    bucket.crypttext_hashes = list(badtree)
                fd._crypttext_hash_tree = hashtree.IncompleteHashTree(fd._vup.num_segments)
                fd._crypttext_hash_tree.set_hashes({0: badtree[0]})
                fd._vup.crypttext_root_hash = badtree[0]
            d.addCallback(_corrupt_crypttext_hashes)

        # also have the FileDownloader ask for hash trees
        d.addCallback(fd._get_crypttext_hash_tree)

        d.addCallback(fd._download_all_segments)
        d.addCallback(fd._done)
        def _done(newdata):
            return (newdata, fd)
        d.addCallback(_done)
        return d

    def test_not_enough_shares(self):
        d = self.send_and_recover((4,8,10), AVAILABLE_SHARES=2)
        def _done(res):
            self.failUnless(isinstance(res, Failure))
            self.failUnless(res.check(NotEnoughSharesError))
        d.addBoth(_done)
        return d

    def test_one_share_per_peer(self):
        return self.send_and_recover()

    def test_74(self):
        return self.send_and_recover(datalen=74)
    def test_75(self):
        return self.send_and_recover(datalen=75)
    def test_51(self):
        return self.send_and_recover(datalen=51)

    def test_99(self):
        return self.send_and_recover(datalen=99)
    def test_100(self):
        return self.send_and_recover(datalen=100)
    def test_76(self):
        return self.send_and_recover(datalen=76)

    def test_124(self):
        return self.send_and_recover(datalen=124)
    def test_125(self):
        return self.send_and_recover(datalen=125)
    def test_101(self):
        return self.send_and_recover(datalen=101)

    def test_pause(self):
        # use a DownloadTarget that does pauseProducing/resumeProducing a few
        # times, then finishes
        t = PausingTarget()
        d = self.send_and_recover(target=t)
        return d

    def test_pause_then_stop(self):
        # use a DownloadTarget that pauses, then stops.
        t = PausingAndStoppingTarget()
        d = self.shouldFail(download.DownloadStopped, "test_pause_then_stop",
                            "our Consumer called stopProducing()",
                            self.send_and_recover, target=t)
        return d

    def test_stop(self):
        # use a DownloadTarget that does an immediate stop (ticket #473)
        t = StoppingTarget()
        d = self.shouldFail(download.DownloadStopped, "test_stop",
                            "our Consumer called stopProducing()",
                            self.send_and_recover, target=t)
        return d

    # the following tests all use 4-out-of-10 encoding

    def test_bad_blocks(self):
        # the first 6 servers have bad blocks, which will be caught by the
        # blockhashes
        modemap = dict([(i, "bad block")
                        for i in range(6)]
                       + [(i, "good")
                          for i in range(6, 10)])
        return self.send_and_recover((4,8,10), bucket_modes=modemap)

    def test_bad_blocks_failure(self):
        # the first 7 servers have bad blocks, which will be caught by the
        # blockhashes, and the download will fail
        modemap = dict([(i, "bad block")
                        for i in range(7)]
                       + [(i, "good")
                          for i in range(7, 10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
        def _done(res):
            self.failUnless(isinstance(res, Failure), res)
            self.failUnless(res.check(NotEnoughSharesError), res)
        d.addBoth(_done)
        return d

    def test_bad_blockhashes(self):
        # the first 6 servers have bad block hashes, so the blockhash tree
        # will not validate
        modemap = dict([(i, "bad blockhash")
                        for i in range(6)]
                       + [(i, "good")
                          for i in range(6, 10)])
        return self.send_and_recover((4,8,10), bucket_modes=modemap)

    def test_bad_blockhashes_failure(self):
        # the first 7 servers have bad block hashes, so the blockhash tree
        # will not validate, and the download will fail
        modemap = dict([(i, "bad blockhash")
                        for i in range(7)]
                       + [(i, "good")
                          for i in range(7, 10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
        def _done(res):
            self.failUnless(isinstance(res, Failure))
            self.failUnless(res.check(NotEnoughSharesError), res)
        d.addBoth(_done)
        return d

    def test_bad_sharehashes(self):
        # the first 6 servers have bad share hashes, so the sharehash tree
        # will not validate
        modemap = dict([(i, "bad sharehash")
                        for i in range(6)]
                       + [(i, "good")
                          for i in range(6, 10)])
        return self.send_and_recover((4,8,10), bucket_modes=modemap)

    def assertFetchFailureIn(self, fd, where):
        expected = {"uri_extension": 0,
                    "crypttext_hash_tree": 0,
                    }
        if where is not None:
            expected[where] += 1
        self.failUnlessEqual(fd._fetch_failures, expected)

    def test_good(self):
        # just to make sure the test harness works when we aren't
        # intentionally causing failures
        modemap = dict([(i, "good") for i in range(0, 10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
        d.addCallback(self.assertFetchFailureIn, None)
        return d

    def test_bad_uri_extension(self):
        # the first server has a bad uri_extension block, so we will fail
        # over to a different server.
        modemap = dict([(i, "bad uri_extension") for i in range(1)] +
                       [(i, "good") for i in range(1, 10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
        d.addCallback(self.assertFetchFailureIn, "uri_extension")
        return d

    def test_bad_crypttext_hashroot(self):
        # the first server has a bad crypttext hashroot, so we will fail
        # over to a different server.
        modemap = dict([(i, "bad crypttext hashroot") for i in range(1)] +
                       [(i, "good") for i in range(1, 10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
        d.addCallback(self.assertFetchFailureIn, "crypttext_hash_tree")
        return d

    def test_bad_crypttext_hashes(self):
        # the first server has a bad crypttext hash block, so we will fail
        # over to a different server.
        modemap = dict([(i, "bad crypttext hash") for i in range(1)] +
                       [(i, "good") for i in range(1, 10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
        d.addCallback(self.assertFetchFailureIn, "crypttext_hash_tree")
        return d

    def test_bad_crypttext_hashes_failure(self):
        # to test that the crypttext merkle tree is really being applied, we
        # sneak into the download process and corrupt two things: we replace
        # everybody's crypttext hashtree with a bad version (computed over
        # bogus data), and we modify the supposedly-validated uri_extension
        # block to match the new crypttext hashtree root. The download
        # process should notice that the crypttext coming out of FEC doesn't
        # match the tree, and fail.

        modemap = dict([(i, "good") for i in range(0, 10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap,
                                  recover_mode=("corrupt_crypttext_hashes"))
        def _done(res):
            self.failUnless(isinstance(res, Failure))
            self.failUnless(res.check(hashtree.BadHashError), res)
        d.addBoth(_done)
        return d

    def OFF_test_bad_plaintext(self):
        # faking a decryption failure is easier: just corrupt the key
        modemap = dict([(i, "good") for i in range(0, 10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap,
                                  recover_mode=("corrupt_key"))
        def _done(res):
            self.failUnless(isinstance(res, Failure))
            self.failUnless(res.check(hashtree.BadHashError), res)
        d.addBoth(_done)
        return d

    def test_bad_sharehashes_failure(self):
        # all ten servers have bad share hashes, so the sharehash tree
        # will not validate, and the download will fail
        modemap = dict([(i, "bad sharehash")
                        for i in range(10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
        def _done(res):
            self.failUnless(isinstance(res, Failure))
            self.failUnless(res.check(NotEnoughSharesError))
        d.addBoth(_done)
        return d

    def test_missing_sharehashes(self):
        # the first 6 servers are missing their sharehashes, so the
        # sharehash tree will not validate
        modemap = dict([(i, "missing sharehash")
                        for i in range(6)]
                       + [(i, "good")
                          for i in range(6, 10)])
        return self.send_and_recover((4,8,10), bucket_modes=modemap)

    def test_missing_sharehashes_failure(self):
        # all servers are missing their sharehashes, so the sharehash tree
        # will not validate, and the download will fail
        modemap = dict([(i, "missing sharehash")
                        for i in range(10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
        def _done(res):
            self.failUnless(isinstance(res, Failure), res)
            self.failUnless(res.check(NotEnoughSharesError), res)
        d.addBoth(_done)
        return d

    def test_lost_one_shareholder(self):
        # we have enough shareholders when we start, but one segment in we
        # lose one of them. The upload should still succeed, as long as we
        # still have 'shares_of_happiness' peers left.
        modemap = dict([(i, "good") for i in range(9)] +
                       [(i, "lost") for i in range(9, 10)])
        return self.send_and_recover((4,8,10), bucket_modes=modemap)

    def test_lost_one_shareholder_early(self):
        # we have enough shareholders when we choose peers, but just before
        # we send the 'start' message, we lose one of them. The upload should
        # still succeed, as long as we still have 'shares_of_happiness' peers
        # left.
        modemap = dict([(i, "good") for i in range(9)] +
                       [(i, "lost-early") for i in range(9, 10)])
        return self.send_and_recover((4,8,10), bucket_modes=modemap)

    def test_lost_many_shareholders(self):
        # we have enough shareholders when we start, but one segment in we
        # lose all but one of them. The upload should fail.
        modemap = dict([(i, "good") for i in range(1)] +
                       [(i, "lost") for i in range(1, 10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
        def _done(res):
            self.failUnless(isinstance(res, Failure))
            self.failUnless(res.check(NotEnoughSharesError), res)
        d.addBoth(_done)
        return d

    def test_lost_all_shareholders(self):
        # we have enough shareholders when we start, but one segment in we
        # lose all of them. The upload should fail.
        modemap = dict([(i, "lost") for i in range(10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
        def _done(res):
            self.failUnless(isinstance(res, Failure))
            self.failUnless(res.check(NotEnoughSharesError))
        d.addBoth(_done)
        return d