from zope.interface import implements
from twisted.trial import unittest
from twisted.internet import defer, reactor
from twisted.python.failure import Failure
from foolscap.api import fireEventually
from allmydata import hashtree, uri
from allmydata.immutable import encode, upload, download
from allmydata.util import hashutil
from allmydata.util.assertutil import _assert
from allmydata.util.consumer import MemoryConsumer
from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader, \
     NotEnoughSharesError, IStorageBroker, UploadUnhappinessError
from allmydata.monitor import Monitor
import common_util as testutil

class LostPeerError(Exception):
    pass

def flip_bit(good): # flips the last bit
    return good[:-1] + chr(ord(good[-1]) ^ 0x01)
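# For example, flip_bit("foo") == "fon", since ord("o") ^ 0x01 == ord("n"):
# corrupting a single bit is enough to make any of the hash checks fail.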

class FakeStorageBroker:
    implements(IStorageBroker)

class FakeBucketReaderWriterProxy:
    implements(IStorageBucketWriter, IStorageBucketReader)
    # these are used for both reading and writing
    def __init__(self, mode="good"):
        self.mode = mode
        self.blocks = {}
        self.plaintext_hashes = []
        self.crypttext_hashes = []
        self.block_hashes = None
        self.share_hashes = None
        self.closed = False

    def get_peerid(self):
        return "peerid"

    def _start(self):
        if self.mode == "lost-early":
            f = Failure(LostPeerError("I went away early"))
            return fireEventually(f)
        return defer.succeed(self)

    def put_header(self):
        return self._start()

    def put_block(self, segmentnum, data):
        if self.mode == "lost-early":
            f = Failure(LostPeerError("I went away early"))
            return fireEventually(f)
        def _try():
            assert not self.closed
            assert segmentnum not in self.blocks
            if self.mode == "lost" and segmentnum >= 1:
                raise LostPeerError("I'm going away now")
            self.blocks[segmentnum] = data
        return defer.maybeDeferred(_try)

    def put_plaintext_hashes(self, hashes):
        def _try():
            assert not self.closed
            assert not self.plaintext_hashes
            self.plaintext_hashes = hashes
        return defer.maybeDeferred(_try)

    def put_crypttext_hashes(self, hashes):
        def _try():
            assert not self.closed
            assert not self.crypttext_hashes
            self.crypttext_hashes = hashes
        return defer.maybeDeferred(_try)

    def put_block_hashes(self, blockhashes):
        def _try():
            assert not self.closed
            assert self.block_hashes is None
            self.block_hashes = blockhashes
        return defer.maybeDeferred(_try)

    def put_share_hashes(self, sharehashes):
        def _try():
            assert not self.closed
            assert self.share_hashes is None
            self.share_hashes = sharehashes
        return defer.maybeDeferred(_try)

    def put_uri_extension(self, uri_extension):
        def _try():
            assert not self.closed
            self.uri_extension = uri_extension
        return defer.maybeDeferred(_try)

    def close(self):
        def _try():
            assert not self.closed
            self.closed = True
        return defer.maybeDeferred(_try)

    def abort(self):
        return defer.succeed(None)

    def get_block_data(self, blocknum, blocksize, size):
        d = self._start()
        def _try(unused=None):
            assert isinstance(blocknum, (int, long))
            if self.mode == "bad block":
                return flip_bit(self.blocks[blocknum])
            return self.blocks[blocknum]
        d.addCallback(_try)
        return d

    def get_plaintext_hashes(self):
        d = self._start()
        def _try(unused=None):
            hashes = self.plaintext_hashes[:]
            return hashes
        d.addCallback(_try)
        return d

    def get_crypttext_hashes(self):
        d = self._start()
        def _try(unused=None):
            hashes = self.crypttext_hashes[:]
            if self.mode == "bad crypttext hashroot":
                hashes[0] = flip_bit(hashes[0])
            if self.mode == "bad crypttext hash":
                hashes[1] = flip_bit(hashes[1])
            return hashes
        d.addCallback(_try)
        return d

    def get_block_hashes(self, at_least_these=()):
        d = self._start()
        def _try(unused=None):
            if self.mode == "bad blockhash":
                hashes = self.block_hashes[:]
                hashes[1] = flip_bit(hashes[1])
                return hashes
            return self.block_hashes
        d.addCallback(_try)
        return d

    def get_share_hashes(self, at_least_these=()):
        d = self._start()
        def _try(unused=None):
            if self.mode == "bad sharehash":
                hashes = self.share_hashes[:]
                hashes[1] = (hashes[1][0], flip_bit(hashes[1][1]))
                return hashes
            if self.mode == "missing sharehash":
                # one sneaky attack would be to pretend we don't know our own
                # sharehash, which could manage to frame someone else.
                # download.py is supposed to guard against this case.
                return []
            return self.share_hashes
        d.addCallback(_try)
        return d

    def get_uri_extension(self):
        d = self._start()
        def _try(unused=None):
            if self.mode == "bad uri_extension":
                return flip_bit(self.uri_extension)
            return self.uri_extension
        d.addCallback(_try)
        return d
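
# A minimal sketch of how the fake proxy above behaves (illustrative only,
# never invoked by the test suite): in "good" mode, a block written with
# put_block() comes back unchanged from get_block_data().
def _example_fake_bucket_roundtrip():
    fb = FakeBucketReaderWriterProxy(mode="good")
    fb.put_block(0, "some block data")
    d = fb.get_block_data(0, blocksize=15, size=15)
    d.addCallback(lambda block: _assert(block == "some block data"))
    return d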

def make_data(length):
    data = "happy happy joy joy" * 100
    assert length <= len(data)
    return data[:length]

class ValidatedExtendedURIProxy(unittest.TestCase):
    timeout = 240 # It takes longer than 120 seconds on Francois's arm box.
    K = 4
    M = 10
    SIZE = 200
    SEGSIZE = 72
    _TMP = SIZE % SEGSIZE
    if _TMP == 0:
        _TMP = SEGSIZE
    if _TMP % K != 0:
        _TMP += (K - (_TMP % K))
    TAIL_SEGSIZE = _TMP
    _TMP = SIZE / SEGSIZE
    if SIZE % SEGSIZE != 0:
        _TMP += 1
    NUM_SEGMENTS = _TMP
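    # Worked example of the arithmetic above: 200 % 72 == 56, which is
    # already a multiple of K=4, so TAIL_SEGSIZE == 56; and 200 // 72 == 2
    # with a nonzero remainder, so NUM_SEGMENTS == 3.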
    mindict = { 'segment_size': SEGSIZE,
                'crypttext_root_hash': '0'*hashutil.CRYPTO_VAL_SIZE,
                'share_root_hash': '1'*hashutil.CRYPTO_VAL_SIZE }
    optional_consistent = { 'crypttext_hash': '2'*hashutil.CRYPTO_VAL_SIZE,
                            'codec_name': "crs",
                            'codec_params': "%d-%d-%d" % (SEGSIZE, K, M),
                            'tail_codec_params': "%d-%d-%d" % (TAIL_SEGSIZE, K, M),
                            'num_segments': NUM_SEGMENTS,
                            'size': SIZE,
                            'needed_shares': K,
                            'total_shares': M,
                            'plaintext_hash': "anything",
                            'plaintext_root_hash': "anything", }
    # optional_inconsistent = { 'crypttext_hash': ('2'*(hashutil.CRYPTO_VAL_SIZE-1), "", 77),
    optional_inconsistent = { 'crypttext_hash': (77,),
                              'codec_name': ("digital fountain", ""),
                              'codec_params': ("%d-%d-%d" % (SEGSIZE, K-1, M),
                                               "%d-%d-%d" % (SEGSIZE-1, K, M),
                                               "%d-%d-%d" % (SEGSIZE, K, M-1)),
                              'tail_codec_params': ("%d-%d-%d" % (TAIL_SEGSIZE, K-1, M),
                                                    "%d-%d-%d" % (TAIL_SEGSIZE-1, K, M),
                                                    "%d-%d-%d" % (TAIL_SEGSIZE, K, M-1)),
                              'num_segments': (NUM_SEGMENTS-1,),
                              'size': (SIZE-1,),
                              'needed_shares': (K-1,),
                              'total_shares': (M-1,), }

    def _test(self, uebdict):
        uebstring = uri.pack_extension(uebdict)
        uebhash = hashutil.uri_extension_hash(uebstring)
        fb = FakeBucketReaderWriterProxy()
        fb.put_uri_extension(uebstring)
        verifycap = uri.CHKFileVerifierURI(storage_index='x'*16,
                                           uri_extension_hash=uebhash,
                                           needed_shares=self.K,
                                           total_shares=self.M,
                                           size=self.SIZE)
        vup = download.ValidatedExtendedURIProxy(fb, verifycap)
        return vup.start()

    def _test_accept(self, uebdict):
        return self._test(uebdict)

    def _should_fail(self, res, expected_failures):
        if isinstance(res, Failure):
            res.trap(*expected_failures)
        else:
            self.fail("was supposed to raise %s, not get '%s'" %
                      (expected_failures, res))

    def _test_reject(self, uebdict):
        d = self._test(uebdict)
        d.addBoth(self._should_fail, (KeyError, download.BadURIExtension))
        return d

    def test_accept_minimal(self):
        return self._test_accept(self.mindict)

    def test_reject_insufficient(self):
        dl = []
        for k in self.mindict.iterkeys():
            insuffdict = self.mindict.copy()
            del insuffdict[k]
            d = self._test_reject(insuffdict)
            dl.append(d)
        return defer.DeferredList(dl)

    def test_accept_optional(self):
        dl = []
        for k in self.optional_consistent.iterkeys():
            mydict = self.mindict.copy()
            mydict[k] = self.optional_consistent[k]
            d = self._test_accept(mydict)
            dl.append(d)
        return defer.DeferredList(dl)

    def test_reject_optional(self):
        dl = []
        for k in self.optional_inconsistent.iterkeys():
            for v in self.optional_inconsistent[k]:
                mydict = self.mindict.copy()
                mydict[k] = v
                d = self._test_reject(mydict)
                dl.append(d)
        return defer.DeferredList(dl)

class Encode(unittest.TestCase):
    timeout = 2400 # It takes longer than 240 seconds on Zandr's ARM box.

    def do_encode(self, max_segment_size, datalen, NUM_SHARES, NUM_SEGMENTS,
                  expected_block_hashes, expected_share_hashes):
        data = make_data(datalen)
        # force use of multiple segments
        e = encode.Encoder()
        u = upload.Data(data, convergence="some convergence string")
        u.max_segment_size = max_segment_size
        u.encoding_param_k = 25
        u.encoding_param_happy = 75
        u.encoding_param_n = 100
        eu = upload.EncryptAnUploadable(u)
        d = e.set_encrypted_uploadable(eu)

        all_shareholders = []
        def _ready(res):
            k,happy,n = e.get_param("share_counts")
            _assert(n == NUM_SHARES) # else we'll be completely confused
            numsegs = e.get_param("num_segments")
            _assert(numsegs == NUM_SEGMENTS, numsegs, NUM_SEGMENTS)
            segsize = e.get_param("segment_size")
            _assert( (NUM_SEGMENTS-1)*segsize < len(data) <= NUM_SEGMENTS*segsize,
                     NUM_SEGMENTS, segsize,
                     (NUM_SEGMENTS-1)*segsize, len(data), NUM_SEGMENTS*segsize)

            shareholders = {}
            servermap = {}
            for shnum in range(NUM_SHARES):
                peer = FakeBucketReaderWriterProxy()
                shareholders[shnum] = peer
                servermap[shnum] = str(shnum)
                all_shareholders.append(peer)
            e.set_shareholders(shareholders, servermap)
            return e.start()
        d.addCallback(_ready)

        def _check(res):
            verifycap = res
            self.failUnless(isinstance(verifycap.uri_extension_hash, str))
            self.failUnlessEqual(len(verifycap.uri_extension_hash), 32)
            for i,peer in enumerate(all_shareholders):
                self.failUnless(peer.closed)
                self.failUnlessEqual(len(peer.blocks), NUM_SEGMENTS)
                # each peer gets a full tree of block hashes. For 3 or 4
                # segments, that's 7 hashes. For 5 segments it's 15 hashes.
                self.failUnlessEqual(len(peer.block_hashes),
                                     expected_block_hashes)
                for h in peer.block_hashes:
                    self.failUnlessEqual(len(h), 32)
                # each peer also gets their necessary chain of share hashes.
                # For 100 shares (rounded up to 128 leaves), that's 8 hashes
                self.failUnlessEqual(len(peer.share_hashes),
                                     expected_share_hashes)
                for (hashnum, h) in peer.share_hashes:
                    self.failUnless(isinstance(hashnum, int))
                    self.failUnlessEqual(len(h), 32)
        d.addCallback(_check)
        return d

    # a series of 3*3 tests to check out edge conditions. One axis is how the
    # plaintext is divided into segments: kn+(-1,0,1). Another way to express
    # that is that n%k == -1 or 0 or 1. For example, for 25-byte segments, we
    # might test 74 bytes, 75 bytes, and 76 bytes.

    # on the other axis is how many leaves in the block hash tree we wind up
    # with, relative to a power of 2, so 2^a+(-1,0,1). Each segment turns
    # into a single leaf. So we'd like to check out, e.g., 3 segments, 4
    # segments, and 5 segments.

    # that results in the following series of data lengths:
    #  3 segs: 74, 75, 51
    #  4 segs: 99, 100, 76
    #  5 segs: 124, 125, 101

    # all tests encode to 100 shares, which means the share hash tree will
    # have 128 leaves, which means that buckets will be given an 8-long share
    # hash chain.

    # all 3-segment files will have a 4-leaf blockhashtree, and thus expect
    # to get 7 blockhashes. 4-segment files will also get 4-leaf block hash
    # trees and 7 blockhashes. 5-segment files will get 8-leaf block hash
    # trees, which get 15 blockhashes. (The illustrative helper below
    # reproduces these sizes.)
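
    # A minimal illustrative helper (an assumption of this writeup, not
    # something the tests call): the number of hashes in a full Merkle tree
    # over num_leaves leaves, after padding the leaf count up to a power of
    # two. It reproduces the expected_block_hashes values used below.
    @staticmethod
    def _full_tree_size(num_leaves):
        leaves = 1
        while leaves < num_leaves:
            leaves *= 2        # pad up to the next power of two
        return 2*leaves - 1    # 3 or 4 segments -> 7 hashes; 5 segments -> 15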

    def test_send_74(self):
        # 3 segments (25, 25, 24)
        return self.do_encode(25, 74, 100, 3, 7, 8)
    def test_send_75(self):
        # 3 segments (25, 25, 25)
        return self.do_encode(25, 75, 100, 3, 7, 8)
    def test_send_51(self):
        # 3 segments (25, 25, 1)
        return self.do_encode(25, 51, 100, 3, 7, 8)

    def test_send_76(self):
        # encode a 76 byte file (in 4 segments: 25,25,25,1) to 100 shares
        return self.do_encode(25, 76, 100, 4, 7, 8)
    def test_send_99(self):
        # 4 segments: 25,25,25,24
        return self.do_encode(25, 99, 100, 4, 7, 8)
    def test_send_100(self):
        # 4 segments: 25,25,25,25
        return self.do_encode(25, 100, 100, 4, 7, 8)

    def test_send_124(self):
        # 5 segments: 25, 25, 25, 25, 24
        return self.do_encode(25, 124, 100, 5, 15, 8)
    def test_send_125(self):
        # 5 segments: 25, 25, 25, 25, 25
        return self.do_encode(25, 125, 100, 5, 15, 8)
    def test_send_101(self):
        # 5 segments: 25, 25, 25, 25, 1
        return self.do_encode(25, 101, 100, 5, 15, 8)

class PausingConsumer(MemoryConsumer):
    def __init__(self):
        MemoryConsumer.__init__(self)
        self.size = 0
        self.writes = 0
    def write(self, data):
        self.size += len(data)
        self.writes += 1
        if self.writes <= 2:
            # we happen to use 4 segments, and want to avoid pausing on the
            # last one (since then the _unpause timer will still be running)
            self.producer.pauseProducing()
            reactor.callLater(0.1, self._unpause)
        return MemoryConsumer.write(self, data)
    def _unpause(self):
        self.producer.resumeProducing()

class PausingAndStoppingConsumer(PausingConsumer):
    def write(self, data):
        self.producer.pauseProducing()
        reactor.callLater(0.5, self._stop)
    def _stop(self):
        self.producer.stopProducing()

class StoppingConsumer(PausingConsumer):
    def write(self, data):
        self.producer.stopProducing()

class Roundtrip(unittest.TestCase, testutil.ShouldFailMixin):
    timeout = 2400 # It takes longer than 240 seconds on Zandr's ARM box.
    def send_and_recover(self, k_and_happy_and_n=(25,75,100),
                         AVAILABLE_SHARES=None,
                         datalen=76,
                         max_segment_size=25,
                         bucket_modes={},
                         recover_mode="recover",
                         consumer=None,
                         ):
        if AVAILABLE_SHARES is None:
            AVAILABLE_SHARES = k_and_happy_and_n[2]
        data = make_data(datalen)
        d = self.send(k_and_happy_and_n, AVAILABLE_SHARES,
                      max_segment_size, bucket_modes, data)
        # that fires with (uri_extension_hash, e, shareholders)
        d.addCallback(self.recover, AVAILABLE_SHARES, recover_mode,
                      consumer=consumer)
        # that fires with newdata
        def _downloaded((newdata, fd)):
            self.failUnless(newdata == data, str((len(newdata), len(data))))
            return fd
        d.addCallback(_downloaded)
        return d

    def send(self, k_and_happy_and_n, AVAILABLE_SHARES, max_segment_size,
             bucket_modes, data):
        k, happy, n = k_and_happy_and_n
        NUM_SHARES = k_and_happy_and_n[2]
        if AVAILABLE_SHARES is None:
            AVAILABLE_SHARES = NUM_SHARES
        e = encode.Encoder()
        u = upload.Data(data, convergence="some convergence string")
        # force use of multiple segments by using a low max_segment_size
        u.max_segment_size = max_segment_size
        u.encoding_param_k = k
        u.encoding_param_happy = happy
        u.encoding_param_n = n
        eu = upload.EncryptAnUploadable(u)
        d = e.set_encrypted_uploadable(eu)

        shareholders = {}
        def _ready(res):
            k,happy,n = e.get_param("share_counts")
            assert n == NUM_SHARES # else we'll be completely confused
            servermap = {}
            for shnum in range(NUM_SHARES):
                mode = bucket_modes.get(shnum, "good")
                peer = FakeBucketReaderWriterProxy(mode)
                shareholders[shnum] = peer
                servermap[shnum] = str(shnum)
            e.set_shareholders(shareholders, servermap)
            return e.start()
        d.addCallback(_ready)
        def _sent(res):
            d1 = u.get_encryption_key()
            d1.addCallback(lambda key: (res, key, shareholders))
            return d1
        d.addCallback(_sent)
        return d

    def recover(self, (res, key, shareholders), AVAILABLE_SHARES,
                recover_mode, consumer=None):
        verifycap = res

        if "corrupt_key" in recover_mode:
            # we corrupt the key, so that the decrypted data is corrupted and
            # will fail the plaintext hash check. Since we're manually
            # attaching shareholders, the fact that the storage index is also
            # corrupted doesn't matter.
            key = flip_bit(key)

        u = uri.CHKFileURI(key=key,
                           uri_extension_hash=verifycap.uri_extension_hash,
                           needed_shares=verifycap.needed_shares,
                           total_shares=verifycap.total_shares,
                           size=verifycap.size)

        sb = FakeStorageBroker()
        if not consumer:
            consumer = MemoryConsumer()
        innertarget = download.ConsumerAdapter(consumer)
        target = download.DecryptingTarget(innertarget, u.key)
        fd = download.CiphertextDownloader(sb, u.get_verify_cap(), target, monitor=Monitor())

        # we manually cycle the CiphertextDownloader through a number of
        # steps that would normally be sequenced by a Deferred chain in
        # CiphertextDownloader.start(), to give us more control over the
        # process. In particular, by bypassing _get_all_shareholders, we
        # skip permuted-peerlist selection.
        for shnum, bucket in shareholders.items():
            if shnum < AVAILABLE_SHARES and bucket.closed:
                fd.add_share_bucket(shnum, bucket)
        fd._got_all_shareholders(None)

        # Make it possible to obtain uri_extension from the shareholders.
        # Arrange for shareholders[0] to be the first, so we can selectively
        # corrupt the data it returns.
        uri_extension_sources = shareholders.values()
        uri_extension_sources.remove(shareholders[0])
        uri_extension_sources.insert(0, shareholders[0])

        d = defer.succeed(None)

        # have the CiphertextDownloader retrieve a copy of uri_extension itself
        d.addCallback(fd._obtain_uri_extension)

        if "corrupt_crypttext_hashes" in recover_mode:
            # replace everybody's crypttext hash trees with a different one
            # (computed over a different file), then modify our uri_extension
            # to reflect the new crypttext hash tree root
            def _corrupt_crypttext_hashes(unused):
                assert isinstance(fd._vup, download.ValidatedExtendedURIProxy), fd._vup
                assert fd._vup.crypttext_root_hash, fd._vup
                badhash = hashutil.tagged_hash("bogus", "data")
                bad_crypttext_hashes = [badhash] * fd._vup.num_segments
                badtree = hashtree.HashTree(bad_crypttext_hashes)
                for bucket in shareholders.values():
                    bucket.crypttext_hashes = list(badtree)
                fd._crypttext_hash_tree = hashtree.IncompleteHashTree(fd._vup.num_segments)
                fd._crypttext_hash_tree.set_hashes({0: badtree[0]})
                fd._vup.crypttext_root_hash = badtree[0]
            d.addCallback(_corrupt_crypttext_hashes)

        # also have the CiphertextDownloader ask for hash trees
        d.addCallback(fd._get_crypttext_hash_tree)

        d.addCallback(fd._download_all_segments)
        d.addCallback(fd._done)
        def _done(t):
            newdata = "".join(consumer.chunks)
            return (newdata, fd)
        d.addCallback(_done)
        return d

    def test_not_enough_shares(self):
        d = self.send_and_recover((4,8,10), AVAILABLE_SHARES=2)
        def _done(res):
            self.failUnless(isinstance(res, Failure))
            self.failUnless(res.check(NotEnoughSharesError))
        d.addBoth(_done)
        return d

    def test_one_share_per_peer(self):
        return self.send_and_recover()

    def test_74(self):
        return self.send_and_recover(datalen=74)
    def test_75(self):
        return self.send_and_recover(datalen=75)
    def test_51(self):
        return self.send_and_recover(datalen=51)

    def test_99(self):
        return self.send_and_recover(datalen=99)
    def test_100(self):
        return self.send_and_recover(datalen=100)
    def test_76(self):
        return self.send_and_recover(datalen=76)

    def test_124(self):
        return self.send_and_recover(datalen=124)
    def test_125(self):
        return self.send_and_recover(datalen=125)
    def test_101(self):
        return self.send_and_recover(datalen=101)

    def test_pause(self):
        # use a download target that does pauseProducing/resumeProducing a
        # few times, then finishes
        c = PausingConsumer()
        d = self.send_and_recover(consumer=c)
        return d

    def test_pause_then_stop(self):
        # use a download target that pauses, then stops.
        c = PausingAndStoppingConsumer()
        d = self.shouldFail(download.DownloadStopped, "test_pause_then_stop",
                            "our Consumer called stopProducing()",
                            self.send_and_recover, consumer=c)
        return d

    def test_stop(self):
        # use a download target that does an immediate stop (ticket #473)
        c = StoppingConsumer()
        d = self.shouldFail(download.DownloadStopped, "test_stop",
                            "our Consumer called stopProducing()",
                            self.send_and_recover, consumer=c)
        return d

    # the following tests all use 4-out-of-10 encoding
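    # To make the success/failure split below concrete: with 4-out-of-10
    # encoding, any 4 intact shares suffice to reconstruct the file, so
    # corrupting 6 of the 10 shares still leaves exactly 4 good ones (the
    # download succeeds), while corrupting 7 leaves only 3 (the download
    # fails with NotEnoughSharesError).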

    def test_bad_blocks(self):
        # the first 6 servers have bad blocks, which will be caught by the
        # blockhashes
        modemap = dict([(i, "bad block")
                        for i in range(6)]
                       + [(i, "good")
                          for i in range(6, 10)])
        return self.send_and_recover((4,8,10), bucket_modes=modemap)

    def test_bad_blocks_failure(self):
        # the first 7 servers have bad blocks, which will be caught by the
        # blockhashes, and the download will fail
        modemap = dict([(i, "bad block")
                        for i in range(7)]
                       + [(i, "good")
                          for i in range(7, 10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
        def _done(res):
            self.failUnless(isinstance(res, Failure), res)
            self.failUnless(res.check(NotEnoughSharesError), res)
        d.addBoth(_done)
        return d

    def test_bad_blockhashes(self):
        # the first 6 servers have bad block hashes, so the blockhash tree
        # will not validate
        modemap = dict([(i, "bad blockhash")
                        for i in range(6)]
                       + [(i, "good")
                          for i in range(6, 10)])
        return self.send_and_recover((4,8,10), bucket_modes=modemap)

    def test_bad_blockhashes_failure(self):
        # the first 7 servers have bad block hashes, so the blockhash tree
        # will not validate, and the download will fail
        modemap = dict([(i, "bad blockhash")
                        for i in range(7)]
                       + [(i, "good")
                          for i in range(7, 10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
        def _done(res):
            self.failUnless(isinstance(res, Failure))
            self.failUnless(res.check(NotEnoughSharesError), res)
        d.addBoth(_done)
        return d

    def test_bad_sharehashes(self):
        # the first 6 servers have bad share hashes, so the sharehash tree
        # will not validate
        modemap = dict([(i, "bad sharehash")
                        for i in range(6)]
                       + [(i, "good")
                          for i in range(6, 10)])
        return self.send_and_recover((4,8,10), bucket_modes=modemap)

    def assertFetchFailureIn(self, fd, where):
        expected = {"uri_extension": 0,
                    "crypttext_hash_tree": 0,
                    }
        if where is not None:
            expected[where] += 1
        self.failUnlessEqual(fd._fetch_failures, expected)

    def test_good(self):
        # just to make sure the test harness works when we aren't
        # intentionally causing failures
        modemap = dict([(i, "good") for i in range(0, 10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
        d.addCallback(self.assertFetchFailureIn, None)
        return d

    def test_bad_uri_extension(self):
        # the first server has a bad uri_extension block, so we will fail
        # over to a different server.
        modemap = dict([(i, "bad uri_extension") for i in range(1)] +
                       [(i, "good") for i in range(1, 10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
        d.addCallback(self.assertFetchFailureIn, "uri_extension")
        return d

    def test_bad_crypttext_hashroot(self):
        # the first server has a bad crypttext hashroot, so we will fail
        # over to a different server.
        modemap = dict([(i, "bad crypttext hashroot") for i in range(1)] +
                       [(i, "good") for i in range(1, 10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
        d.addCallback(self.assertFetchFailureIn, "crypttext_hash_tree")
        return d

    def test_bad_crypttext_hashes(self):
        # the first server has a bad crypttext hash block, so we will fail
        # over to a different server.
        modemap = dict([(i, "bad crypttext hash") for i in range(1)] +
                       [(i, "good") for i in range(1, 10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
        d.addCallback(self.assertFetchFailureIn, "crypttext_hash_tree")
        return d

    def test_bad_crypttext_hashes_failure(self):
        # to test that the crypttext merkle tree is really being applied, we
        # sneak into the download process and corrupt two things: we replace
        # everybody's crypttext hashtree with a bad version (computed over
        # bogus data), and we modify the supposedly-validated uri_extension
        # block to match the new crypttext hashtree root. The download
        # process should notice that the crypttext coming out of FEC doesn't
        # match the tree, and fail.

        modemap = dict([(i, "good") for i in range(0, 10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap,
                                  recover_mode=("corrupt_crypttext_hashes"))
        def _done(res):
            self.failUnless(isinstance(res, Failure))
            self.failUnless(res.check(hashtree.BadHashError), res)
        d.addBoth(_done)
        return d

    def OFF_test_bad_plaintext(self):
        # faking a decryption failure is easier: just corrupt the key
        modemap = dict([(i, "good") for i in range(0, 10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap,
                                  recover_mode=("corrupt_key"))
        def _done(res):
            self.failUnless(isinstance(res, Failure))
            self.failUnless(res.check(hashtree.BadHashError), res)
        d.addBoth(_done)
        return d

    def test_bad_sharehashes_failure(self):
        # all ten servers have bad share hashes, so the sharehash tree
        # will not validate, and the download will fail
        modemap = dict([(i, "bad sharehash")
                        for i in range(10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
        def _done(res):
            self.failUnless(isinstance(res, Failure))
            self.failUnless(res.check(NotEnoughSharesError))
        d.addBoth(_done)
        return d

    def test_missing_sharehashes(self):
        # the first 6 servers are missing their sharehashes, so the
        # sharehash tree will not validate
        modemap = dict([(i, "missing sharehash")
                        for i in range(6)]
                       + [(i, "good")
                          for i in range(6, 10)])
        return self.send_and_recover((4,8,10), bucket_modes=modemap)

    def test_missing_sharehashes_failure(self):
        # all servers are missing their sharehashes, so the sharehash tree
        # will not validate, and the download will fail
        modemap = dict([(i, "missing sharehash")
                        for i in range(10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
        def _done(res):
            self.failUnless(isinstance(res, Failure), res)
            self.failUnless(res.check(NotEnoughSharesError), res)
        d.addBoth(_done)
        return d

    def test_lost_one_shareholder(self):
        # we have enough shareholders when we start, but one segment in we
        # lose one of them. The upload should still succeed, as long as we
        # still have 'shares_of_happiness' peers left.
        modemap = dict([(i, "good") for i in range(9)] +
                       [(i, "lost") for i in range(9, 10)])
        return self.send_and_recover((4,8,10), bucket_modes=modemap)

    def test_lost_one_shareholder_early(self):
        # we have enough shareholders when we choose peers, but just before
        # we send the 'start' message, we lose one of them. The upload should
        # still succeed, as long as we still have 'shares_of_happiness' peers
        # left.
        modemap = dict([(i, "good") for i in range(9)] +
                       [(i, "lost-early") for i in range(9, 10)])
        return self.send_and_recover((4,8,10), bucket_modes=modemap)

    def test_lost_many_shareholders(self):
        # we have enough shareholders when we start, but one segment in we
        # lose all but one of them. The upload should fail.
        modemap = dict([(i, "good") for i in range(1)] +
                       [(i, "lost") for i in range(1, 10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
        def _done(res):
            self.failUnless(isinstance(res, Failure))
            self.failUnless(res.check(UploadUnhappinessError), res)
        d.addBoth(_done)
        return d

    def test_lost_all_shareholders(self):
        # we have enough shareholders when we start, but one segment in we
        # lose all of them. The upload should fail.
        modemap = dict([(i, "lost") for i in range(10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
        def _done(res):
            self.failUnless(isinstance(res, Failure))
            self.failUnless(res.check(UploadUnhappinessError))
        d.addBoth(_done)
        return d