from zope.interface import implements
from twisted.trial import unittest
from twisted.internet import defer
from twisted.python.failure import Failure
from foolscap import eventual

from allmydata import encode, upload, download, hashtree, uri
from allmydata.util import hashutil
from allmydata.util.assertutil import _assert
from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader
class LostPeerError(Exception):
    """Simulates a storage peer that disappears mid-upload.

    Raised (or wrapped in a Failure) by FakeBucketWriterProxy when its mode
    is "lost" or "lost-early".
    """
    # NOTE(review): the class body was lost in this dump; an empty Exception
    # subclass is the only form consistent with its uses below.
    pass
def flip_bit(good):
    """Return a copy of `good` with the low bit of its final byte inverted."""
    prefix, last = good[:-1], good[-1]
    return prefix + chr(ord(last) ^ 0x01)
class FakeClient:
    """Minimal stand-in for the client object handed to FileDownloader.

    NOTE(review): the class header and body were lost in this dump; the
    tests only need a log() that accepts anything and does nothing.
    """
    def log(self, *args, **kwargs):
        # swallow all log traffic during tests
        pass
class FakeBucketWriterProxy:
    """In-memory stand-in for a remote storage bucket.

    Implements both the writer and the reader interface so the same object
    can be handed to the encoder during upload and to the downloader during
    recovery.  `mode` selects a misbehavior to simulate:

      "good"                         : behave normally
      "lost-early"                   : fail before accepting any data
      "lost"                         : fail once segment 1 is written
      "bad block"                    : corrupt block data on read
      "bad blockhash"/"bad sharehash": corrupt one hash-tree node on read
      "missing sharehash"            : claim to have no share hashes
      "bad plaintext hash(root)"     : corrupt plaintext hash list on read
      "bad crypttext hash(root)"     : corrupt crypttext hash list on read
      "bad uri_extension"            : corrupt the URI extension on read

    NOTE(review): several structural lines (the `def _try():` headers,
    `def start`/`def close`/`def abort`, and state initialization) were lost
    in this dump and have been restored from the surviving
    `return defer.maybeDeferred(_try)` pattern.
    """
    implements(IStorageBucketWriter, IStorageBucketReader)
    # these are used for both reading and writing
    def __init__(self, mode="good"):
        self.mode = mode
        self.blocks = {}
        self.plaintext_hashes = []
        self.crypttext_hashes = []
        self.block_hashes = None
        self.share_hashes = None
        self.closed = False

    def startIfNecessary(self):
        return defer.succeed(self)
    def start(self):
        if self.mode == "lost-early":
            f = Failure(LostPeerError("I went away early"))
            return eventual.fireEventually(f)
        return defer.succeed(self)

    def put_block(self, segmentnum, data):
        if self.mode == "lost-early":
            f = Failure(LostPeerError("I went away early"))
            return eventual.fireEventually(f)
        def _try():
            assert not self.closed
            assert segmentnum not in self.blocks
            if self.mode == "lost" and segmentnum >= 1:
                raise LostPeerError("I'm going away now")
            self.blocks[segmentnum] = data
        return defer.maybeDeferred(_try)

    def put_plaintext_hashes(self, hashes):
        def _try():
            assert not self.closed
            assert not self.plaintext_hashes
            self.plaintext_hashes = hashes
        return defer.maybeDeferred(_try)

    def put_crypttext_hashes(self, hashes):
        def _try():
            assert not self.closed
            assert not self.crypttext_hashes
            self.crypttext_hashes = hashes
        return defer.maybeDeferred(_try)

    def put_block_hashes(self, blockhashes):
        def _try():
            assert not self.closed
            assert self.block_hashes is None
            self.block_hashes = blockhashes
        return defer.maybeDeferred(_try)

    def put_share_hashes(self, sharehashes):
        def _try():
            assert not self.closed
            assert self.share_hashes is None
            self.share_hashes = sharehashes
        return defer.maybeDeferred(_try)

    def put_uri_extension(self, uri_extension):
        def _try():
            assert not self.closed
            self.uri_extension = uri_extension
        return defer.maybeDeferred(_try)

    def close(self):
        def _try():
            assert not self.closed
            self.closed = True
        return defer.maybeDeferred(_try)

    def abort(self):
        return defer.succeed(None)

    def get_block(self, blocknum):
        def _try():
            assert isinstance(blocknum, (int, long))
            if self.mode == "bad block":
                return flip_bit(self.blocks[blocknum])
            return self.blocks[blocknum]
        return defer.maybeDeferred(_try)

    def get_plaintext_hashes(self):
        def _try():
            # copy before corrupting, so the stored hashes stay intact
            hashes = self.plaintext_hashes[:]
            if self.mode == "bad plaintext hashroot":
                hashes[0] = flip_bit(hashes[0])
            if self.mode == "bad plaintext hash":
                hashes[1] = flip_bit(hashes[1])
            return hashes
        return defer.maybeDeferred(_try)

    def get_crypttext_hashes(self):
        def _try():
            hashes = self.crypttext_hashes[:]
            if self.mode == "bad crypttext hashroot":
                hashes[0] = flip_bit(hashes[0])
            if self.mode == "bad crypttext hash":
                hashes[1] = flip_bit(hashes[1])
            return hashes
        return defer.maybeDeferred(_try)

    def get_block_hashes(self):
        def _try():
            if self.mode == "bad blockhash":
                hashes = self.block_hashes[:]
                hashes[1] = flip_bit(hashes[1])
                return hashes
            return self.block_hashes
        return defer.maybeDeferred(_try)

    def get_share_hashes(self):
        def _try():
            if self.mode == "bad sharehash":
                # share hashes are (sharenum, hash) tuples
                hashes = self.share_hashes[:]
                hashes[1] = (hashes[1][0], flip_bit(hashes[1][1]))
                return hashes
            if self.mode == "missing sharehash":
                # one sneaky attack would be to pretend we don't know our own
                # sharehash, which could manage to frame someone else.
                # download.py is supposed to guard against this case.
                return []
            return self.share_hashes
        return defer.maybeDeferred(_try)

    def get_uri_extension(self):
        def _try():
            if self.mode == "bad uri_extension":
                return flip_bit(self.uri_extension)
            return self.uri_extension
        return defer.maybeDeferred(_try)
def make_data(length):
    """Return `length` bytes of deterministic, compressible test data.

    The dump had lost the return statement; every caller binds the result
    (`data = make_data(datalen)`), so the truncated-slice return is restored.
    """
    data = "happy happy joy joy" * 100
    assert length <= len(data)
    return data[:length]
class Encode(unittest.TestCase):
    """Exercise the encoder against fake shareholders at segment-count and
    hash-tree-size edge conditions.

    NOTE(review): several structural lines (encoder construction, the
    `_ready`/`_check` callback headers, `return e.start()`, `return d`) were
    lost in this dump and restored from the surviving callback bodies and
    `addCallback` calls — verify against upstream.
    """

    def do_encode(self, max_segment_size, datalen, NUM_SHARES, NUM_SEGMENTS,
                  expected_block_hashes, expected_share_hashes):
        """Encode `datalen` bytes into NUM_SHARES shares and verify the
        block/share hashes delivered to each fake shareholder."""
        data = make_data(datalen)
        # force use of multiple segments
        e = encode.Encoder()
        u = upload.Data(data, convergence="some convergence string")
        u.max_segment_size = max_segment_size
        u.encoding_param_k = 25
        u.encoding_param_happy = 75
        u.encoding_param_n = 100
        eu = upload.EncryptAnUploadable(u)
        d = e.set_encrypted_uploadable(eu)

        all_shareholders = []
        def _ready(res):
            # sanity-check the parameters the encoder derived, then attach
            # one fake shareholder per share and start encoding
            k,happy,n = e.get_param("share_counts")
            _assert(n == NUM_SHARES) # else we'll be completely confused
            numsegs = e.get_param("num_segments")
            _assert(numsegs == NUM_SEGMENTS, numsegs, NUM_SEGMENTS)
            segsize = e.get_param("segment_size")
            _assert( (NUM_SEGMENTS-1)*segsize < len(data) <= NUM_SEGMENTS*segsize,
                     NUM_SEGMENTS, segsize,
                     (NUM_SEGMENTS-1)*segsize, len(data), NUM_SEGMENTS*segsize)

            shareholders = {}
            for shnum in range(NUM_SHARES):
                peer = FakeBucketWriterProxy()
                shareholders[shnum] = peer
                all_shareholders.append(peer)
            e.set_shareholders(shareholders)
            return e.start()
        d.addCallback(_ready)

        def _check(res):
            (uri_extension_hash, required_shares, num_shares, file_size) = res
            self.failUnless(isinstance(uri_extension_hash, str))
            self.failUnlessEqual(len(uri_extension_hash), 32)
            for i,peer in enumerate(all_shareholders):
                self.failUnless(peer.closed)
                self.failUnlessEqual(len(peer.blocks), NUM_SEGMENTS)
                # each peer gets a full tree of block hashes. For 3 or 4
                # segments, that's 7 hashes. For 5 segments it's 15 hashes.
                self.failUnlessEqual(len(peer.block_hashes),
                                     expected_block_hashes)
                for h in peer.block_hashes:
                    self.failUnlessEqual(len(h), 32)
                # each peer also gets their necessary chain of share hashes.
                # For 100 shares (rounded up to 128 leaves), that's 8 hashes
                self.failUnlessEqual(len(peer.share_hashes),
                                     expected_share_hashes)
                for (hashnum, h) in peer.share_hashes:
                    self.failUnless(isinstance(hashnum, int))
                    self.failUnlessEqual(len(h), 32)
        d.addCallback(_check)

        return d

    # a series of 3*3 tests to check out edge conditions. One axis is how the
    # plaintext is divided into segments: kn+(-1,0,1). Another way to express
    # that is that n%k == -1 or 0 or 1. For example, for 25-byte segments, we
    # might test 74 bytes, 75 bytes, and 76 bytes.

    # on the other axis is how many leaves in the block hash tree we wind up
    # with, relative to a power of 2, so 2^a+(-1,0,1). Each segment turns
    # into a single leaf. So we'd like to check out, e.g., 3 segments, 4
    # segments, and 5 segments.

    # that results in the following series of data lengths:
    #  3 segs: 74, 75, 51
    #  4 segs: 99, 100, 76
    #  5 segs: 124, 125, 101

    # all tests encode to 100 shares, which means the share hash tree will
    # have 128 leaves, which means that buckets will be given an 8-long share
    # hash chain

    # all 3-segment files will have a 4-leaf blockhashtree, and thus expect
    # to get 7 blockhashes. 4-segment files will also get 4-leaf block hash
    # trees and 7 blockhashes. 5-segment files will get 8-leaf block hash
    # trees, which get 15 blockhashes.

    def test_send_74(self):
        # 3 segments (25, 25, 24)
        return self.do_encode(25, 74, 100, 3, 7, 8)
    def test_send_75(self):
        # 3 segments (25, 25, 25)
        return self.do_encode(25, 75, 100, 3, 7, 8)
    def test_send_51(self):
        # 3 segments (25, 25, 1)
        return self.do_encode(25, 51, 100, 3, 7, 8)

    def test_send_76(self):
        # encode a 76 byte file (in 4 segments: 25,25,25,1) to 100 shares
        return self.do_encode(25, 76, 100, 4, 7, 8)
    def test_send_99(self):
        # 4 segments: 25,25,25,24
        return self.do_encode(25, 99, 100, 4, 7, 8)
    def test_send_100(self):
        # 4 segments: 25,25,25,25
        return self.do_encode(25, 100, 100, 4, 7, 8)

    def test_send_124(self):
        # 5 segments: 25, 25, 25, 25, 24
        return self.do_encode(25, 124, 100, 5, 15, 8)
    def test_send_125(self):
        # 5 segments: 25, 25, 25, 25, 25
        return self.do_encode(25, 125, 100, 5, 15, 8)
    def test_send_101(self):
        # 5 segments: 25, 25, 25, 25, 1
        return self.do_encode(25, 101, 100, 5, 15, 8)
277 class Roundtrip(unittest.TestCase):
278 def send_and_recover(self, k_and_happy_and_n=(25,75,100),
279 AVAILABLE_SHARES=None,
283 recover_mode="recover",
285 if AVAILABLE_SHARES is None:
286 AVAILABLE_SHARES = k_and_happy_and_n[2]
287 data = make_data(datalen)
288 d = self.send(k_and_happy_and_n, AVAILABLE_SHARES,
289 max_segment_size, bucket_modes, data)
290 # that fires with (uri_extension_hash, e, shareholders)
291 d.addCallback(self.recover, AVAILABLE_SHARES, recover_mode)
292 # that fires with newdata
293 def _downloaded((newdata, fd)):
294 self.failUnless(newdata == data)
296 d.addCallback(_downloaded)
299 def send(self, k_and_happy_and_n, AVAILABLE_SHARES, max_segment_size,
301 k, happy, n = k_and_happy_and_n
302 NUM_SHARES = k_and_happy_and_n[2]
303 if AVAILABLE_SHARES is None:
304 AVAILABLE_SHARES = NUM_SHARES
306 u = upload.Data(data, convergence="some convergence string")
307 # force use of multiple segments by using a low max_segment_size
308 u.max_segment_size = max_segment_size
309 u.encoding_param_k = k
310 u.encoding_param_happy = happy
311 u.encoding_param_n = n
312 eu = upload.EncryptAnUploadable(u)
313 d = e.set_encrypted_uploadable(eu)
317 k,happy,n = e.get_param("share_counts")
318 assert n == NUM_SHARES # else we'll be completely confused
320 for shnum in range(NUM_SHARES):
321 mode = bucket_modes.get(shnum, "good")
322 peer = FakeBucketWriterProxy(mode)
323 shareholders[shnum] = peer
324 e.set_shareholders(shareholders)
326 d.addCallback(_ready)
328 d1 = u.get_encryption_key()
329 d1.addCallback(lambda key: (res, key, shareholders))
334 def recover(self, (res, key, shareholders), AVAILABLE_SHARES,
336 (uri_extension_hash, required_shares, num_shares, file_size) = res
338 if "corrupt_key" in recover_mode:
339 # we corrupt the key, so that the decrypted data is corrupted and
340 # will fail the plaintext hash check. Since we're manually
341 # attaching shareholders, the fact that the storage index is also
342 # corrupted doesn't matter.
345 u = uri.CHKFileURI(key=key,
346 uri_extension_hash=uri_extension_hash,
347 needed_shares=required_shares,
348 total_shares=num_shares,
352 client = FakeClient()
353 target = download.Data()
354 fd = download.FileDownloader(client, URI, target)
356 # we manually cycle the FileDownloader through a number of steps that
357 # would normally be sequenced by a Deferred chain in
358 # FileDownloader.start(), to give us more control over the process.
359 # In particular, by bypassing _get_all_shareholders, we skip
360 # permuted-peerlist selection.
361 for shnum, bucket in shareholders.items():
362 if shnum < AVAILABLE_SHARES and bucket.closed:
363 fd.add_share_bucket(shnum, bucket)
364 fd._got_all_shareholders(None)
366 # Make it possible to obtain uri_extension from the shareholders.
367 # Arrange for shareholders[0] to be the first, so we can selectively
368 # corrupt the data it returns.
369 fd._uri_extension_sources = shareholders.values()
370 fd._uri_extension_sources.remove(shareholders[0])
371 fd._uri_extension_sources.insert(0, shareholders[0])
373 d = defer.succeed(None)
375 # have the FileDownloader retrieve a copy of uri_extension itself
376 d.addCallback(fd._obtain_uri_extension)
378 if "corrupt_crypttext_hashes" in recover_mode:
379 # replace everybody's crypttext hash trees with a different one
380 # (computed over a different file), then modify our uri_extension
381 # to reflect the new crypttext hash tree root
382 def _corrupt_crypttext_hashes(uri_extension):
383 assert isinstance(uri_extension, dict)
384 assert 'crypttext_root_hash' in uri_extension
385 badhash = hashutil.tagged_hash("bogus", "data")
386 bad_crypttext_hashes = [badhash] * uri_extension['num_segments']
387 badtree = hashtree.HashTree(bad_crypttext_hashes)
388 for bucket in shareholders.values():
389 bucket.crypttext_hashes = list(badtree)
390 uri_extension['crypttext_root_hash'] = badtree[0]
392 d.addCallback(_corrupt_crypttext_hashes)
394 d.addCallback(fd._got_uri_extension)
396 # also have the FileDownloader ask for hash trees
397 d.addCallback(fd._get_hashtrees)
399 d.addCallback(fd._create_validated_buckets)
400 d.addCallback(fd._download_all_segments)
401 d.addCallback(fd._done)
407 def test_not_enough_shares(self):
408 d = self.send_and_recover((4,8,10), AVAILABLE_SHARES=2)
410 self.failUnless(isinstance(res, Failure))
411 self.failUnless(res.check(download.NotEnoughPeersError))
415 def test_one_share_per_peer(self):
416 return self.send_and_recover()
419 return self.send_and_recover(datalen=74)
421 return self.send_and_recover(datalen=75)
423 return self.send_and_recover(datalen=51)
426 return self.send_and_recover(datalen=99)
428 return self.send_and_recover(datalen=100)
430 return self.send_and_recover(datalen=76)
433 return self.send_and_recover(datalen=124)
435 return self.send_and_recover(datalen=125)
437 return self.send_and_recover(datalen=101)
439 # the following tests all use 4-out-of-10 encoding
441 def test_bad_blocks(self):
442 # the first 6 servers have bad blocks, which will be caught by the
444 modemap = dict([(i, "bad block")
447 for i in range(6, 10)])
448 return self.send_and_recover((4,8,10), bucket_modes=modemap)
450 def test_bad_blocks_failure(self):
451 # the first 7 servers have bad blocks, which will be caught by the
452 # blockhashes, and the download will fail
453 modemap = dict([(i, "bad block")
456 for i in range(7, 10)])
457 d = self.send_and_recover((4,8,10), bucket_modes=modemap)
459 self.failUnless(isinstance(res, Failure))
460 self.failUnless(res.check(download.NotEnoughPeersError))
464 def test_bad_blockhashes(self):
465 # the first 6 servers have bad block hashes, so the blockhash tree
467 modemap = dict([(i, "bad blockhash")
470 for i in range(6, 10)])
471 return self.send_and_recover((4,8,10), bucket_modes=modemap)
473 def test_bad_blockhashes_failure(self):
474 # the first 7 servers have bad block hashes, so the blockhash tree
475 # will not validate, and the download will fail
476 modemap = dict([(i, "bad blockhash")
479 for i in range(7, 10)])
480 d = self.send_and_recover((4,8,10), bucket_modes=modemap)
482 self.failUnless(isinstance(res, Failure))
483 self.failUnless(res.check(download.NotEnoughPeersError))
487 def test_bad_sharehashes(self):
488 # the first 6 servers have bad block hashes, so the sharehash tree
490 modemap = dict([(i, "bad sharehash")
493 for i in range(6, 10)])
494 return self.send_and_recover((4,8,10), bucket_modes=modemap)
496 def assertFetchFailureIn(self, fd, where):
497 expected = {"uri_extension": 0,
498 "plaintext_hashroot": 0,
499 "plaintext_hashtree": 0,
500 "crypttext_hashroot": 0,
501 "crypttext_hashtree": 0,
503 if where is not None:
505 self.failUnlessEqual(fd._fetch_failures, expected)
508 # just to make sure the test harness works when we aren't
509 # intentionally causing failures
510 modemap = dict([(i, "good") for i in range(0, 10)])
511 d = self.send_and_recover((4,8,10), bucket_modes=modemap)
512 d.addCallback(self.assertFetchFailureIn, None)
515 def test_bad_uri_extension(self):
516 # the first server has a bad uri_extension block, so we will fail
517 # over to a different server.
518 modemap = dict([(i, "bad uri_extension") for i in range(1)] +
519 [(i, "good") for i in range(1, 10)])
520 d = self.send_and_recover((4,8,10), bucket_modes=modemap)
521 d.addCallback(self.assertFetchFailureIn, "uri_extension")
524 def test_bad_sharehashes_failure(self):
525 # the first 7 servers have bad block hashes, so the sharehash tree
526 # will not validate, and the download will fail
527 modemap = dict([(i, "bad sharehash")
530 for i in range(7, 10)])
531 d = self.send_and_recover((4,8,10), bucket_modes=modemap)
533 self.failUnless(isinstance(res, Failure))
534 self.failUnless(res.check(download.NotEnoughPeersError))
538 def test_missing_sharehashes(self):
539 # the first 6 servers are missing their sharehashes, so the
540 # sharehash tree will not validate
541 modemap = dict([(i, "missing sharehash")
544 for i in range(6, 10)])
545 return self.send_and_recover((4,8,10), bucket_modes=modemap)
547 def test_missing_sharehashes_failure(self):
548 # the first 7 servers are missing their sharehashes, so the
549 # sharehash tree will not validate, and the download will fail
550 modemap = dict([(i, "missing sharehash")
553 for i in range(7, 10)])
554 d = self.send_and_recover((4,8,10), bucket_modes=modemap)
556 self.failUnless(isinstance(res, Failure))
557 self.failUnless(res.check(download.NotEnoughPeersError))
561 def test_lost_one_shareholder(self):
562 # we have enough shareholders when we start, but one segment in we
563 # lose one of them. The upload should still succeed, as long as we
564 # still have 'shares_of_happiness' peers left.
565 modemap = dict([(i, "good") for i in range(9)] +
566 [(i, "lost") for i in range(9, 10)])
567 return self.send_and_recover((4,8,10), bucket_modes=modemap)
569 def test_lost_one_shareholder_early(self):
570 # we have enough shareholders when we choose peers, but just before
571 # we send the 'start' message, we lose one of them. The upload should
572 # still succeed, as long as we still have 'shares_of_happiness' peers
574 modemap = dict([(i, "good") for i in range(9)] +
575 [(i, "lost-early") for i in range(9, 10)])
576 return self.send_and_recover((4,8,10), bucket_modes=modemap)
578 def test_lost_many_shareholders(self):
579 # we have enough shareholders when we start, but one segment in we
580 # lose all but one of them. The upload should fail.
581 modemap = dict([(i, "good") for i in range(1)] +
582 [(i, "lost") for i in range(1, 10)])
583 d = self.send_and_recover((4,8,10), bucket_modes=modemap)
585 self.failUnless(isinstance(res, Failure))
586 self.failUnless(res.check(encode.NotEnoughPeersError), res)
590 def test_lost_all_shareholders(self):
591 # we have enough shareholders when we start, but one segment in we
592 # lose all of them. The upload should fail.
593 modemap = dict([(i, "lost") for i in range(10)])
594 d = self.send_and_recover((4,8,10), bucket_modes=modemap)
596 self.failUnless(isinstance(res, Failure))
597 self.failUnless(res.check(encode.NotEnoughPeersError))