from zope.interface import implements
from twisted.trial import unittest
from twisted.internet import defer, reactor
from twisted.python.failure import Failure
from foolscap.api import fireEventually
from allmydata import hashtree, uri
from allmydata.immutable import encode, upload, download
from allmydata.util import hashutil
from allmydata.util.assertutil import _assert
from allmydata.util.consumer import MemoryConsumer
from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader, \
     NotEnoughSharesError, IStorageBroker, UploadUnhappinessError
from allmydata.monitor import Monitor
import common_util as testutil

class LostPeerError(Exception):
    pass

def flip_bit(good): # flips the last bit
    return good[:-1] + chr(ord(good[-1]) ^ 0x01)
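# for example, flip_bit("ab") == "ac": only the low bit of the final byte
# is toggled, the smallest corruption the hash checks must still detect
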
class FakeStorageBroker:
    implements(IStorageBroker)

class FakeBucketReaderWriterProxy:
    implements(IStorageBucketWriter, IStorageBucketReader)
    # these are used for both reading and writing
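    # The 'mode' argument selects which fault (if any) this fake bucket
    # injects; all of these modes are exercised by the tests below:
    # "good", "lost-early", "lost", "bad block", "bad crypttext hashroot",
    # "bad crypttext hash", "bad blockhash", "bad sharehash",
    # "missing sharehash", and "bad uri_extension".
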
    def __init__(self, mode="good", peerid="peer"):
        self.mode = mode
        self.peerid = peerid
        self.blocks = {}
        self.plaintext_hashes = []
        self.crypttext_hashes = []
        self.block_hashes = None
        self.share_hashes = None
        self.closed = False

    def get_peerid(self):
        return self.peerid

    def _start(self):
        if self.mode == "lost-early":
            f = Failure(LostPeerError("I went away early"))
            return fireEventually(f)
        return defer.succeed(self)

    def put_header(self):
        return self._start()

    def put_block(self, segmentnum, data):
        if self.mode == "lost-early":
            f = Failure(LostPeerError("I went away early"))
            return fireEventually(f)
        def _try():
            assert not self.closed
            assert segmentnum not in self.blocks
            if self.mode == "lost" and segmentnum >= 1:
                raise LostPeerError("I'm going away now")
            self.blocks[segmentnum] = data
        return defer.maybeDeferred(_try)

    def put_plaintext_hashes(self, hashes):
        def _try():
            assert not self.closed
            assert not self.plaintext_hashes
            self.plaintext_hashes = hashes
        return defer.maybeDeferred(_try)

    def put_crypttext_hashes(self, hashes):
        def _try():
            assert not self.closed
            assert not self.crypttext_hashes
            self.crypttext_hashes = hashes
        return defer.maybeDeferred(_try)

    def put_block_hashes(self, blockhashes):
        def _try():
            assert not self.closed
            assert self.block_hashes is None
            self.block_hashes = blockhashes
        return defer.maybeDeferred(_try)

    def put_share_hashes(self, sharehashes):
        def _try():
            assert not self.closed
            assert self.share_hashes is None
            self.share_hashes = sharehashes
        return defer.maybeDeferred(_try)

    def put_uri_extension(self, uri_extension):
        def _try():
            assert not self.closed
            self.uri_extension = uri_extension
        return defer.maybeDeferred(_try)

    def close(self):
        def _try():
            assert not self.closed
            self.closed = True
        return defer.maybeDeferred(_try)

    def abort(self):
        return defer.succeed(None)

    def get_block_data(self, blocknum, blocksize, size):
        d = self._start()
        def _try(unused=None):
            assert isinstance(blocknum, (int, long))
            if self.mode == "bad block":
                return flip_bit(self.blocks[blocknum])
            return self.blocks[blocknum]
        d.addCallback(_try)
        return d

    def get_plaintext_hashes(self):
        d = self._start()
        def _try(unused=None):
            hashes = self.plaintext_hashes[:]
            return hashes
        d.addCallback(_try)
        return d

    def get_crypttext_hashes(self):
        d = self._start()
        def _try(unused=None):
            hashes = self.crypttext_hashes[:]
            if self.mode == "bad crypttext hashroot":
                hashes[0] = flip_bit(hashes[0])
            if self.mode == "bad crypttext hash":
                hashes[1] = flip_bit(hashes[1])
            return hashes
        d.addCallback(_try)
        return d

    def get_block_hashes(self, at_least_these=()):
        d = self._start()
        def _try(unused=None):
            if self.mode == "bad blockhash":
                hashes = self.block_hashes[:]
                hashes[1] = flip_bit(hashes[1])
                return hashes
            return self.block_hashes
        d.addCallback(_try)
        return d

    def get_share_hashes(self, at_least_these=()):
        d = self._start()
        def _try(unused=None):
            if self.mode == "bad sharehash":
                hashes = self.share_hashes[:]
                hashes[1] = (hashes[1][0], flip_bit(hashes[1][1]))
                return hashes
            if self.mode == "missing sharehash":
                # one sneaky attack would be to pretend we don't know our own
                # sharehash, which could manage to frame someone else.
                # download.py is supposed to guard against this case.
                return []
            return self.share_hashes
        d.addCallback(_try)
        return d

    def get_uri_extension(self):
        d = self._start()
        def _try(unused=None):
            if self.mode == "bad uri_extension":
                return flip_bit(self.uri_extension)
            return self.uri_extension
        d.addCallback(_try)
        return d

def make_data(length):
    data = "happy happy joy joy" * 100
    assert length <= len(data)
    return data[:length]

class ValidatedExtendedURIProxy(unittest.TestCase):
    timeout = 240 # It takes longer than 120 seconds on Francois's ARM box.

    K = 4
    M = 10
    SIZE = 200
    SEGSIZE = 72
    # the tail segment is padded up to a multiple of K
    _TMP = SIZE % SEGSIZE
    if _TMP == 0:
        _TMP = SEGSIZE
    if _TMP % K != 0:
        _TMP += (K - (_TMP % K))
    TAIL_SEGSIZE = _TMP
    _TMP = SIZE / SEGSIZE
    if SIZE % SEGSIZE != 0:
        _TMP += 1
    NUM_SEGMENTS = _TMP
    mindict = { 'segment_size': SEGSIZE,
                'crypttext_root_hash': '0'*hashutil.CRYPTO_VAL_SIZE,
                'share_root_hash': '1'*hashutil.CRYPTO_VAL_SIZE }
    optional_consistent = { 'crypttext_hash': '2'*hashutil.CRYPTO_VAL_SIZE,
                            'codec_name': "crs",
                            'codec_params': "%d-%d-%d" % (SEGSIZE, K, M),
                            'tail_codec_params': "%d-%d-%d" % (TAIL_SEGSIZE, K, M),
                            'num_segments': NUM_SEGMENTS,
                            'size': SIZE,
                            'needed_shares': K,
                            'total_shares': M,
                            'plaintext_hash': "anything",
                            'plaintext_root_hash': "anything", }
    # optional_inconsistent = { 'crypttext_hash': ('2'*(hashutil.CRYPTO_VAL_SIZE-1), "", 77),
    optional_inconsistent = { 'crypttext_hash': (77,),
                              'codec_name': ("digital fountain", ""),
                              'codec_params': ("%d-%d-%d" % (SEGSIZE, K-1, M),
                                               "%d-%d-%d" % (SEGSIZE-1, K, M),
                                               "%d-%d-%d" % (SEGSIZE, K, M-1)),
                              'tail_codec_params': ("%d-%d-%d" % (TAIL_SEGSIZE, K-1, M),
                                                    "%d-%d-%d" % (TAIL_SEGSIZE-1, K, M),
                                                    "%d-%d-%d" % (TAIL_SEGSIZE, K, M-1)),
                              'num_segments': (NUM_SEGMENTS-1,),
                              'size': (SIZE-1,),
                              'needed_shares': (K-1,),
                              'total_shares': (M-1,), }

    def _test(self, uebdict):
        uebstring = uri.pack_extension(uebdict)
        uebhash = hashutil.uri_extension_hash(uebstring)
        fb = FakeBucketReaderWriterProxy()
        fb.put_uri_extension(uebstring)
        verifycap = uri.CHKFileVerifierURI(storage_index='x'*16,
                                           uri_extension_hash=uebhash,
                                           needed_shares=self.K,
                                           total_shares=self.M,
                                           size=self.SIZE)
        vup = download.ValidatedExtendedURIProxy(fb, verifycap)
        return vup.start()

    def _test_accept(self, uebdict):
        return self._test(uebdict)

    def _should_fail(self, res, expected_failures):
        if isinstance(res, Failure):
            res.trap(*expected_failures)
        else:
            self.fail("was supposed to raise %s, not get '%s'" %
                      (expected_failures, res))

    def _test_reject(self, uebdict):
        d = self._test(uebdict)
        d.addBoth(self._should_fail, (KeyError, download.BadURIExtension))
        return d

    def test_accept_minimal(self):
        return self._test_accept(self.mindict)

    def test_reject_insufficient(self):
        dl = []
        for k in self.mindict.iterkeys():
            insuffdict = self.mindict.copy()
            del insuffdict[k]
            d = self._test_reject(insuffdict)
            dl.append(d)
        return defer.DeferredList(dl)

    def test_accept_optional(self):
        dl = []
        for k in self.optional_consistent.iterkeys():
            mydict = self.mindict.copy()
            mydict[k] = self.optional_consistent[k]
            d = self._test_accept(mydict)
            dl.append(d)
        return defer.DeferredList(dl)

    def test_reject_optional(self):
        dl = []
        for k in self.optional_inconsistent.iterkeys():
            for v in self.optional_inconsistent[k]:
                mydict = self.mindict.copy()
                mydict[k] = v
                d = self._test_reject(mydict)
                dl.append(d)
        return defer.DeferredList(dl)

class Encode(unittest.TestCase):
    timeout = 2400 # It takes longer than 240 seconds on Zandr's ARM box.

    def do_encode(self, max_segment_size, datalen, NUM_SHARES, NUM_SEGMENTS,
                  expected_block_hashes, expected_share_hashes):
        data = make_data(datalen)
        # force use of multiple segments
        e = encode.Encoder()
        u = upload.Data(data, convergence="some convergence string")
        u.max_segment_size = max_segment_size
        u.encoding_param_k = 25
        u.encoding_param_happy = 75
        u.encoding_param_n = 100
        eu = upload.EncryptAnUploadable(u)
        d = e.set_encrypted_uploadable(eu)

        all_shareholders = []
        def _ready(res):
            k,happy,n = e.get_param("share_counts")
            _assert(n == NUM_SHARES) # else we'll be completely confused
            numsegs = e.get_param("num_segments")
            _assert(numsegs == NUM_SEGMENTS, numsegs, NUM_SEGMENTS)
            segsize = e.get_param("segment_size")
            _assert( (NUM_SEGMENTS-1)*segsize < len(data) <= NUM_SEGMENTS*segsize,
                     NUM_SEGMENTS, segsize,
                     (NUM_SEGMENTS-1)*segsize, len(data), NUM_SEGMENTS*segsize)

            shareholders = {}
            servermap = {}
            for shnum in range(NUM_SHARES):
                peer = FakeBucketReaderWriterProxy()
                shareholders[shnum] = peer
                servermap.setdefault(shnum, set()).add(peer.get_peerid())
                all_shareholders.append(peer)
            e.set_shareholders(shareholders, servermap)
            return e.start()
        d.addCallback(_ready)

        def _check(res):
            verifycap = res
            self.failUnless(isinstance(verifycap.uri_extension_hash, str))
            self.failUnlessEqual(len(verifycap.uri_extension_hash), 32)
            for i,peer in enumerate(all_shareholders):
                self.failUnless(peer.closed)
                self.failUnlessEqual(len(peer.blocks), NUM_SEGMENTS)
                # each peer gets a full tree of block hashes. For 3 or 4
                # segments, that's 7 hashes. For 5 segments it's 15 hashes.
                self.failUnlessEqual(len(peer.block_hashes),
                                     expected_block_hashes)
                for h in peer.block_hashes:
                    self.failUnlessEqual(len(h), 32)
                # each peer also gets their necessary chain of share hashes.
                # For 100 shares (rounded up to 128 leaves), that's 8 hashes
                self.failUnlessEqual(len(peer.share_hashes),
                                     expected_share_hashes)
                for (hashnum, h) in peer.share_hashes:
                    self.failUnless(isinstance(hashnum, int))
                    self.failUnlessEqual(len(h), 32)
        d.addCallback(_check)
        return d

    # a series of 3*3 tests to check out edge conditions. One axis is how the
    # plaintext is divided into segments: kn+(-1,0,1). Another way to express
    # that is that n%k == -1 or 0 or 1. For example, for 25-byte segments, we
    # might test 74 bytes, 75 bytes, and 76 bytes.

    # on the other axis is how many leaves in the block hash tree we wind up
    # with, relative to a power of 2, so 2^a+(-1,0,1). Each segment turns
    # into a single leaf. So we'd like to check out, e.g., 3 segments, 4
    # segments, and 5 segments.

    # that results in the following series of data lengths:
    #  3 segs: 74, 75, 51
    #  4 segs: 99, 100, 76
    #  5 segs: 124, 125, 101

    # all tests encode to 100 shares, which means the share hash tree will
    # have 128 leaves, which means that buckets will be given an 8-long share
    # hash chain.

    # all 3-segment files will have a 4-leaf blockhashtree, and thus expect
    # to get 7 blockhashes. 4-segment files will also get 4-leaf block hash
    # trees and 7 blockhashes. 5-segment files will get 8-leaf block hash
    # trees, which get 15 blockhashes.

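    # To spell the arithmetic out: a Merkle tree over L leaves is padded up
    # to N = next_power_of_2(L) leaves and contains 2*N-1 nodes, so 3 or 4
    # segments give N=4 and 2*4-1 = 7 block hashes, while 5 segments give
    # N=8 and 2*8-1 = 15. For the share hashes, 100 shares pad up to 128
    # leaves, a depth-7 tree; presumably one uncle hash per level plus the
    # root makes up the 8-entry chain asserted in _check above.
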
    def test_send_74(self):
        # 3 segments (25, 25, 24)
        return self.do_encode(25, 74, 100, 3, 7, 8)
    def test_send_75(self):
        # 3 segments (25, 25, 25)
        return self.do_encode(25, 75, 100, 3, 7, 8)
    def test_send_51(self):
        # 3 segments (25, 25, 1)
        return self.do_encode(25, 51, 100, 3, 7, 8)

    def test_send_76(self):
        # encode a 76 byte file (in 4 segments: 25,25,25,1) to 100 shares
        return self.do_encode(25, 76, 100, 4, 7, 8)
    def test_send_99(self):
        # 4 segments: 25,25,25,24
        return self.do_encode(25, 99, 100, 4, 7, 8)
    def test_send_100(self):
        # 4 segments: 25,25,25,25
        return self.do_encode(25, 100, 100, 4, 7, 8)

    def test_send_124(self):
        # 5 segments: 25, 25, 25, 25, 24
        return self.do_encode(25, 124, 100, 5, 15, 8)
    def test_send_125(self):
        # 5 segments: 25, 25, 25, 25, 25
        return self.do_encode(25, 125, 100, 5, 15, 8)
    def test_send_101(self):
        # 5 segments: 25, 25, 25, 25, 1
        return self.do_encode(25, 101, 100, 5, 15, 8)

class PausingConsumer(MemoryConsumer):
    def __init__(self):
        MemoryConsumer.__init__(self)
        self.size = 0
        self.writes = 0
    def write(self, data):
        self.size += len(data)
        self.writes += 1
        if self.writes <= 2:
            # we happen to use 4 segments, and want to avoid pausing on the
            # last one (since then the _unpause timer will still be running)
            self.producer.pauseProducing()
            reactor.callLater(0.1, self._unpause)
        return MemoryConsumer.write(self, data)
    def _unpause(self):
        self.producer.resumeProducing()

class PausingAndStoppingConsumer(PausingConsumer):
    def write(self, data):
        self.producer.pauseProducing()
        reactor.callLater(0.5, self._stop)
    def _stop(self):
        self.producer.stopProducing()

class StoppingConsumer(PausingConsumer):
    def write(self, data):
        self.producer.stopProducing()

class Roundtrip(unittest.TestCase, testutil.ShouldFailMixin):
    timeout = 2400 # It takes longer than 240 seconds on Zandr's ARM box.

    def send_and_recover(self, k_and_happy_and_n=(25,75,100),
                         AVAILABLE_SHARES=None,
                         datalen=76,
                         max_segment_size=25,
                         bucket_modes={},
                         recover_mode="recover",
                         consumer=None):
        if AVAILABLE_SHARES is None:
            AVAILABLE_SHARES = k_and_happy_and_n[2]
        data = make_data(datalen)
        d = self.send(k_and_happy_and_n, AVAILABLE_SHARES,
                      max_segment_size, bucket_modes, data)
        # that fires with (verifycap, key, shareholders)
        d.addCallback(self.recover, AVAILABLE_SHARES, recover_mode,
                      consumer=consumer)
        # that fires with newdata
        def _downloaded((newdata, fd)):
            self.failUnless(newdata == data, str((len(newdata), len(data))))
            return fd
        d.addCallback(_downloaded)
        return d

    def send(self, k_and_happy_and_n, AVAILABLE_SHARES, max_segment_size,
             bucket_modes, data):
        k, happy, n = k_and_happy_and_n
        NUM_SHARES = k_and_happy_and_n[2]
        if AVAILABLE_SHARES is None:
            AVAILABLE_SHARES = NUM_SHARES
        e = encode.Encoder()
        u = upload.Data(data, convergence="some convergence string")
        # force use of multiple segments by using a low max_segment_size
        u.max_segment_size = max_segment_size
        u.encoding_param_k = k
        u.encoding_param_happy = happy
        u.encoding_param_n = n
        eu = upload.EncryptAnUploadable(u)
        d = e.set_encrypted_uploadable(eu)
        shareholders = {}
        def _ready(res):
            k,happy,n = e.get_param("share_counts")
            assert n == NUM_SHARES # else we'll be completely confused
            servermap = {}
            for shnum in range(NUM_SHARES):
                mode = bucket_modes.get(shnum, "good")
                peer = FakeBucketReaderWriterProxy(mode, "peer%d" % shnum)
                shareholders[shnum] = peer
                servermap.setdefault(shnum, set()).add(peer.get_peerid())
            e.set_shareholders(shareholders, servermap)
            return e.start()
        d.addCallback(_ready)
        def _sent(res):
            d1 = u.get_encryption_key()
            d1.addCallback(lambda key: (res, key, shareholders))
            return d1
        d.addCallback(_sent)
        return d

    def recover(self, (res, key, shareholders), AVAILABLE_SHARES,
                recover_mode, consumer=None):
        verifycap = res

        if "corrupt_key" in recover_mode:
            # we corrupt the key, so that the decrypted data is corrupted and
            # will fail the plaintext hash check. Since we're manually
            # attaching shareholders, the fact that the storage index is also
            # corrupted doesn't matter.
            key = flip_bit(key)

        u = uri.CHKFileURI(key=key,
                           uri_extension_hash=verifycap.uri_extension_hash,
                           needed_shares=verifycap.needed_shares,
                           total_shares=verifycap.total_shares,
                           size=verifycap.size)

        sb = FakeStorageBroker()
        if consumer is None:
            consumer = MemoryConsumer()
        innertarget = download.ConsumerAdapter(consumer)
        target = download.DecryptingTarget(innertarget, u.key)
        fd = download.CiphertextDownloader(sb, u.get_verify_cap(), target,
                                           monitor=Monitor())

        # we manually cycle the CiphertextDownloader through a number of
        # steps that would normally be sequenced by a Deferred chain in
        # CiphertextDownloader.start(), to give us more control over the
        # process. In particular, by bypassing _get_all_shareholders, we
        # skip permuted-peerlist selection.
        for shnum, bucket in shareholders.items():
            if shnum < AVAILABLE_SHARES and bucket.closed:
                fd.add_share_bucket(shnum, bucket)
        fd._got_all_shareholders(None)

        # Make it possible to obtain uri_extension from the shareholders.
        # Arrange for shareholders[0] to be the first, so we can selectively
        # corrupt the data it returns.
        uri_extension_sources = shareholders.values()
        uri_extension_sources.remove(shareholders[0])
        uri_extension_sources.insert(0, shareholders[0])

        d = defer.succeed(None)

        # have the CiphertextDownloader retrieve a copy of uri_extension
        # itself
        d.addCallback(fd._obtain_uri_extension)

525 if "corrupt_crypttext_hashes" in recover_mode:
526 # replace everybody's crypttext hash trees with a different one
527 # (computed over a different file), then modify our uri_extension
528 # to reflect the new crypttext hash tree root
529 def _corrupt_crypttext_hashes(unused):
530 assert isinstance(fd._vup, download.ValidatedExtendedURIProxy), fd._vup
531 assert fd._vup.crypttext_root_hash, fd._vup
532 badhash = hashutil.tagged_hash("bogus", "data")
533 bad_crypttext_hashes = [badhash] * fd._vup.num_segments
534 badtree = hashtree.HashTree(bad_crypttext_hashes)
535 for bucket in shareholders.values():
536 bucket.crypttext_hashes = list(badtree)
537 fd._crypttext_hash_tree = hashtree.IncompleteHashTree(fd._vup.num_segments)
538 fd._crypttext_hash_tree.set_hashes({0: badtree[0]})
540 d.addCallback(_corrupt_crypttext_hashes)
542 # also have the CiphertextDownloader ask for hash trees
543 d.addCallback(fd._get_crypttext_hash_tree)
545 d.addCallback(fd._download_all_segments)
546 d.addCallback(fd._done)
548 newdata = "".join(consumer.chunks)
    def test_not_enough_shares(self):
        d = self.send_and_recover((4,8,10), AVAILABLE_SHARES=2)
        def _done(res):
            self.failUnless(isinstance(res, Failure))
            self.failUnless(res.check(NotEnoughSharesError))
        d.addBoth(_done)
        return d

    def test_one_share_per_peer(self):
        return self.send_and_recover()

    def test_74(self):
        return self.send_and_recover(datalen=74)
    def test_75(self):
        return self.send_and_recover(datalen=75)
    def test_51(self):
        return self.send_and_recover(datalen=51)

    def test_99(self):
        return self.send_and_recover(datalen=99)
    def test_100(self):
        return self.send_and_recover(datalen=100)
    def test_76(self):
        return self.send_and_recover(datalen=76)

    def test_124(self):
        return self.send_and_recover(datalen=124)
    def test_125(self):
        return self.send_and_recover(datalen=125)
    def test_101(self):
        return self.send_and_recover(datalen=101)

    def test_pause(self):
        # use a download target that does pauseProducing/resumeProducing a
        # few times, then finishes
        c = PausingConsumer()
        d = self.send_and_recover(consumer=c)
        return d

    def test_pause_then_stop(self):
        # use a download target that pauses, then stops.
        c = PausingAndStoppingConsumer()
        d = self.shouldFail(download.DownloadStopped, "test_pause_then_stop",
                            "our Consumer called stopProducing()",
                            self.send_and_recover, consumer=c)
        return d

    def test_stop(self):
        # use a download target that does an immediate stop (ticket #473)
        c = StoppingConsumer()
        d = self.shouldFail(download.DownloadStopped, "test_stop",
                            "our Consumer called stopProducing()",
                            self.send_and_recover, consumer=c)
        return d

    # the following tests all use 4-out-of-10 encoding
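    # (with k=4, any 4 uncorrupted shares suffice to decode: the tests below
    # corrupt up to 6 of the 10 servers and still expect success, while
    # corrupting 7 leaves only 3 good shares and should fail with
    # NotEnoughSharesError)
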
    def test_bad_blocks(self):
        # the first 6 servers have bad blocks, which will be caught by the
        # blockhashes
        modemap = dict([(i, "bad block")
                        for i in range(6)]
                       + [(i, "good")
                          for i in range(6, 10)])
        return self.send_and_recover((4,8,10), bucket_modes=modemap)

    def test_bad_blocks_failure(self):
        # the first 7 servers have bad blocks, which will be caught by the
        # blockhashes, and the download will fail
        modemap = dict([(i, "bad block")
                        for i in range(7)]
                       + [(i, "good")
                          for i in range(7, 10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
        def _done(res):
            self.failUnless(isinstance(res, Failure), res)
            self.failUnless(res.check(NotEnoughSharesError), res)
        d.addBoth(_done)
        return d

    def test_bad_blockhashes(self):
        # the first 6 servers have bad block hashes, so the blockhash tree
        # will not validate
        modemap = dict([(i, "bad blockhash")
                        for i in range(6)]
                       + [(i, "good")
                          for i in range(6, 10)])
        return self.send_and_recover((4,8,10), bucket_modes=modemap)

    def test_bad_blockhashes_failure(self):
        # the first 7 servers have bad block hashes, so the blockhash tree
        # will not validate, and the download will fail
        modemap = dict([(i, "bad blockhash")
                        for i in range(7)]
                       + [(i, "good")
                          for i in range(7, 10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
        def _done(res):
            self.failUnless(isinstance(res, Failure))
            self.failUnless(res.check(NotEnoughSharesError), res)
        d.addBoth(_done)
        return d

    def test_bad_sharehashes(self):
        # the first 6 servers have bad share hashes, so the sharehash tree
        # will not validate
        modemap = dict([(i, "bad sharehash")
                        for i in range(6)]
                       + [(i, "good")
                          for i in range(6, 10)])
        return self.send_and_recover((4,8,10), bucket_modes=modemap)

    def assertFetchFailureIn(self, fd, where):
        expected = {"uri_extension": 0,
                    "crypttext_hash_tree": 0,
                    }
        if where is not None:
            expected[where] += 1
        self.failUnlessEqual(fd._fetch_failures, expected)

    def test_good(self):
        # just to make sure the test harness works when we aren't
        # intentionally causing failures
        modemap = dict([(i, "good") for i in range(0, 10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
        d.addCallback(self.assertFetchFailureIn, None)
        return d

    def test_bad_uri_extension(self):
        # the first server has a bad uri_extension block, so we will fail
        # over to a different server.
        modemap = dict([(i, "bad uri_extension") for i in range(1)] +
                       [(i, "good") for i in range(1, 10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
        d.addCallback(self.assertFetchFailureIn, "uri_extension")
        return d

    def test_bad_crypttext_hashroot(self):
        # the first server has a bad crypttext hashroot, so we will fail
        # over to a different server.
        modemap = dict([(i, "bad crypttext hashroot") for i in range(1)] +
                       [(i, "good") for i in range(1, 10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
        d.addCallback(self.assertFetchFailureIn, "crypttext_hash_tree")
        return d

    def test_bad_crypttext_hashes(self):
        # the first server has a bad crypttext hash block, so we will fail
        # over to a different server.
        modemap = dict([(i, "bad crypttext hash") for i in range(1)] +
                       [(i, "good") for i in range(1, 10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
        d.addCallback(self.assertFetchFailureIn, "crypttext_hash_tree")
        return d

    def test_bad_crypttext_hashes_failure(self):
        # to test that the crypttext merkle tree is really being applied, we
        # sneak into the download process and corrupt two things: we replace
        # everybody's crypttext hashtree with a bad version (computed over
        # bogus data), and we modify the supposedly-validated uri_extension
        # block to match the new crypttext hashtree root. The download
        # process should notice that the crypttext coming out of FEC doesn't
        # match the tree, and fail.

        modemap = dict([(i, "good") for i in range(0, 10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap,
                                  recover_mode=("corrupt_crypttext_hashes"))
        def _done(res):
            self.failUnless(isinstance(res, Failure))
            self.failUnless(res.check(hashtree.BadHashError), res)
        d.addBoth(_done)
        return d

    def OFF_test_bad_plaintext(self):
        # faking a decryption failure is easier: just corrupt the key
        modemap = dict([(i, "good") for i in range(0, 10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap,
                                  recover_mode=("corrupt_key"))
        def _done(res):
            self.failUnless(isinstance(res, Failure))
            self.failUnless(res.check(hashtree.BadHashError), res)
        d.addBoth(_done)
        return d

    def test_bad_sharehashes_failure(self):
        # all ten servers have bad share hashes, so the sharehash tree
        # will not validate, and the download will fail
        modemap = dict([(i, "bad sharehash")
                        for i in range(10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
        def _done(res):
            self.failUnless(isinstance(res, Failure))
            self.failUnless(res.check(NotEnoughSharesError))
        d.addBoth(_done)
        return d

    def test_missing_sharehashes(self):
        # the first 6 servers are missing their sharehashes, so the
        # sharehash tree will not validate
        modemap = dict([(i, "missing sharehash")
                        for i in range(6)]
                       + [(i, "good")
                          for i in range(6, 10)])
        return self.send_and_recover((4,8,10), bucket_modes=modemap)

    def test_missing_sharehashes_failure(self):
        # all servers are missing their sharehashes, so the sharehash tree
        # will not validate, and the download will fail
        modemap = dict([(i, "missing sharehash")
                        for i in range(10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
        def _done(res):
            self.failUnless(isinstance(res, Failure), res)
            self.failUnless(res.check(NotEnoughSharesError), res)
        d.addBoth(_done)
        return d

    def test_lost_one_shareholder(self):
        # we have enough shareholders when we start, but one segment in we
        # lose one of them. The upload should still succeed, as long as we
        # still have 'servers_of_happiness' peers left.
        modemap = dict([(i, "good") for i in range(9)] +
                       [(i, "lost") for i in range(9, 10)])
        return self.send_and_recover((4,8,10), bucket_modes=modemap)

    def test_lost_one_shareholder_early(self):
        # we have enough shareholders when we choose peers, but just before
        # we send the 'start' message, we lose one of them. The upload should
        # still succeed, as long as we still have 'servers_of_happiness' peers
        # left.
        modemap = dict([(i, "good") for i in range(9)] +
                       [(i, "lost-early") for i in range(9, 10)])
        return self.send_and_recover((4,8,10), bucket_modes=modemap)

    def test_lost_many_shareholders(self):
        # we have enough shareholders when we start, but one segment in we
        # lose all but one of them. The upload should fail.
        modemap = dict([(i, "good") for i in range(1)] +
                       [(i, "lost") for i in range(1, 10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
        def _done(res):
            self.failUnless(isinstance(res, Failure))
            self.failUnless(res.check(UploadUnhappinessError), res)
        d.addBoth(_done)
        return d

    def test_lost_all_shareholders(self):
        # we have enough shareholders when we start, but one segment in we
        # lose all of them. The upload should fail.
        modemap = dict([(i, "lost") for i in range(10)])
        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
        def _done(res):
            self.failUnless(isinstance(res, Failure))
            self.failUnless(res.check(UploadUnhappinessError))
        d.addBoth(_done)
        return d