import struct, sha
from cStringIO import StringIO
from twisted.trial import unittest
from twisted.internet import defer, reactor
from twisted.python import failure
from allmydata import uri, download
from allmydata.util import base32
from allmydata.util.idlib import shortnodeid_b2a
from allmydata.util.hashutil import tagged_hash
from allmydata.encode import NotEnoughSharesError
from allmydata.interfaces import IURI, IMutableFileURI, IUploadable
from foolscap.eventual import eventually, fireEventually
from foolscap.logging import log

from allmydata.mutable.node import MutableFileNode
from allmydata.mutable.common import DictOfSets, ResponseCache, \
     MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_READ, UnrecoverableFileError
from allmydata.mutable.retrieve import Retrieve
from allmydata.mutable.publish import Publish
from allmydata.mutable.servermap import ServerMap, ServermapUpdater
from allmydata.mutable.layout import unpack_header, unpack_share

# this "FastMutableFileNode" exists solely to speed up tests by using smaller
# public/private keys. Once we switch to fast DSA-based keys, we can get rid
# of this class.
class FastMutableFileNode(MutableFileNode):
    SIGNATURE_KEY_SIZE = 522
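    # (real mutable files use much larger signing keys; generating them
    # dominates test runtime, which is why these tests shrink the key size.)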

# this "FakeStorage" exists to put the share data in RAM and avoid using real
# network connections, both to speed up the tests and to reduce the amount of
# non-mutable.py code being exercised.

class FakeStorage:
    # this class replaces the collection of storage servers, allowing the
    # tests to examine and manipulate the published shares. It also lets us
    # control the order in which read queries are answered, to exercise more
    # of the error-handling code in Retrieve.
    #
    # Note that we ignore the storage index: this FakeStorage instance can
    # only be used for a single storage index.

    def __init__(self):
        self._peers = {}
        # _sequence is used to cause the responses to occur in a specific
        # order. If it is in use, then we will defer queries instead of
        # answering them right away, accumulating the Deferreds in a dict. We
        # don't know exactly how many queries we'll get, so exactly one
        # second after the first query arrives, we will release them all (in
        # order).
        self._sequence = None
        self._pending = {}
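        # hedged usage sketch (not part of any test here): a test that wants
        # peer B's answer delivered before peer A's could do roughly
        #     s = FakeStorage()
        #     s._sequence = [peerid_b, peerid_a]
        # and any read() calls made within the next second are then released
        # in that order (peerid_b/peerid_a are hypothetical names).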

    def read(self, peerid, storage_index):
        shares = self._peers.get(peerid, {})
        if self._sequence is None:
            return defer.succeed(shares)
        d = defer.Deferred()
        if not self._pending:
            reactor.callLater(1.0, self._fire_readers)
        self._pending[peerid] = (d, shares)
        return d

    def _fire_readers(self):
        pending = self._pending
        self._pending = {}
        for peerid in self._sequence:
            if peerid in pending:
                d, shares = pending.pop(peerid)
                eventually(d.callback, shares)
        for (d, shares) in pending.values():
            eventually(d.callback, shares)

    def write(self, peerid, storage_index, shnum, offset, data):
        if peerid not in self._peers:
            self._peers[peerid] = {}
        shares = self._peers[peerid]
        f = StringIO()
        f.write(shares.get(shnum, ""))
        f.seek(offset)
        f.write(data)
        shares[shnum] = f.getvalue()


class FakeStorageServer:
    def __init__(self, peerid, storage):
        self.peerid = peerid
        self.storage = storage

    def callRemote(self, methname, *args, **kwargs):
        def _call():
            meth = getattr(self, methname)
            return meth(*args, **kwargs)
        d = fireEventually()
        d.addCallback(lambda res: _call())
        return d

    def slot_readv(self, storage_index, shnums, readv):
        d = self.storage.read(self.peerid, storage_index)
        def _read(shares):
            response = {}
            for shnum in shares:
                if shnums and shnum not in shnums:
                    continue
                vector = response[shnum] = []
                for (offset, length) in readv:
                    assert isinstance(offset, (int, long)), offset
                    assert isinstance(length, (int, long)), length
                    vector.append(shares[shnum][offset:offset+length])
            return response
        d.addCallback(_read)
        return d

    def slot_testv_and_readv_and_writev(self, storage_index, secrets,
                                        tw_vectors, read_vector):
        # always-pass: parrot the test vectors back to them.
        readv = {}
        for shnum, (testv, writev, new_length) in tw_vectors.items():
            for (offset, length, op, specimen) in testv:
                assert op in ("le", "eq", "ge")
            # TODO: this isn't right, the read is controlled by read_vector,
            # not by the test vectors
            readv[shnum] = [ specimen
                             for (offset, length, op, specimen)
                             in testv ]
            for (offset, data) in writev:
                self.storage.write(self.peerid, storage_index, shnum,
                                   offset, data)
        answer = (True, readv)
        return fireEventually(answer)
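
    # (both callRemote and the slot methods answer via fireEventually, so the
    # fake responses arrive on a later reactor turn, mimicking the asynchrony
    # of a real remote storage server.)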


# our "FakeClient" has just enough functionality of the real Client to let
# the tests run.

class FakeClient:
    mutable_file_node_class = FastMutableFileNode

    def __init__(self, num_peers=10):
        self._storage = FakeStorage()
        self._num_peers = num_peers
        self._peerids = [tagged_hash("peerid", "%d" % i)[:20]
                         for i in range(self._num_peers)]
        self._connections = dict([(peerid, FakeStorageServer(peerid,
                                                             self._storage))
                                  for peerid in self._peerids])
        self.nodeid = "fakenodeid"

    def log(self, msg, **kw):
        return log.msg(msg, **kw)

    def get_renewal_secret(self):
        return "I hereby permit you to renew my files"
    def get_cancel_secret(self):
        return "I hereby permit you to cancel my leases"

    def create_mutable_file(self, contents=""):
        n = self.mutable_file_node_class(self)
        d = n.create(contents)
        d.addCallback(lambda res: n)
        return d

    def notify_retrieve(self, r):
        pass
    def notify_publish(self, p):
        pass
    def notify_mapupdate(self, u):
        pass

    def create_node_from_uri(self, u):
        u = IURI(u)
        assert IMutableFileURI.providedBy(u), u
        res = self.mutable_file_node_class(self).init_from_uri(u)
        return res

    def get_permuted_peers(self, service_name, key):
        """
        @return: list of (peerid, connection,)
        """
        results = []
        for (peerid, connection) in self._connections.items():
            assert isinstance(peerid, str)
            permuted = sha.new(key + peerid).digest()
            results.append((permuted, peerid, connection))
        results.sort()
        results = [ (r[1],r[2]) for r in results]
        return results

    def upload(self, uploadable):
        assert IUploadable.providedBy(uploadable)
        d = uploadable.get_size()
        d.addCallback(lambda length: uploadable.read(length))
        #d.addCallback(self.create_mutable_file)
        def _got_data(datav):
            data = "".join(datav)
            #newnode = FastMutableFileNode(self)
            return uri.LiteralFileURI(data)
        d.addCallback(_got_data)
        return d


def flip_bit(original, byte_offset):
    return (original[:byte_offset] +
            chr(ord(original[byte_offset]) ^ 0x01) +
            original[byte_offset+1:])
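# e.g. flip_bit("\x00\x00", 1) == "\x00\x01": only the addressed byte changes,
# and only its low bit.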

def corrupt(res, s, offset, shnums_to_corrupt=None):
    # if shnums_to_corrupt is None, corrupt all shares. Otherwise it is a
    # list of shnums to corrupt.
    for peerid in s._peers:
        shares = s._peers[peerid]
        for shnum in shares:
            if (shnums_to_corrupt is not None
                and shnum not in shnums_to_corrupt):
                continue
            data = shares[shnum]
            (version, seqnum, root_hash, IV,
             k, N, segsize, datalen,
             o) = unpack_header(data)
            if isinstance(offset, tuple):
                offset1, offset2 = offset
            else:
                offset1 = offset
                offset2 = 0
            if offset1 == "pubkey":
                real_offset = 107 # assumed: the pubkey starts right after
                                  # the fixed-size share header
            elif offset1 in o:
                real_offset = o[offset1]
            else:
                real_offset = offset1
            real_offset = int(real_offset) + offset2
            assert isinstance(real_offset, int), offset
            shares[shnum] = flip_bit(data, real_offset)
    return res
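
# e.g. corrupt(None, storage, "pubkey") flips one bit in every share's pubkey,
# corrupt(None, storage, ("share_hash_chain", 4)) flips a bit four bytes into
# that field, and shnums_to_corrupt=range(5) limits the damage to shares 0-4
# (these mirror the calls made by the tests below).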


class Filenode(unittest.TestCase):
    def setUp(self):
        self.client = FakeClient()

    def test_create(self):
        d = self.client.create_mutable_file()
        def _created(n):
            self.failUnless(isinstance(n, FastMutableFileNode))
            peer0 = self.client._peerids[0]
            shnums = self.client._storage._peers[peer0].keys()
            self.failUnlessEqual(len(shnums), 1)
        d.addCallback(_created)
        return d

    def test_upload_and_download(self):
        d = self.client.create_mutable_file()
        def _created(n):
            d = defer.succeed(None)
            d.addCallback(lambda res: n.update_servermap())
            d.addCallback(lambda smap: smap.dump(StringIO()))
            d.addCallback(lambda sio:
                          self.failUnless("3-of-10" in sio.getvalue()))
            d.addCallback(lambda res: n.overwrite("contents 1"))
            d.addCallback(lambda res: self.failUnlessIdentical(res, None))
            d.addCallback(lambda res: n.download_to_data())
            d.addCallback(lambda res: self.failUnlessEqual(res, "contents 1"))
            d.addCallback(lambda res: n.overwrite("contents 2"))
            d.addCallback(lambda res: n.download_to_data())
            d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2"))
            d.addCallback(lambda res: n.download(download.Data()))
            d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2"))
            d.addCallback(lambda res: n.update("contents 3"))
            d.addCallback(lambda res: n.download_to_data())
            d.addCallback(lambda res: self.failUnlessEqual(res, "contents 3"))
            return d
        d.addCallback(_created)
        return d

    def test_create_with_initial_contents(self):
        d = self.client.create_mutable_file("contents 1")
        def _created(n):
            d = n.download_to_data()
            d.addCallback(lambda res: self.failUnlessEqual(res, "contents 1"))
            d.addCallback(lambda res: n.overwrite("contents 2"))
            d.addCallback(lambda res: n.download_to_data())
            d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2"))
            return d
        d.addCallback(_created)
        return d

    def test_upload_and_download_full_size_keys(self):
        self.client.mutable_file_node_class = MutableFileNode
        d = self.client.create_mutable_file()
        def _created(n):
            d = defer.succeed(None)
            d.addCallback(lambda res: n.update_servermap())
            d.addCallback(lambda smap: smap.dump(StringIO()))
            d.addCallback(lambda sio:
                          self.failUnless("3-of-10" in sio.getvalue()))
            d.addCallback(lambda res: n.overwrite("contents 1"))
            d.addCallback(lambda res: self.failUnlessIdentical(res, None))
            d.addCallback(lambda res: n.download_to_data())
            d.addCallback(lambda res: self.failUnlessEqual(res, "contents 1"))
            d.addCallback(lambda res: n.overwrite("contents 2"))
            d.addCallback(lambda res: n.download_to_data())
            d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2"))
            d.addCallback(lambda res: n.download(download.Data()))
            d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2"))
            d.addCallback(lambda res: n.update("contents 3"))
            d.addCallback(lambda res: n.download_to_data())
            d.addCallback(lambda res: self.failUnlessEqual(res, "contents 3"))
            return d
        d.addCallback(_created)
        return d


class MakeShares(unittest.TestCase):
    def test_encrypt(self):
        c = FakeClient()
        fn = FastMutableFileNode(c)
        CONTENTS = "some initial contents"
        d = fn.create(CONTENTS)
        def _created(res):
            p = Publish(fn, None)
            p.readkey = "\x00" * 16
            p.required_shares = 3
            p.total_shares = 10
            p.setup_encoding_parameters()
            return p._encrypt_and_encode()
        d.addCallback(_created)
        def _done(shares_and_shareids):
            (shares, share_ids) = shares_and_shareids
            self.failUnlessEqual(len(shares), 10)
            for sh in shares:
                self.failUnless(isinstance(sh, str))
                self.failUnlessEqual(len(sh), 7)
            self.failUnlessEqual(len(share_ids), 10)
        d.addCallback(_done)
        return d

    def test_generate(self):
        c = FakeClient()
        fn = FastMutableFileNode(c)
        CONTENTS = "some initial contents"
        p = Publish(fn, None)
        r = Retrieve(fn)  # not used below
        # make some fake shares
        shares_and_ids = ( ["%07d" % i for i in range(10)], range(10) )
        p._privkey = FakePrivKey(0)
        p._encprivkey = "encprivkey"
        p._pubkey = FakePubKey(0)
        d = defer.maybeDeferred(p._generate_shares,
                                shares_and_ids,
                                3,           # assumed seqnum (_done expects 3)
                                "SALT" * 4)  # assumed salt (_done expects it)
        def _done( (seqnum, root_hash, final_shares, target_info2) ):
            self.failUnlessEqual(seqnum, 3)
            self.failUnlessEqual(len(root_hash), 32)
            self.failUnless(isinstance(final_shares, dict))
            self.failUnlessEqual(len(final_shares), 10)
            self.failUnlessEqual(sorted(final_shares.keys()), range(10))
            for i,sh in final_shares.items():
                self.failUnless(isinstance(sh, str))
                # feed the share through the unpacker as a sanity-check
                pieces = unpack_share(sh)
                (u_seqnum, u_root_hash, IV, k, N, segsize, datalen,
                 pubkey, signature, share_hash_chain, block_hash_tree,
                 share_data, enc_privkey) = pieces
                self.failUnlessEqual(u_seqnum, 3)
                self.failUnlessEqual(u_root_hash, root_hash)
                self.failUnlessEqual(k, 3)
                self.failUnlessEqual(N, 10)
                self.failUnlessEqual(segsize, 21)
                self.failUnlessEqual(datalen, len(CONTENTS))
                self.failUnlessEqual(pubkey, p._pubkey.serialize())
                sig_material = struct.pack(">BQ32s16s BBQQ",
                                           0, p._new_seqnum, root_hash, IV,
                                           k, N, segsize, datalen)
                self.failUnless(p._pubkey.verify(sig_material, signature))
                #self.failUnlessEqual(signature, p._privkey.sign(sig_material))
                self.failUnless(isinstance(share_hash_chain, dict))
                self.failUnlessEqual(len(share_hash_chain), 4) # ceil(log2(10))
                for shnum,share_hash in share_hash_chain.items():
                    self.failUnless(isinstance(shnum, int))
                    self.failUnless(isinstance(share_hash, str))
                    self.failUnlessEqual(len(share_hash), 32)
                self.failUnless(isinstance(block_hash_tree, list))
                self.failUnlessEqual(len(block_hash_tree), 1) # very small tree
                self.failUnlessEqual(IV, "SALT"*4)
                self.failUnlessEqual(len(share_data), len("%07d" % 1))
                self.failUnlessEqual(enc_privkey, fn.get_encprivkey())
        d.addCallback(_done)
        return d


# TODO: when we publish to 20 peers, we should get one share per peer on 10
# when we publish to 3 peers, we should get either 3 or 4 shares per peer
# when we publish to zero peers, we should get a NotEnoughSharesError
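#
# a rough sketch of the zero-peers case (hypothetical and not wired up; it
# assumes FakeClient tolerates num_peers=0 and that create_mutable_file then
# errbacks with NotEnoughSharesError):
#
#     class PublishToZeroPeers(unittest.TestCase):
#         def test_publish(self):
#             c = FakeClient(num_peers=0)
#             d = c.create_mutable_file("New contents go here")
#             return self.failUnlessFailure(d, NotEnoughSharesError)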


class Servermap(unittest.TestCase):
    def setUp(self):
        # publish a file and create shares, which can then be manipulated
        # later.
        self._client = FakeClient(num_peers)
        self._storage = self._client._storage
        d = self._client.create_mutable_file("New contents go here")
        def _created(node):
            self._fn = node
        d.addCallback(_created)
        return d

    def make_servermap(self, mode=MODE_CHECK):
        smu = ServermapUpdater(self._fn, ServerMap(), mode)
        d = smu.update()
        return d

    def update_servermap(self, oldmap, mode=MODE_CHECK):
        smu = ServermapUpdater(self._fn, oldmap, mode)
        d = smu.update()
        return d

    def failUnlessOneRecoverable(self, sm, num_shares):
        self.failUnlessEqual(len(sm.recoverable_versions()), 1)
        self.failUnlessEqual(len(sm.unrecoverable_versions()), 0)
        best = sm.best_recoverable_version()
        self.failIfEqual(best, None)
        self.failUnlessEqual(sm.recoverable_versions(), set([best]))
        self.failUnlessEqual(len(sm.shares_available()), 1)
        self.failUnlessEqual(sm.shares_available()[best], (num_shares, 3))
        return sm

    def test_basic(self):
        d = defer.succeed(None)
        ms = self.make_servermap
        us = self.update_servermap

        d.addCallback(lambda res: ms(mode=MODE_CHECK))
        d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 10))
        d.addCallback(lambda res: ms(mode=MODE_WRITE))
        d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 10))
        d.addCallback(lambda res: ms(mode=MODE_READ))
        # this mode stops at k+epsilon, and epsilon=k, so 6 shares
        d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 6))
        d.addCallback(lambda res: ms(mode=MODE_ANYTHING))
        # this mode stops at 'k' shares
        d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 3))
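
        # summary of what the modes did above: MODE_CHECK and MODE_WRITE kept
        # querying until all 10 shares were seen, MODE_READ stopped at
        # k+epsilon (3+3=6), and MODE_ANYTHING stopped as soon as k=3 shares
        # (one recoverable version) had been found.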

        # and can we re-use the same servermap? Note that these are sorted in
        # increasing order of number of servers queried, since once a server
        # gets into the servermap, we'll always ask it for an update.
        d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 3))
        d.addCallback(lambda sm: us(sm, mode=MODE_READ))
        d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 6))
        d.addCallback(lambda sm: us(sm, mode=MODE_WRITE))
        d.addCallback(lambda sm: us(sm, mode=MODE_CHECK))
        d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 10))
        d.addCallback(lambda sm: us(sm, mode=MODE_ANYTHING))
        d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 10))
        return d

    def test_mark_bad(self):
        d = defer.succeed(None)
        ms = self.make_servermap
        us = self.update_servermap

        d.addCallback(lambda res: ms(mode=MODE_READ))
        d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 6))
        def _made_map(sm):
            v = sm.best_recoverable_version()
            vm = sm.make_versionmap()
            shares = list(vm[v])
            self.failUnlessEqual(len(shares), 6)
            self._corrupted = set()
            # mark the first 5 shares as corrupt, then update the servermap.
            # The map should not have the marked shares in it any more, and
            # new shares should be found to replace the missing ones.
            for (shnum, peerid, timestamp) in shares:
                if shnum < 5:
                    self._corrupted.add( (peerid, shnum) )
                    sm.mark_bad_share(peerid, shnum)
            return self.update_servermap(sm, MODE_WRITE)
        d.addCallback(_made_map)
        def _check_map(sm):
            # this should find all 5 shares that weren't marked bad
            v = sm.best_recoverable_version()
            vm = sm.make_versionmap()
            shares = list(vm[v])
            for (peerid, shnum) in self._corrupted:
                peer_shares = sm.shares_on_peer(peerid)
                self.failIf(shnum in peer_shares,
                            "%d was in %s" % (shnum, peer_shares))
            self.failUnlessEqual(len(shares), 5)
        d.addCallback(_check_map)
        return d

    def failUnlessNoneRecoverable(self, sm):
        self.failUnlessEqual(len(sm.recoverable_versions()), 0)
        self.failUnlessEqual(len(sm.unrecoverable_versions()), 0)
        best = sm.best_recoverable_version()
        self.failUnlessEqual(best, None)
        self.failUnlessEqual(len(sm.shares_available()), 0)

    def test_no_shares(self):
        self._client._storage._peers = {} # delete all shares
        ms = self.make_servermap
        d = defer.succeed(None)

        d.addCallback(lambda res: ms(mode=MODE_CHECK))
        d.addCallback(lambda sm: self.failUnlessNoneRecoverable(sm))

        d.addCallback(lambda res: ms(mode=MODE_ANYTHING))
        d.addCallback(lambda sm: self.failUnlessNoneRecoverable(sm))

        d.addCallback(lambda res: ms(mode=MODE_WRITE))
        d.addCallback(lambda sm: self.failUnlessNoneRecoverable(sm))

        d.addCallback(lambda res: ms(mode=MODE_READ))
        d.addCallback(lambda sm: self.failUnlessNoneRecoverable(sm))

        return d

    def failUnlessNotQuiteEnough(self, sm):
        self.failUnlessEqual(len(sm.recoverable_versions()), 0)
        self.failUnlessEqual(len(sm.unrecoverable_versions()), 1)
        best = sm.best_recoverable_version()
        self.failUnlessEqual(best, None)
        self.failUnlessEqual(len(sm.shares_available()), 1)
        self.failUnlessEqual(sm.shares_available().values()[0], (2,3) )

    def test_not_quite_enough_shares(self):
        s = self._client._storage
        ms = self.make_servermap
        num_shares = len(s._peers)
        for peerid in s._peers:
            s._peers[peerid] = {}
        # now there ought to be only two shares left
        assert len([peerid for peerid in s._peers if s._peers[peerid]]) == 2

        d = defer.succeed(None)

        d.addCallback(lambda res: ms(mode=MODE_CHECK))
        d.addCallback(lambda sm: self.failUnlessNotQuiteEnough(sm))
        d.addCallback(lambda res: ms(mode=MODE_ANYTHING))
        d.addCallback(lambda sm: self.failUnlessNotQuiteEnough(sm))
        d.addCallback(lambda res: ms(mode=MODE_WRITE))
        d.addCallback(lambda sm: self.failUnlessNotQuiteEnough(sm))
        d.addCallback(lambda res: ms(mode=MODE_READ))
        d.addCallback(lambda sm: self.failUnlessNotQuiteEnough(sm))

        return d


class Roundtrip(unittest.TestCase):
    def setUp(self):
        # publish a file and create shares, which can then be manipulated
        # later.
        self.CONTENTS = "New contents go here"
        self._client = FakeClient(num_peers)
        self._storage = self._client._storage
        d = self._client.create_mutable_file(self.CONTENTS)
        def _created(node):
            self._fn = node
        d.addCallback(_created)
        return d

    def make_servermap(self, mode=MODE_READ, oldmap=None):
        if oldmap is None:
            oldmap = ServerMap()
        smu = ServermapUpdater(self._fn, oldmap, mode)
        d = smu.update()
        return d

    def abbrev_verinfo(self, verinfo):
        if verinfo is None:
            return None
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = verinfo
        return "%d-%s" % (seqnum, base32.b2a(root_hash)[:4])

    def abbrev_verinfo_dict(self, verinfo_d):
        output = {}
        for verinfo,value in verinfo_d.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            output["%d-%s" % (seqnum, base32.b2a(root_hash)[:4])] = value
        return output

    def dump_servermap(self, servermap):
        print "SERVERMAP", servermap
        print "RECOVERABLE", [self.abbrev_verinfo(v)
                              for v in servermap.recoverable_versions()]
        print "BEST", self.abbrev_verinfo(servermap.best_recoverable_version())
        print "available", self.abbrev_verinfo_dict(servermap.shares_available())

    def do_download(self, servermap, version=None):
        if version is None:
            version = servermap.best_recoverable_version()
        r = Retrieve(self._fn, servermap, version)
        return r.download()

    def test_basic(self):
        d = self.make_servermap()
        def _do_retrieve(servermap):
            self._smap = servermap
            #self.dump_servermap(servermap)
            self.failUnlessEqual(len(servermap.recoverable_versions()), 1)
            return self.do_download(servermap)
        d.addCallback(_do_retrieve)
        def _retrieved(new_contents):
            self.failUnlessEqual(new_contents, self.CONTENTS)
        d.addCallback(_retrieved)
        # we should be able to re-use the same servermap, both with and
        # without updating it.
        d.addCallback(lambda res: self.do_download(self._smap))
        d.addCallback(_retrieved)
        d.addCallback(lambda res: self.make_servermap(oldmap=self._smap))
        d.addCallback(lambda res: self.do_download(self._smap))
        d.addCallback(_retrieved)
        # clobbering the pubkey should make the servermap updater re-fetch it
        def _clobber_pubkey(res):
            self._fn._pubkey = None
        d.addCallback(_clobber_pubkey)
        d.addCallback(lambda res: self.make_servermap(oldmap=self._smap))
        d.addCallback(lambda res: self.do_download(self._smap))
        d.addCallback(_retrieved)
        return d

    def shouldFail(self, expected_failure, which, substring,
                   callable, *args, **kwargs):
        assert substring is None or isinstance(substring, str)
        d = defer.maybeDeferred(callable, *args, **kwargs)
        def _done(res):
            if isinstance(res, failure.Failure):
                res.trap(expected_failure)
                if substring:
                    self.failUnless(substring in str(res),
                                    "substring '%s' not in '%s'"
                                    % (substring, str(res)))
            else:
                self.fail("%s was supposed to raise %s, not get '%s'" %
                          (which, expected_failure, res))
        d.addBoth(_done)
        return d

    def _test_corrupt_all(self, offset, substring,
                          should_succeed=False, corrupt_early=True):
        d = defer.succeed(None)
        if corrupt_early:
            d.addCallback(corrupt, self._storage, offset)
        d.addCallback(lambda res: self.make_servermap())
        if not corrupt_early:
            d.addCallback(corrupt, self._storage, offset)
        def _do_retrieve(servermap):
            ver = servermap.best_recoverable_version()
            if ver is None and not should_succeed:
                # no recoverable versions == not succeeding. The problem
                # should be noted in the servermap's list of problems.
                if substring:
                    allproblems = [str(f) for f in servermap.problems]
                    self.failUnless(substring in "".join(allproblems))
                return
            if should_succeed:
                d1 = self._fn.download_to_data()
                d1.addCallback(lambda new_contents:
                               self.failUnlessEqual(new_contents, self.CONTENTS))
                return d1
            else:
                return self.shouldFail(NotEnoughSharesError,
                                       "_corrupt_all(offset=%s)" % (offset,),
                                       substring,
                                       self._fn.download_to_data)
        d.addCallback(_do_retrieve)
        return d

    def test_corrupt_all_verbyte(self):
        # when the version byte is not 0, we hit an assertion error in the
        # share-unpacking code
        return self._test_corrupt_all(0, "AssertionError")

    def test_corrupt_all_seqnum(self):
        # a corrupt sequence number will trigger a bad signature
        return self._test_corrupt_all(1, "signature is invalid")

    def test_corrupt_all_R(self):
        # a corrupt root hash will trigger a bad signature
        return self._test_corrupt_all(9, "signature is invalid")

    def test_corrupt_all_IV(self):
        # a corrupt salt/IV will trigger a bad signature
        return self._test_corrupt_all(41, "signature is invalid")

    def test_corrupt_all_k(self):
        # a corrupt 'k' will trigger a bad signature
        return self._test_corrupt_all(57, "signature is invalid")

    def test_corrupt_all_N(self):
        # a corrupt 'N' will trigger a bad signature
        return self._test_corrupt_all(58, "signature is invalid")

    def test_corrupt_all_segsize(self):
        # a corrupt segsize will trigger a bad signature
        return self._test_corrupt_all(59, "signature is invalid")

    def test_corrupt_all_datalen(self):
        # a corrupt data length will trigger a bad signature
        return self._test_corrupt_all(67, "signature is invalid")

    def test_corrupt_all_pubkey(self):
        # a corrupt pubkey won't match the URI's fingerprint. We need to
        # remove the pubkey from the filenode, or else it won't bother trying
        # to update it.
        self._fn._pubkey = None
        return self._test_corrupt_all("pubkey",
                                      "pubkey doesn't match fingerprint")

    def test_corrupt_all_sig(self):
        # a corrupt signature is a bad one
        # the signature runs from about [543:799], depending upon the length
        # of the pubkey
        return self._test_corrupt_all("signature", "signature is invalid")

    def test_corrupt_all_share_hash_chain_number(self):
        # a corrupt share hash chain entry will show up as a bad hash. If we
        # mangle the first byte, that will look like a bad hash number,
        # causing an IndexError
        return self._test_corrupt_all("share_hash_chain", "corrupt hashes")

    def test_corrupt_all_share_hash_chain_hash(self):
        # a corrupt share hash chain entry will show up as a bad hash. If we
        # mangle a few bytes in, that will look like a bad hash.
        return self._test_corrupt_all(("share_hash_chain",4), "corrupt hashes")

    def test_corrupt_all_block_hash_tree(self):
        return self._test_corrupt_all("block_hash_tree",
                                      "block hash tree failure")

    def test_corrupt_all_block(self):
        return self._test_corrupt_all("share_data", "block hash tree failure")

    def test_corrupt_all_encprivkey(self):
        # a corrupted privkey won't even be noticed by the reader, only by a
        # writer.
        return self._test_corrupt_all("enc_privkey", None, should_succeed=True)

    def test_basic_pubkey_at_end(self):
        # we corrupt the pubkey in all but the last 'k' shares, allowing the
        # download to succeed but forcing a bunch of retries first. Note that
        # this is rather pessimistic: our Retrieve process will throw away
        # the whole share if the pubkey is bad, even though the rest of the
        # share might be good.

        self._fn._pubkey = None
        k = self._fn.get_required_shares()
        N = self._fn.get_total_shares()
        d = defer.succeed(None)
        d.addCallback(corrupt, self._storage, "pubkey",
                      shnums_to_corrupt=range(0, N-k))
        d.addCallback(lambda res: self.make_servermap())
        def _do_retrieve(servermap):
            self.failUnless(servermap.problems)
            self.failUnless("pubkey doesn't match fingerprint"
                            in str(servermap.problems[0]))
            ver = servermap.best_recoverable_version()
            r = Retrieve(self._fn, servermap, ver)
            return r.download()
        d.addCallback(_do_retrieve)
        d.addCallback(lambda new_contents:
                      self.failUnlessEqual(new_contents, self.CONTENTS))
        return d

    def test_corrupt_some(self):
        # corrupt the data of the first five shares (so the servermap thinks
        # they're good but retrieve marks them as bad), so that the
        # MODE_READ set of 6 will be insufficient, forcing node.download to
        # retry with more servers.
        corrupt(None, self._storage, "share_data", range(5))
        d = self.make_servermap()
        def _do_retrieve(servermap):
            ver = servermap.best_recoverable_version()
            self.failUnless(ver)
            return self._fn.download_to_data()
        d.addCallback(_do_retrieve)
        d.addCallback(lambda new_contents:
                      self.failUnlessEqual(new_contents, self.CONTENTS))
        return d

    def test_download_fails(self):
        corrupt(None, self._storage, "signature")
        d = self.shouldFail(UnrecoverableFileError, "test_download_anyway",
                            "no recoverable versions",
                            self._fn.download_to_data)
        return d


class MultipleEncodings(unittest.TestCase):
    def setUp(self):
        self.CONTENTS = "New contents go here"
        self._client = FakeClient(num_peers)
        self._storage = self._client._storage
        d = self._client.create_mutable_file(self.CONTENTS)
        def _created(node):
            self._fn = node
        d.addCallback(_created)
        return d

    def _encode(self, k, n, data):
        # encode 'data' into a peerid->shares dict.

        fn = self._fn
        fn2 = FastMutableFileNode(self._client)
        # init_from_uri populates _uri, _writekey, _readkey, _storage_index,
        # and _fingerprint
        fn2.init_from_uri(fn.get_uri())
        # then we copy over other fields that are normally fetched from the
        # servermap
        fn2._pubkey = fn._pubkey
        fn2._privkey = fn._privkey
        fn2._encprivkey = fn._encprivkey
        fn2._current_seqnum = 0
        fn2._current_roothash = "\x00" * 32
        # and set the encoding parameters to something completely different
        fn2._required_shares = k
        fn2._total_shares = n

        s = self._client._storage
        s._peers = {} # clear existing storage
        p2 = Publish(fn2, None)
        d = p2.publish(data)
        def _published(res):
            shares = s._peers
            s._peers = {}
            return shares
        d.addCallback(_published)
        return d

    def make_servermap(self, mode=MODE_READ, oldmap=None):
        if oldmap is None:
            oldmap = ServerMap()
        smu = ServermapUpdater(self._fn, oldmap, mode)
        d = smu.update()
        return d

    def test_multiple_encodings(self):
        # we encode the same file in several different ways (3-of-10,
        # 4-of-9, and 4-of-7), then mix up the shares, to make sure that
        # download survives seeing a variety of encodings. This is actually
        # kind of tricky to set up.

        contents1 = "Contents for encoding 1 (3-of-10) go here"
        contents2 = "Contents for encoding 2 (4-of-9) go here"
        contents3 = "Contents for encoding 3 (4-of-7) go here"

        # we make a retrieval object that doesn't know what encoding
        # parameters were used
        fn3 = FastMutableFileNode(self._client)
        fn3.init_from_uri(self._fn.get_uri())

        # now we upload a file through fn1, and grab its shares
        d = self._encode(3, 10, contents1)
        def _encoded_1(shares):
            self._shares1 = shares
        d.addCallback(_encoded_1)
        d.addCallback(lambda res: self._encode(4, 9, contents2))
        def _encoded_2(shares):
            self._shares2 = shares
        d.addCallback(_encoded_2)
        d.addCallback(lambda res: self._encode(4, 7, contents3))
        def _encoded_3(shares):
            self._shares3 = shares
        d.addCallback(_encoded_3)

        def _merge(res):
            log.msg("merging sharelists")
            # we merge the shares from the two sets, leaving each shnum in
            # its original location, but using a share from set1 or set2
            # according to the following sequence:
            # so that neither form can be recovered until fetch [f], at which
            # point version-s1 (the 3-of-10 form) should be recoverable. If
            # the implementation latches on to the first version it sees,
            # then s2 will be recoverable at fetch [g].

            # Later, when we implement code that handles multiple versions,
            # we can use this framework to assert that all recoverable
            # versions are retrieved, and test that 'epsilon' does its job
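
            # ('places' below maps shnum -> which encoding's share stays in
            # that slot: 1 = the 3-of-10 shares, 2 = the 4-of-9 shares,
            # 3 = the 4-of-7 shares; shares past the end of 'places' are not
            # placed at all.)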
            places = [2, 2, 3, 2, 1, 1, 1, 2]

            sharemap = {}

            for i,peerid in enumerate(self._client._peerids):
                peerid_s = shortnodeid_b2a(peerid)
                for shnum in self._shares1.get(peerid, {}):
                    if shnum < len(places):
                        which = places[shnum]
                    else:
                        which = None # shares past 'places' are not placed
                    self._client._storage._peers[peerid] = peers = {}
                    in_1 = shnum in self._shares1[peerid]
                    in_2 = shnum in self._shares2.get(peerid, {})
                    in_3 = shnum in self._shares3.get(peerid, {})
                    #print peerid_s, shnum, which, in_1, in_2, in_3
                    if which == 1:
                        if in_1:
                            peers[shnum] = self._shares1[peerid][shnum]
                            sharemap[shnum] = peerid
                    elif which == 2:
                        if in_2:
                            peers[shnum] = self._shares2[peerid][shnum]
                            sharemap[shnum] = peerid
                    elif which == 3:
                        if in_3:
                            peers[shnum] = self._shares3[peerid][shnum]
                            sharemap[shnum] = peerid
                    # we don't bother placing any other shares

            # now sort the sequence so that share 0 is returned first
            new_sequence = [sharemap[shnum]
                            for shnum in sorted(sharemap.keys())]
            self._client._storage._sequence = new_sequence
            log.msg("merge done")
        d.addCallback(_merge)
        d.addCallback(lambda res: fn3.download_to_data())
        def _retrieved(new_contents):
            # the current specified behavior is "first version recoverable"
            self.failUnlessEqual(new_contents, contents1)
        d.addCallback(_retrieved)
        return d


class Utils(unittest.TestCase):
    def test_dict_of_sets(self):
        ds = DictOfSets()
        ds.add(1, "a")
        ds.add(2, "b")
        ds.add(2, "c")
        self.failUnlessEqual(ds[1], set(["a"]))
        self.failUnlessEqual(ds[2], set(["b", "c"]))
        ds.discard(3, "d") # should not raise an exception
        ds.discard(2, "b")
        self.failUnlessEqual(ds[2], set(["c"]))

    def _do_inside(self, c, x_start, x_length, y_start, y_length):
        # we compare this against sets of integers
        x = set(range(x_start, x_start+x_length))
        y = set(range(y_start, y_start+y_length))
        should_be_inside = x.issubset(y)
        self.failUnlessEqual(should_be_inside, c._inside(x_start, x_length,
                                                         y_start, y_length),
                             str((x_start, x_length, y_start, y_length)))

    def test_cache_inside(self):
        c = ResponseCache()
        x_start = 10 # the x range is arbitrary (assumed values); the loops
        x_length = 5 # below compare _inside() against set arithmetic
        for y_start in range(8, 17):
            for y_length in range(8):
                self._do_inside(c, x_start, x_length, y_start, y_length)

    def _do_overlap(self, c, x_start, x_length, y_start, y_length):
        # we compare this against sets of integers
        x = set(range(x_start, x_start+x_length))
        y = set(range(y_start, y_start+y_length))
        overlap = bool(x.intersection(y))
        self.failUnlessEqual(overlap, c._does_overlap(x_start, x_length,
                                                      y_start, y_length),
                             str((x_start, x_length, y_start, y_length)))

    def test_cache_overlap(self):
        c = ResponseCache()
        x_start = 10 # again, an arbitrary (assumed) x range
        x_length = 5
        for y_start in range(8, 17):
            for y_length in range(8):
                self._do_overlap(c, x_start, x_length, y_start, y_length)

    def test_cache(self):
        c = ResponseCache()
        # xdata = base62.b2a(os.urandom(100))[:100]
        xdata = "1Ex4mdMaDyOl9YnGBM3I4xaBF97j8OQAg1K3RBR01F2PwTP4HohB3XpACuku8Xj4aTQjqJIR1f36mEj3BCNjXaJmPBEZnnHL0U9l"
        ydata = "4DCUQXvkEPnnr9Lufikq5t21JsnzZKhzxKBhLhrBB6iIcBOWRuT4UweDhjuKJUre8A4wOObJnl3Kiqmlj4vjSLSqUGAkUD87Y3vs"
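        # as exercised below, c.add(verinfo, shnum, offset, data, timestamp)
        # stores a fragment, and c.read(verinfo, shnum, offset, length)
        # returns the cached (data, timestamp) only when the requested range
        # lies entirely within a single cached fragment; anything else is a
        # miss ('nope').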
        c.add("v1", 1, 0, xdata, "time0")
        c.add("v1", 1, 2000, ydata, "time1")
        self.failUnlessEqual(c.read("v2", 1, 10, 11), nope)
        self.failUnlessEqual(c.read("v1", 2, 10, 11), nope)
        self.failUnlessEqual(c.read("v1", 1, 0, 10), (xdata[:10], "time0"))
        self.failUnlessEqual(c.read("v1", 1, 90, 10), (xdata[90:], "time0"))
        self.failUnlessEqual(c.read("v1", 1, 300, 10), nope)
        self.failUnlessEqual(c.read("v1", 1, 2050, 5), (ydata[50:55], "time1"))
        self.failUnlessEqual(c.read("v1", 1, 0, 101), nope)
        self.failUnlessEqual(c.read("v1", 1, 99, 1), (xdata[99:100], "time0"))
        self.failUnlessEqual(c.read("v1", 1, 100, 1), nope)
        self.failUnlessEqual(c.read("v1", 1, 1990, 9), nope)
        self.failUnlessEqual(c.read("v1", 1, 1990, 10), nope)
        self.failUnlessEqual(c.read("v1", 1, 1990, 11), nope)
        self.failUnlessEqual(c.read("v1", 1, 1990, 15), nope)
        self.failUnlessEqual(c.read("v1", 1, 1990, 19), nope)
        self.failUnlessEqual(c.read("v1", 1, 1990, 20), nope)
        self.failUnlessEqual(c.read("v1", 1, 1990, 21), nope)
        self.failUnlessEqual(c.read("v1", 1, 1990, 25), nope)
        self.failUnlessEqual(c.read("v1", 1, 1999, 25), nope)

        # optional: join fragments
        c = ResponseCache()
        c.add("v1", 1, 0, xdata[:10], "time0")
        c.add("v1", 1, 10, xdata[10:20], "time1")
        #self.failUnlessEqual(c.read("v1", 1, 0, 20), (xdata[:20], "time0"))