import struct
from cStringIO import StringIO
from twisted.trial import unittest
from twisted.internet import defer, reactor
from allmydata import uri, client
from allmydata.nodemaker import NodeMaker
from allmydata.immutable import download
from allmydata.util import base32
from allmydata.util.idlib import shortnodeid_b2a
from allmydata.util.hashutil import tagged_hash, ssk_writekey_hash, \
     ssk_pubkey_fingerprint_hash
from allmydata.interfaces import IRepairResults, ICheckAndRepairResults, \
     NotEnoughSharesError
from allmydata.monitor import Monitor
from allmydata.test.common import ShouldFailMixin
from allmydata.test.no_network import GridTestMixin
from foolscap.api import eventually, fireEventually
from foolscap.logging import log
from allmydata.storage_client import StorageFarmBroker

from allmydata.mutable.filenode import MutableFileNode, BackoffAgent
from allmydata.mutable.common import ResponseCache, \
     MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_READ, \
     NeedMoreDataError, UnrecoverableFileError, UncoordinatedWriteError, \
     NotEnoughServersError, CorruptShareError
from allmydata.mutable.retrieve import Retrieve
from allmydata.mutable.publish import Publish
from allmydata.mutable.servermap import ServerMap, ServermapUpdater
from allmydata.mutable.layout import unpack_header, unpack_share
from allmydata.mutable.repairer import MustForceRepairError

import common_util as testutil
35 # this "FakeStorage" exists to put the share data in RAM and avoid using real
36 # network connections, both to speed up the tests and to reduce the amount of
37 # non-mutable.py code being exercised.
40 # this class replaces the collection of storage servers, allowing the
41 # tests to examine and manipulate the published shares. It also lets us
42 # control the order in which read queries are answered, to exercise more
43 # of the error-handling code in Retrieve .
45 # Note that we ignore the storage index: this FakeStorage instance can
46 # only be used for a single storage index.
51 # _sequence is used to cause the responses to occur in a specific
52 # order. If it is in use, then we will defer queries instead of
53 # answering them right away, accumulating the Deferreds in a dict. We
54 # don't know exactly how many queries we'll get, so exactly one
55 # second after the first query arrives, we will release them all (in
59 self._pending_timer = None
61 def read(self, peerid, storage_index):
62 shares = self._peers.get(peerid, {})
63 if self._sequence is None:
64 return defer.succeed(shares)
67 self._pending_timer = reactor.callLater(1.0, self._fire_readers)
68 self._pending[peerid] = (d, shares)
71 def _fire_readers(self):
72 self._pending_timer = None
73 pending = self._pending
76 for peerid in self._sequence:
78 d, shares = pending.pop(peerid)
79 eventually(d.callback, shares)
80 for (d, shares) in pending.values():
81 eventually(d.callback, shares)
83 def write(self, peerid, storage_index, shnum, offset, data):
84 if peerid not in self._peers:
85 self._peers[peerid] = {}
86 shares = self._peers[peerid]
88 f.write(shares.get(shnum, ""))
91 shares[shnum] = f.getvalue()
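
# A usage sketch we added for clarity (not part of the original suite): a
# test that wants deterministic response ordering assigns a list of peerids
# to _sequence before triggering reads, e.g.:
#
#   s = FakeStorage()
#   s._sequence = list_of_peerids  # reads are answered in this order
#   nm = make_nodemaker(s)
#
# test_multiple_encodings below uses exactly this hook.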

class FakeStorageServer:
    def __init__(self, peerid, storage):
        self.peerid = peerid
        self.storage = storage

    def callRemote(self, methname, *args, **kwargs):
        def _call():
            meth = getattr(self, methname)
            return meth(*args, **kwargs)
        d = fireEventually()
        d.addCallback(lambda res: _call())
        return d

    def callRemoteOnly(self, methname, *args, **kwargs):
        d = self.callRemote(methname, *args, **kwargs)
        d.addBoth(lambda ignore: None)

    def advise_corrupt_share(self, share_type, storage_index, shnum, reason):
        pass

    def slot_readv(self, storage_index, shnums, readv):
        d = self.storage.read(self.peerid, storage_index)
        def _read(shares):
            response = {}
            for shnum in shares:
                if shnums and shnum not in shnums:
                    continue
                vector = response[shnum] = []
                for (offset, length) in readv:
                    assert isinstance(offset, (int, long)), offset
                    assert isinstance(length, (int, long)), length
                    vector.append(shares[shnum][offset:offset+length])
            return response
        d.addCallback(_read)
        return d

    def slot_testv_and_readv_and_writev(self, storage_index, secrets,
                                        tw_vectors, read_vector):
        # always-pass: parrot the test vectors back to them.
        readv = {}
        for shnum, (testv, writev, new_length) in tw_vectors.items():
            for (offset, length, op, specimen) in testv:
                assert op in ("le", "eq", "ge")
            # TODO: this isn't right, the read is controlled by read_vector,
            # not by the test vector
            readv[shnum] = [ specimen
                             for (offset, length, op, specimen)
                             in testv ]
            for (offset, data) in writev:
                self.storage.write(self.peerid, storage_index, shnum,
                                   offset, data)
        answer = (True, readv)
        return fireEventually(answer)
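
# Note added for clarity: the writev fake above always reports success
# (answer[0] is True), so these tests never exercise test-vector rejection.
# fireEventually() mimics foolscap's eventual-send, keeping responses off the
# caller's stack the way a real remote invocation would.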

def flip_bit(original, byte_offset):
    return (original[:byte_offset] +
            chr(ord(original[byte_offset]) ^ 0x01) +
            original[byte_offset+1:])
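
# A tiny illustration we added (not in the original suite): XORing the low
# bit of "A" (0x41) gives "@" (0x40), and of "B" (0x42) gives "C" (0x43).
# Kept inside a function so importing this module stays side-effect free.
def _demo_flip_bit():
    assert flip_bit("ABC", 0) == "@BC"
    assert flip_bit("ABC", 1) == "ACC"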

def corrupt(res, s, offset, shnums_to_corrupt=None, offset_offset=0):
    # if shnums_to_corrupt is None, corrupt all shares. Otherwise it is a
    # list of shnums to corrupt.
    for peerid in s._peers:
        shares = s._peers[peerid]
        for shnum in shares:
            if (shnums_to_corrupt is not None
                and shnum not in shnums_to_corrupt):
                continue
            data = shares[shnum]
            (version,
             seqnum,
             root_hash,
             IV,
             k, N, segsize, datalen,
             o) = unpack_header(data)
            if isinstance(offset, tuple):
                offset1, offset2 = offset
            else:
                offset1 = offset
                offset2 = 0
            if offset1 == "pubkey":
                real_offset = 107 # length of the fixed-size share header
            elif offset1 in o:
                real_offset = o[offset1]
            else:
                real_offset = offset1
            real_offset = int(real_offset) + offset2 + offset_offset
            assert isinstance(real_offset, int), offset
            shares[shnum] = flip_bit(data, real_offset)
    return res
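
# Usage note we added: 'offset' may be an integer (absolute byte offset), a
# field name from the share's offset table (e.g. "pubkey", "share_hash_chain",
# "enc_privkey"), or a (name, delta) tuple to corrupt a byte inside a field,
# as the _test_corrupt_all callers below demonstrate.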

def make_storagebroker(s=None, num_peers=10):
    if not s:
        s = FakeStorage()
    peerids = [tagged_hash("peerid", "%d" % i)[:20]
               for i in range(num_peers)]
    storage_broker = StorageFarmBroker(None, True)
    for peerid in peerids:
        fss = FakeStorageServer(peerid, s)
        storage_broker.test_add_server(peerid, fss)
    return storage_broker

def make_nodemaker(s=None, num_peers=10):
    storage_broker = make_storagebroker(s, num_peers)
    sh = client.SecretHolder("lease secret")
    keygen = client.KeyGenerator()
    keygen.set_default_keysize(522)
    nodemaker = NodeMaker(storage_broker, sh, None,
                          None, None, None,
                          {"k": 3, "n": 10}, keygen)
    return nodemaker

class Filenode(unittest.TestCase, testutil.ShouldFailMixin):
    # this used to be in Publish, but we removed the limit. Some of
    # these tests test whether the new code correctly allows files
    # larger than the limit.
    OLD_MAX_SEGMENT_SIZE = 3500000
    def setUp(self):
        self._storage = s = FakeStorage()
        self.nodemaker = make_nodemaker(s)

    def test_create(self):
        d = self.nodemaker.create_mutable_file()
        def _created(n):
            self.failUnless(isinstance(n, MutableFileNode))
            self.failUnlessEqual(n.get_storage_index(), n._storage_index)
            sb = self.nodemaker.storage_broker
            peer0 = sorted(sb.get_all_serverids())[0]
            shnums = self._storage._peers[peer0].keys()
            self.failUnlessEqual(len(shnums), 1)
        d.addCallback(_created)
        return d

    def test_serialize(self):
        n = MutableFileNode(None, None, {"k": 3, "n": 10}, None)
        calls = []
        def _callback(*args, **kwargs):
            self.failUnlessEqual(args, (4,) )
            self.failUnlessEqual(kwargs, {"foo": 5})
            calls.append(1)
            return 6
        d = n._do_serialized(_callback, 4, foo=5)
        def _check_callback(res):
            self.failUnlessEqual(res, 6)
            self.failUnlessEqual(calls, [1])
        d.addCallback(_check_callback)

        def _errback():
            raise ValueError("heya")
        d.addCallback(lambda res:
                      self.shouldFail(ValueError, "_check_errback", "heya",
                                      n._do_serialized, _errback))
        return d

    def test_upload_and_download(self):
        d = self.nodemaker.create_mutable_file()
        def _created(n):
            d = defer.succeed(None)
            d.addCallback(lambda res: n.get_servermap(MODE_READ))
            d.addCallback(lambda smap: smap.dump(StringIO()))
            d.addCallback(lambda sio:
                          self.failUnless("3-of-10" in sio.getvalue()))
            d.addCallback(lambda res: n.overwrite("contents 1"))
            d.addCallback(lambda res: self.failUnlessIdentical(res, None))
            d.addCallback(lambda res: n.download_best_version())
            d.addCallback(lambda res: self.failUnlessEqual(res, "contents 1"))
            d.addCallback(lambda res: n.get_size_of_best_version())
            d.addCallback(lambda size:
                          self.failUnlessEqual(size, len("contents 1")))
            d.addCallback(lambda res: n.overwrite("contents 2"))
            d.addCallback(lambda res: n.download_best_version())
            d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2"))
            d.addCallback(lambda res: n.download(download.Data()))
            d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2"))
            d.addCallback(lambda res: n.get_servermap(MODE_WRITE))
            d.addCallback(lambda smap: n.upload("contents 3", smap))
            d.addCallback(lambda res: n.download_best_version())
            d.addCallback(lambda res: self.failUnlessEqual(res, "contents 3"))
            d.addCallback(lambda res: n.get_servermap(MODE_ANYTHING))
            d.addCallback(lambda smap:
                          n.download_version(smap,
                                             smap.best_recoverable_version()))
            d.addCallback(lambda res: self.failUnlessEqual(res, "contents 3"))
            # test a file that is large enough to overcome the
            # mapupdate-to-retrieve data caching (i.e. make the shares larger
            # than the default readsize, which is 2000 bytes). A 15kB file
            # will have 5kB shares.
            d.addCallback(lambda res: n.overwrite("large size file" * 1000))
            d.addCallback(lambda res: n.download_best_version())
            d.addCallback(lambda res:
                          self.failUnlessEqual(res, "large size file" * 1000))
            return d
        d.addCallback(_created)
        return d

    def test_create_with_initial_contents(self):
        d = self.nodemaker.create_mutable_file("contents 1")
        def _created(n):
            d = n.download_best_version()
            d.addCallback(lambda res: self.failUnlessEqual(res, "contents 1"))
            d.addCallback(lambda res: n.overwrite("contents 2"))
            d.addCallback(lambda res: n.download_best_version())
            d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2"))
            return d
        d.addCallback(_created)
        return d

    def test_create_with_too_large_contents(self):
        BIG = "a" * (self.OLD_MAX_SEGMENT_SIZE + 1)
        d = self.nodemaker.create_mutable_file(BIG)
        def _created(n):
            d = n.overwrite(BIG)
            return d
        d.addCallback(_created)
        return d

    def failUnlessCurrentSeqnumIs(self, n, expected_seqnum, which):
        d = n.get_servermap(MODE_READ)
        d.addCallback(lambda servermap: servermap.best_recoverable_version())
        d.addCallback(lambda verinfo:
                      self.failUnlessEqual(verinfo[0], expected_seqnum, which))
        return d

    def test_modify(self):
        def _modifier(old_contents, servermap, first_time):
            return old_contents + "line2"
        def _non_modifier(old_contents, servermap, first_time):
            return old_contents
        def _none_modifier(old_contents, servermap, first_time):
            return None
        def _error_modifier(old_contents, servermap, first_time):
            raise ValueError("oops")
        def _toobig_modifier(old_contents, servermap, first_time):
            return "b" * (self.OLD_MAX_SEGMENT_SIZE+1)
        calls = []
        def _ucw_error_modifier(old_contents, servermap, first_time):
            # simulate an UncoordinatedWriteError once
            calls.append(1)
            if len(calls) <= 1:
                raise UncoordinatedWriteError("simulated")
            return old_contents + "line3"
        def _ucw_error_non_modifier(old_contents, servermap, first_time):
            # simulate an UncoordinatedWriteError once, and don't actually
            # modify the contents on subsequent invocations
            calls.append(1)
            if len(calls) <= 1:
                raise UncoordinatedWriteError("simulated")
            return old_contents

        d = self.nodemaker.create_mutable_file("line1")
        def _created(n):
            d = n.modify(_modifier)
            d.addCallback(lambda res: n.download_best_version())
            d.addCallback(lambda res: self.failUnlessEqual(res, "line1line2"))
            d.addCallback(lambda res: self.failUnlessCurrentSeqnumIs(n, 2, "m"))

            d.addCallback(lambda res: n.modify(_non_modifier))
            d.addCallback(lambda res: n.download_best_version())
            d.addCallback(lambda res: self.failUnlessEqual(res, "line1line2"))
            d.addCallback(lambda res: self.failUnlessCurrentSeqnumIs(n, 2, "non"))

            d.addCallback(lambda res: n.modify(_none_modifier))
            d.addCallback(lambda res: n.download_best_version())
            d.addCallback(lambda res: self.failUnlessEqual(res, "line1line2"))
            d.addCallback(lambda res: self.failUnlessCurrentSeqnumIs(n, 2, "none"))

            d.addCallback(lambda res:
                          self.shouldFail(ValueError, "error_modifier", None,
                                          n.modify, _error_modifier))
            d.addCallback(lambda res: n.download_best_version())
            d.addCallback(lambda res: self.failUnlessEqual(res, "line1line2"))
            d.addCallback(lambda res: self.failUnlessCurrentSeqnumIs(n, 2, "err"))

            d.addCallback(lambda res: n.download_best_version())
            d.addCallback(lambda res: self.failUnlessEqual(res, "line1line2"))
            d.addCallback(lambda res: self.failUnlessCurrentSeqnumIs(n, 2, "big"))

            d.addCallback(lambda res: n.modify(_ucw_error_modifier))
            d.addCallback(lambda res: self.failUnlessEqual(len(calls), 2))
            d.addCallback(lambda res: n.download_best_version())
            d.addCallback(lambda res: self.failUnlessEqual(res,
                                                           "line1line2line3"))
            d.addCallback(lambda res: self.failUnlessCurrentSeqnumIs(n, 3, "ucw"))

            def _reset_ucw_error_modifier(res):
                calls[:] = []
                return res
            d.addCallback(_reset_ucw_error_modifier)

            # in practice, this n.modify call should publish twice: the first
            # one gets a UCWE, the second does not. But our test jig (in
            # which the modifier raises the UCWE) skips over the first one,
            # so in this test there will be only one publish, and the seqnum
            # will only be one larger than the previous test, not two (i.e. 4
            # instead of 5).
            d.addCallback(lambda res: n.modify(_ucw_error_non_modifier))
            d.addCallback(lambda res: self.failUnlessEqual(len(calls), 2))
            d.addCallback(lambda res: n.download_best_version())
            d.addCallback(lambda res: self.failUnlessEqual(res,
                                                           "line1line2line3"))
            d.addCallback(lambda res: self.failUnlessCurrentSeqnumIs(n, 4, "ucw"))
            d.addCallback(lambda res: n.modify(_toobig_modifier))
            return d
        d.addCallback(_created)
        return d
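
    # Contract exercised above, summarized in a note we added: node.modify
    # invokes modifier(old_contents, servermap, first_time) and retries after
    # an UncoordinatedWriteError, so modifiers must be idempotent; returning
    # None (or the unchanged contents) skips the publish and leaves the
    # sequence number alone.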

    def test_modify_backoffer(self):
        def _modifier(old_contents, servermap, first_time):
            return old_contents + "line2"
        calls = []
        def _ucw_error_modifier(old_contents, servermap, first_time):
            # simulate an UncoordinatedWriteError once
            calls.append(1)
            if len(calls) <= 1:
                raise UncoordinatedWriteError("simulated")
            return old_contents + "line3"
        def _always_ucw_error_modifier(old_contents, servermap, first_time):
            raise UncoordinatedWriteError("simulated")
        def _backoff_stopper(node, f):
            return f
        def _backoff_pauser(node, f):
            d = defer.Deferred()
            reactor.callLater(0.5, d.callback, None)
            return d

        # the give-up-er will hit its maximum retry count quickly
        giveuper = BackoffAgent()
        giveuper._delay = 0.1
        giveuper.factor = 1

        d = self.nodemaker.create_mutable_file("line1")
        def _created(n):
            d = n.modify(_modifier)
            d.addCallback(lambda res: n.download_best_version())
            d.addCallback(lambda res: self.failUnlessEqual(res, "line1line2"))
            d.addCallback(lambda res: self.failUnlessCurrentSeqnumIs(n, 2, "m"))

            d.addCallback(lambda res:
                          self.shouldFail(UncoordinatedWriteError,
                                          "_backoff_stopper", None,
                                          n.modify, _ucw_error_modifier,
                                          _backoff_stopper))
            d.addCallback(lambda res: n.download_best_version())
            d.addCallback(lambda res: self.failUnlessEqual(res, "line1line2"))
            d.addCallback(lambda res: self.failUnlessCurrentSeqnumIs(n, 2, "stop"))

            def _reset_ucw_error_modifier(res):
                calls[:] = []
                return res
            d.addCallback(_reset_ucw_error_modifier)
            d.addCallback(lambda res: n.modify(_ucw_error_modifier,
                                               _backoff_pauser))
            d.addCallback(lambda res: n.download_best_version())
            d.addCallback(lambda res: self.failUnlessEqual(res,
                                                           "line1line2line3"))
            d.addCallback(lambda res: self.failUnlessCurrentSeqnumIs(n, 3, "pause"))

            d.addCallback(lambda res:
                          self.shouldFail(UncoordinatedWriteError,
                                          "giveuper", None,
                                          n.modify, _always_ucw_error_modifier,
                                          giveuper.delay))
            d.addCallback(lambda res: n.download_best_version())
            d.addCallback(lambda res: self.failUnlessEqual(res,
                                                           "line1line2line3"))
            d.addCallback(lambda res: self.failUnlessCurrentSeqnumIs(n, 3, "giveup"))

            return d
        d.addCallback(_created)
        return d

    def test_upload_and_download_full_size_keys(self):
        self.nodemaker.key_generator = client.KeyGenerator()
        d = self.nodemaker.create_mutable_file()
        def _created(n):
            d = defer.succeed(None)
            d.addCallback(lambda res: n.get_servermap(MODE_READ))
            d.addCallback(lambda smap: smap.dump(StringIO()))
            d.addCallback(lambda sio:
                          self.failUnless("3-of-10" in sio.getvalue()))
            d.addCallback(lambda res: n.overwrite("contents 1"))
            d.addCallback(lambda res: self.failUnlessIdentical(res, None))
            d.addCallback(lambda res: n.download_best_version())
            d.addCallback(lambda res: self.failUnlessEqual(res, "contents 1"))
            d.addCallback(lambda res: n.overwrite("contents 2"))
            d.addCallback(lambda res: n.download_best_version())
            d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2"))
            d.addCallback(lambda res: n.download(download.Data()))
            d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2"))
            d.addCallback(lambda res: n.get_servermap(MODE_WRITE))
            d.addCallback(lambda smap: n.upload("contents 3", smap))
            d.addCallback(lambda res: n.download_best_version())
            d.addCallback(lambda res: self.failUnlessEqual(res, "contents 3"))
            d.addCallback(lambda res: n.get_servermap(MODE_ANYTHING))
            d.addCallback(lambda smap:
                          n.download_version(smap,
                                             smap.best_recoverable_version()))
            d.addCallback(lambda res: self.failUnlessEqual(res, "contents 3"))
            return d
        d.addCallback(_created)
        return d

class MakeShares(unittest.TestCase):
    def test_encrypt(self):
        nm = make_nodemaker()
        CONTENTS = "some initial contents"
        d = nm.create_mutable_file(CONTENTS)
        def _created(fn):
            p = Publish(fn, nm.storage_broker, None)
            p.salt = "SALT" * 4
            p.readkey = "\x00" * 16
            p.newdata = CONTENTS
            p.required_shares = 3
            p.total_shares = 10
            p.setup_encoding_parameters()
            return p._encrypt_and_encode()
        d.addCallback(_created)
        def _done(shares_and_shareids):
            (shares, share_ids) = shares_and_shareids
            self.failUnlessEqual(len(shares), 10)
            for sh in shares:
                self.failUnless(isinstance(sh, str))
                self.failUnlessEqual(len(sh), 7)
            self.failUnlessEqual(len(share_ids), 10)
        d.addCallback(_done)
        return d

    def test_generate(self):
        nm = make_nodemaker()
        CONTENTS = "some initial contents"
        d = nm.create_mutable_file(CONTENTS)
        def _created(fn):
            self._fn = fn
            p = Publish(fn, nm.storage_broker, None)
            self._p = p
            p.newdata = CONTENTS
            p.required_shares = 3
            p.total_shares = 10
            p.setup_encoding_parameters()
            p._new_seqnum = 3
            p.salt = "SALT" * 4
            # make some fake shares
            shares_and_ids = ( ["%07d" % i for i in range(10)], range(10) )
            p._privkey = fn.get_privkey()
            p._encprivkey = fn.get_encprivkey()
            p._pubkey = fn.get_pubkey()
            return p._generate_shares(shares_and_ids)
        d.addCallback(_created)
        def _generated(res):
            p = self._p
            final_shares = p.shares
            root_hash = p.root_hash
            self.failUnlessEqual(len(root_hash), 32)
            self.failUnless(isinstance(final_shares, dict))
            self.failUnlessEqual(len(final_shares), 10)
            self.failUnlessEqual(sorted(final_shares.keys()), range(10))
            for i,sh in final_shares.items():
                self.failUnless(isinstance(sh, str))
                # feed the share through the unpacker as a sanity-check
                pieces = unpack_share(sh)
                (u_seqnum, u_root_hash, IV, k, N, segsize, datalen,
                 pubkey, signature, share_hash_chain, block_hash_tree,
                 share_data, enc_privkey) = pieces
                self.failUnlessEqual(u_seqnum, 3)
                self.failUnlessEqual(u_root_hash, root_hash)
                self.failUnlessEqual(k, 3)
                self.failUnlessEqual(N, 10)
                self.failUnlessEqual(segsize, 21)
                self.failUnlessEqual(datalen, len(CONTENTS))
                self.failUnlessEqual(pubkey, p._pubkey.serialize())
                sig_material = struct.pack(">BQ32s16s BBQQ",
                                           0, p._new_seqnum, root_hash, IV,
                                           k, N, segsize, datalen)
                self.failUnless(p._pubkey.verify(sig_material, signature))
                #self.failUnlessEqual(signature, p._privkey.sign(sig_material))
                self.failUnless(isinstance(share_hash_chain, dict))
                self.failUnlessEqual(len(share_hash_chain), 4) # ln2(10)++
                for shnum,share_hash in share_hash_chain.items():
                    self.failUnless(isinstance(shnum, int))
                    self.failUnless(isinstance(share_hash, str))
                    self.failUnlessEqual(len(share_hash), 32)
                self.failUnless(isinstance(block_hash_tree, list))
                self.failUnlessEqual(len(block_hash_tree), 1) # very small tree
                self.failUnlessEqual(IV, "SALT"*4)
                self.failUnlessEqual(len(share_data), len("%07d" % 1))
                self.failUnlessEqual(enc_privkey, self._fn.get_encprivkey())
        d.addCallback(_generated)
        return d
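
    # Annotation we added, derived from the struct format string above: the
    # signed prefix packs a 1-byte version (0), an 8-byte sequence number,
    # the 32-byte root hash, the 16-byte salt/IV, then one byte each for k
    # and N and 8 bytes each for segment size and data length.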

# TODO: when we publish to 20 peers, we should get one share per peer on 10
# of them.
# when we publish to 3 peers, we should get either 3 or 4 shares per peer
# when we publish to zero peers, we should get a NotEnoughServersError

class PublishMixin:
    def publish_one(self):
        # publish a file and create shares, which can then be manipulated
        # later.
        self.CONTENTS = "New contents go here" * 1000
        self._storage = FakeStorage()
        self._nodemaker = make_nodemaker(self._storage)
        self._storage_broker = self._nodemaker.storage_broker
        d = self._nodemaker.create_mutable_file(self.CONTENTS)
        def _created(node):
            self._fn = node
            self._fn2 = self._nodemaker.create_from_cap(node.get_uri())
        d.addCallback(_created)
        return d

    def publish_multiple(self):
        self.CONTENTS = ["Contents 0",
                         "Contents 1",
                         "Contents 2",
                         "Contents 3a",
                         "Contents 3b"]
        self._copied_shares = {}
        self._storage = FakeStorage()
        self._nodemaker = make_nodemaker(self._storage)
        d = self._nodemaker.create_mutable_file(self.CONTENTS[0]) # seqnum=1
        def _created(node):
            self._fn = node
            # now create multiple versions of the same file, and accumulate
            # their shares, so we can mix and match them later.
            d = defer.succeed(None)
            d.addCallback(self._copy_shares, 0)
            d.addCallback(lambda res: node.overwrite(self.CONTENTS[1])) #s2
            d.addCallback(self._copy_shares, 1)
            d.addCallback(lambda res: node.overwrite(self.CONTENTS[2])) #s3
            d.addCallback(self._copy_shares, 2)
            d.addCallback(lambda res: node.overwrite(self.CONTENTS[3])) #s4a
            d.addCallback(self._copy_shares, 3)
            # now we replace all the shares with version s3, and upload a new
            # version to get s4b.
            rollback = dict([(i,2) for i in range(10)])
            d.addCallback(lambda res: self._set_versions(rollback))
            d.addCallback(lambda res: node.overwrite(self.CONTENTS[4])) #s4b
            d.addCallback(self._copy_shares, 4)
            # we leave the storage in state 4
            return d
        d.addCallback(_created)
        return d

    def _copy_shares(self, ignored, index):
        shares = self._storage._peers
        # we need a deep copy
        new_shares = {}
        for peerid in shares:
            new_shares[peerid] = {}
            for shnum in shares[peerid]:
                new_shares[peerid][shnum] = shares[peerid][shnum]
        self._copied_shares[index] = new_shares

    def _set_versions(self, versionmap):
        # versionmap maps shnums to which version (0,1,2,3,4) we want the
        # share to be at. Any shnum which is left out of the map will stay at
        # its current version.
        shares = self._storage._peers
        oldshares = self._copied_shares
        for peerid in shares:
            for shnum in shares[peerid]:
                if shnum in versionmap:
                    index = versionmap[shnum]
                    shares[peerid][shnum] = oldshares[index][peerid][shnum]
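
# Usage note we added: tests drive _set_versions with maps like
# {0:3, 2:3, 4:3, 6:3, 8:3, 1:4, 3:4, 5:4, 7:4, 9:4} to stripe two different
# versions across the grid, then check which one download/repair prefers.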

class Servermap(unittest.TestCase, PublishMixin):
    def setUp(self):
        return self.publish_one()

    def make_servermap(self, mode=MODE_CHECK, fn=None, sb=None):
        if fn is None:
            fn = self._fn
        if sb is None:
            sb = self._storage_broker
        smu = ServermapUpdater(fn, sb, Monitor(),
                               ServerMap(), mode)
        d = smu.update()
        return d

    def update_servermap(self, oldmap, mode=MODE_CHECK):
        smu = ServermapUpdater(self._fn, self._storage_broker, Monitor(),
                               oldmap, mode)
        d = smu.update()
        return d

    def failUnlessOneRecoverable(self, sm, num_shares):
        self.failUnlessEqual(len(sm.recoverable_versions()), 1)
        self.failUnlessEqual(len(sm.unrecoverable_versions()), 0)
        best = sm.best_recoverable_version()
        self.failIfEqual(best, None)
        self.failUnlessEqual(sm.recoverable_versions(), set([best]))
        self.failUnlessEqual(len(sm.shares_available()), 1)
        self.failUnlessEqual(sm.shares_available()[best], (num_shares, 3, 10))
        shnum, peerids = sm.make_sharemap().items()[0]
        peerid = list(peerids)[0]
        self.failUnlessEqual(sm.version_on_peer(peerid, shnum), best)
        self.failUnlessEqual(sm.version_on_peer(peerid, 666), None)
        return sm

    def test_basic(self):
        d = defer.succeed(None)
        ms = self.make_servermap
        us = self.update_servermap

        d.addCallback(lambda res: ms(mode=MODE_CHECK))
        d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 10))
        d.addCallback(lambda res: ms(mode=MODE_WRITE))
        d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 10))
        d.addCallback(lambda res: ms(mode=MODE_READ))
        # this mode stops at k+epsilon, and epsilon=k, so 6 shares
        d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 6))
        d.addCallback(lambda res: ms(mode=MODE_ANYTHING))
        # this mode stops at 'k' shares
        d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 3))

        # and can we re-use the same servermap? Note that these are sorted in
        # increasing order of number of servers queried, since once a server
        # gets into the servermap, we'll always ask it for an update.
        d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 3))
        d.addCallback(lambda sm: us(sm, mode=MODE_READ))
        d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 6))
        d.addCallback(lambda sm: us(sm, mode=MODE_WRITE))
        d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 10))
        d.addCallback(lambda sm: us(sm, mode=MODE_CHECK))
        d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 10))
        d.addCallback(lambda sm: us(sm, mode=MODE_ANYTHING))
        d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 10))
        return d

    def test_fetch_privkey(self):
        d = defer.succeed(None)
        # use the sibling filenode (which hasn't been used yet), and make
        # sure it can fetch the privkey. The file is small, so the privkey
        # will be fetched on the first (query) pass.
        d.addCallback(lambda res: self.make_servermap(MODE_WRITE, self._fn2))
        d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 10))

        # create a new file, which is large enough to knock the privkey out
        # of the early part of the file
        LARGE = "These are Larger contents" * 200 # about 5KB
        d.addCallback(lambda res: self._nodemaker.create_mutable_file(LARGE))
        def _created(large_fn):
            large_fn2 = self._nodemaker.create_from_cap(large_fn.get_uri())
            return self.make_servermap(MODE_WRITE, large_fn2)
        d.addCallback(_created)
        d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 10))
        return d

    def test_mark_bad(self):
        d = defer.succeed(None)
        ms = self.make_servermap
        us = self.update_servermap

        d.addCallback(lambda res: ms(mode=MODE_READ))
        d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 6))
        def _made_map(sm):
            v = sm.best_recoverable_version()
            vm = sm.make_versionmap()
            shares = list(vm[v])
            self.failUnlessEqual(len(shares), 6)
            self._corrupted = set()
            # mark the first 5 shares as corrupt, then update the servermap.
            # The map should not have the marked shares in it any more, and
            # new shares should be found to replace the missing ones.
            for (shnum, peerid, timestamp) in shares:
                if shnum < 5:
                    self._corrupted.add( (peerid, shnum) )
                    sm.mark_bad_share(peerid, shnum, "")
            return self.update_servermap(sm, MODE_WRITE)
        d.addCallback(_made_map)
        def _check_map(sm):
            # this should find all 5 shares that weren't marked bad
            v = sm.best_recoverable_version()
            vm = sm.make_versionmap()
            shares = list(vm[v])
            for (peerid, shnum) in self._corrupted:
                peer_shares = sm.shares_on_peer(peerid)
                self.failIf(shnum in peer_shares,
                            "%d was in %s" % (shnum, peer_shares))
            self.failUnlessEqual(len(shares), 5)
        d.addCallback(_check_map)
        return d

    def failUnlessNoneRecoverable(self, sm):
        self.failUnlessEqual(len(sm.recoverable_versions()), 0)
        self.failUnlessEqual(len(sm.unrecoverable_versions()), 0)
        best = sm.best_recoverable_version()
        self.failUnlessEqual(best, None)
        self.failUnlessEqual(len(sm.shares_available()), 0)

    def test_no_shares(self):
        self._storage._peers = {} # delete all shares
        ms = self.make_servermap
        d = defer.succeed(None)

        d.addCallback(lambda res: ms(mode=MODE_CHECK))
        d.addCallback(lambda sm: self.failUnlessNoneRecoverable(sm))

        d.addCallback(lambda res: ms(mode=MODE_ANYTHING))
        d.addCallback(lambda sm: self.failUnlessNoneRecoverable(sm))

        d.addCallback(lambda res: ms(mode=MODE_WRITE))
        d.addCallback(lambda sm: self.failUnlessNoneRecoverable(sm))

        d.addCallback(lambda res: ms(mode=MODE_READ))
        d.addCallback(lambda sm: self.failUnlessNoneRecoverable(sm))

        return d

    def failUnlessNotQuiteEnough(self, sm):
        self.failUnlessEqual(len(sm.recoverable_versions()), 0)
        self.failUnlessEqual(len(sm.unrecoverable_versions()), 1)
        best = sm.best_recoverable_version()
        self.failUnlessEqual(best, None)
        self.failUnlessEqual(len(sm.shares_available()), 1)
        self.failUnlessEqual(sm.shares_available().values()[0], (2,3,10) )
        return sm

    def test_not_quite_enough_shares(self):
        s = self._storage
        ms = self.make_servermap
        num_shares = len(s._peers)
        for peerid in s._peers:
            s._peers[peerid] = {}
            num_shares -= 1
            if num_shares == 2:
                break
        # now there ought to be only two shares left
        assert len([peerid for peerid in s._peers if s._peers[peerid]]) == 2

        d = defer.succeed(None)

        d.addCallback(lambda res: ms(mode=MODE_CHECK))
        d.addCallback(lambda sm: self.failUnlessNotQuiteEnough(sm))
        d.addCallback(lambda sm:
                      self.failUnlessEqual(len(sm.make_sharemap()), 2))
        d.addCallback(lambda res: ms(mode=MODE_ANYTHING))
        d.addCallback(lambda sm: self.failUnlessNotQuiteEnough(sm))
        d.addCallback(lambda res: ms(mode=MODE_WRITE))
        d.addCallback(lambda sm: self.failUnlessNotQuiteEnough(sm))
        d.addCallback(lambda res: ms(mode=MODE_READ))
        d.addCallback(lambda sm: self.failUnlessNotQuiteEnough(sm))

        return d

class Roundtrip(unittest.TestCase, testutil.ShouldFailMixin, PublishMixin):
    def setUp(self):
        return self.publish_one()

    def make_servermap(self, mode=MODE_READ, oldmap=None, sb=None):
        if oldmap is None:
            oldmap = ServerMap()
        if sb is None:
            sb = self._storage_broker
        smu = ServermapUpdater(self._fn, sb, Monitor(), oldmap, mode)
        d = smu.update()
        return d

    def abbrev_verinfo(self, verinfo):
        if verinfo is None:
            return None
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = verinfo
        return "%d-%s" % (seqnum, base32.b2a(root_hash)[:4])

    def abbrev_verinfo_dict(self, verinfo_d):
        output = {}
        for verinfo,value in verinfo_d.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            output["%d-%s" % (seqnum, base32.b2a(root_hash)[:4])] = value
        return output

    def dump_servermap(self, servermap):
        print "SERVERMAP", servermap
        print "RECOVERABLE", [self.abbrev_verinfo(v)
                              for v in servermap.recoverable_versions()]
        print "BEST", self.abbrev_verinfo(servermap.best_recoverable_version())
        print "available", self.abbrev_verinfo_dict(servermap.shares_available())

    def do_download(self, servermap, version=None):
        if version is None:
            version = servermap.best_recoverable_version()
        r = Retrieve(self._fn, servermap, version)
        return r.download()

    def test_basic(self):
        d = self.make_servermap()
        def _do_retrieve(servermap):
            self._smap = servermap
            #self.dump_servermap(servermap)
            self.failUnlessEqual(len(servermap.recoverable_versions()), 1)
            return self.do_download(servermap)
        d.addCallback(_do_retrieve)
        def _retrieved(new_contents):
            self.failUnlessEqual(new_contents, self.CONTENTS)
        d.addCallback(_retrieved)
        # we should be able to re-use the same servermap, both with and
        # without updating it.
        d.addCallback(lambda res: self.do_download(self._smap))
        d.addCallback(_retrieved)
        d.addCallback(lambda res: self.make_servermap(oldmap=self._smap))
        d.addCallback(lambda res: self.do_download(self._smap))
        d.addCallback(_retrieved)
        # clobbering the pubkey should make the servermap updater re-fetch it
        def _clobber_pubkey(res):
            self._fn._pubkey = None
        d.addCallback(_clobber_pubkey)
        d.addCallback(lambda res: self.make_servermap(oldmap=self._smap))
        d.addCallback(lambda res: self.do_download(self._smap))
        d.addCallback(_retrieved)
        return d

    def test_all_shares_vanished(self):
        d = self.make_servermap()
        def _remove_shares(servermap):
            for shares in self._storage._peers.values():
                shares.clear()
            d1 = self.shouldFail(NotEnoughSharesError,
                                 "test_all_shares_vanished",
                                 "ran out of peers",
                                 self.do_download, servermap)
            return d1
        d.addCallback(_remove_shares)
        return d

    def test_no_servers(self):
        sb2 = make_storagebroker(num_peers=0)
        # if there are no servers, then a MODE_READ servermap should come
        # back empty
        d = self.make_servermap(sb=sb2)
        def _check_servermap(servermap):
            self.failUnlessEqual(servermap.best_recoverable_version(), None)
            self.failIf(servermap.recoverable_versions())
            self.failIf(servermap.unrecoverable_versions())
            self.failIf(servermap.all_peers())
        d.addCallback(_check_servermap)
        return d
    test_no_servers.timeout = 15

    def test_no_servers_download(self):
        sb2 = make_storagebroker(num_peers=0)
        self._fn._storage_broker = sb2
        d = self.shouldFail(UnrecoverableFileError,
                            "test_no_servers_download",
                            "no recoverable versions",
                            self._fn.download_best_version)
        def _restore(res):
            # a failed download that occurs while we aren't connected to
            # anybody should not prevent a subsequent download from working.
            # This isn't quite the webapi-driven test that #463 wants, but it
            # should be close enough.
            self._fn._storage_broker = self._storage_broker
            return self._fn.download_best_version()
        def _retrieved(new_contents):
            self.failUnlessEqual(new_contents, self.CONTENTS)
        d.addCallback(_restore)
        d.addCallback(_retrieved)
        return d
    test_no_servers_download.timeout = 15

    def _test_corrupt_all(self, offset, substring,
                          should_succeed=False, corrupt_early=True,
                          failure_checker=None):
        d = defer.succeed(None)
        if corrupt_early:
            d.addCallback(corrupt, self._storage, offset)
        d.addCallback(lambda res: self.make_servermap())
        if not corrupt_early:
            d.addCallback(corrupt, self._storage, offset)
        def _do_retrieve(servermap):
            ver = servermap.best_recoverable_version()
            if ver is None and not should_succeed:
                # no recoverable versions == not succeeding. The problem
                # should be noted in the servermap's list of problems.
                if substring:
                    allproblems = [str(f) for f in servermap.problems]
                    self.failUnlessIn(substring, "".join(allproblems))
                return servermap
            if should_succeed:
                d1 = self._fn.download_version(servermap, ver)
                d1.addCallback(lambda new_contents:
                               self.failUnlessEqual(new_contents, self.CONTENTS))
            else:
                d1 = self.shouldFail(NotEnoughSharesError,
                                     "_corrupt_all(offset=%s)" % (offset,),
                                     substring,
                                     self._fn.download_version, servermap, ver)
            if failure_checker:
                d1.addCallback(failure_checker)
            d1.addCallback(lambda res: servermap)
            return d1
        d.addCallback(_do_retrieve)
        return d
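
    # Illustrative invocations of the helper above, added as a note (the
    # real tests follow): an integer corrupts an absolute offset, a string
    # names a share field, and a tuple adds a byte delta inside a field.
    #   self._test_corrupt_all(1, "signature is invalid")
    #   self._test_corrupt_all("pubkey", "pubkey doesn't match fingerprint")
    #   self._test_corrupt_all(("share_hash_chain",4), "corrupt hashes")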

    def test_corrupt_all_verbyte(self):
        # when the version byte is not 0, we hit an UnknownVersionError error
        # in unpack_share().
        d = self._test_corrupt_all(0, "UnknownVersionError")
        def _check_servermap(servermap):
            # and the dump should mention the problems
            s = StringIO()
            dump = servermap.dump(s).getvalue()
            self.failUnless("10 PROBLEMS" in dump, dump)
        d.addCallback(_check_servermap)
        return d

    def test_corrupt_all_seqnum(self):
        # a corrupt sequence number will trigger a bad signature
        return self._test_corrupt_all(1, "signature is invalid")

    def test_corrupt_all_R(self):
        # a corrupt root hash will trigger a bad signature
        return self._test_corrupt_all(9, "signature is invalid")

    def test_corrupt_all_IV(self):
        # a corrupt salt/IV will trigger a bad signature
        return self._test_corrupt_all(41, "signature is invalid")

    def test_corrupt_all_k(self):
        # a corrupt 'k' will trigger a bad signature
        return self._test_corrupt_all(57, "signature is invalid")

    def test_corrupt_all_N(self):
        # a corrupt 'N' will trigger a bad signature
        return self._test_corrupt_all(58, "signature is invalid")

    def test_corrupt_all_segsize(self):
        # a corrupt segsize will trigger a bad signature
        return self._test_corrupt_all(59, "signature is invalid")

    def test_corrupt_all_datalen(self):
        # a corrupt data length will trigger a bad signature
        return self._test_corrupt_all(67, "signature is invalid")

    def test_corrupt_all_pubkey(self):
        # a corrupt pubkey won't match the URI's fingerprint. We need to
        # remove the pubkey from the filenode, or else it won't bother trying
        # to update it.
        self._fn._pubkey = None
        return self._test_corrupt_all("pubkey",
                                      "pubkey doesn't match fingerprint")

    def test_corrupt_all_sig(self):
        # a corrupt signature is a bad one
        # the signature runs from about [543:799], depending upon the length
        # of the pubkey
        return self._test_corrupt_all("signature", "signature is invalid")

    def test_corrupt_all_share_hash_chain_number(self):
        # a corrupt share hash chain entry will show up as a bad hash. If we
        # mangle the first byte, that will look like a bad hash number,
        # causing an IndexError
        return self._test_corrupt_all("share_hash_chain", "corrupt hashes")

    def test_corrupt_all_share_hash_chain_hash(self):
        # a corrupt share hash chain entry will show up as a bad hash. If we
        # mangle a few bytes in, that will look like a bad hash.
        return self._test_corrupt_all(("share_hash_chain",4), "corrupt hashes")

    def test_corrupt_all_block_hash_tree(self):
        return self._test_corrupt_all("block_hash_tree",
                                      "block hash tree failure")

    def test_corrupt_all_block(self):
        return self._test_corrupt_all("share_data", "block hash tree failure")

    def test_corrupt_all_encprivkey(self):
        # a corrupted privkey won't even be noticed by the reader, only by a
        # writer.
        return self._test_corrupt_all("enc_privkey", None, should_succeed=True)

    def test_corrupt_all_seqnum_late(self):
        # corrupting the seqnum between mapupdate and retrieve should result
        # in NotEnoughSharesError, since each share will look invalid
        def _check(f):
            self.failUnless(f.check(NotEnoughSharesError))
            self.failUnless("someone wrote to the data since we read the servermap" in str(f))
        return self._test_corrupt_all(1, "ran out of peers",
                                      corrupt_early=False,
                                      failure_checker=_check)

    def test_corrupt_all_block_hash_tree_late(self):
        def _check(f):
            self.failUnless(f.check(NotEnoughSharesError))
        return self._test_corrupt_all("block_hash_tree",
                                      "block hash tree failure",
                                      corrupt_early=False,
                                      failure_checker=_check)

    def test_corrupt_all_block_late(self):
        def _check(f):
            self.failUnless(f.check(NotEnoughSharesError))
        return self._test_corrupt_all("share_data", "block hash tree failure",
                                      corrupt_early=False,
                                      failure_checker=_check)

    def test_basic_pubkey_at_end(self):
        # we corrupt the pubkey in all but the last 'k' shares, allowing the
        # download to succeed but forcing a bunch of retries first. Note that
        # this is rather pessimistic: our Retrieve process will throw away
        # the whole share if the pubkey is bad, even though the rest of the
        # share might be good.

        self._fn._pubkey = None
        k = self._fn.get_required_shares()
        N = self._fn.get_total_shares()
        d = defer.succeed(None)
        d.addCallback(corrupt, self._storage, "pubkey",
                      shnums_to_corrupt=range(0, N-k))
        d.addCallback(lambda res: self.make_servermap())
        def _do_retrieve(servermap):
            self.failUnless(servermap.problems)
            self.failUnless("pubkey doesn't match fingerprint"
                            in str(servermap.problems[0]))
            ver = servermap.best_recoverable_version()
            r = Retrieve(self._fn, servermap, ver)
            return r.download()
        d.addCallback(_do_retrieve)
        d.addCallback(lambda new_contents:
                      self.failUnlessEqual(new_contents, self.CONTENTS))
        return d

    def test_corrupt_some(self):
        # corrupt the data of first five shares (so the servermap thinks
        # they're good but retrieve marks them as bad), so that the
        # MODE_READ set of 6 will be insufficient, forcing node.download to
        # retry with more servers.
        corrupt(None, self._storage, "share_data", range(5))
        d = self.make_servermap()
        def _do_retrieve(servermap):
            ver = servermap.best_recoverable_version()
            self.failUnless(ver)
            return self._fn.download_best_version()
        d.addCallback(_do_retrieve)
        d.addCallback(lambda new_contents:
                      self.failUnlessEqual(new_contents, self.CONTENTS))
        return d

    def test_download_fails(self):
        corrupt(None, self._storage, "signature")
        d = self.shouldFail(UnrecoverableFileError, "test_download_anyway",
                            "no recoverable versions",
                            self._fn.download_best_version)
        return d

class CheckerMixin:
    def check_good(self, r, where):
        self.failUnless(r.is_healthy(), where)
        return r

    def check_bad(self, r, where):
        self.failIf(r.is_healthy(), where)
        return r

    def check_expected_failure(self, r, expected_exception, substring, where):
        for (peerid, storage_index, shnum, f) in r.problems:
            if f.check(expected_exception):
                self.failUnless(substring in str(f),
                                "%s: substring '%s' not in '%s'" %
                                (where, substring, str(f)))
                return
        self.fail("%s: didn't see expected exception %s in problems %s" %
                  (where, expected_exception, r.problems))

class Checker(unittest.TestCase, CheckerMixin, PublishMixin):
    def setUp(self):
        return self.publish_one()

    def test_check_good(self):
        d = self._fn.check(Monitor())
        d.addCallback(self.check_good, "test_check_good")
        return d

    def test_check_no_shares(self):
        for shares in self._storage._peers.values():
            shares.clear()
        d = self._fn.check(Monitor())
        d.addCallback(self.check_bad, "test_check_no_shares")
        return d

    def test_check_not_enough_shares(self):
        for shares in self._storage._peers.values():
            for shnum in shares.keys():
                if shnum > 0:
                    del shares[shnum]
        d = self._fn.check(Monitor())
        d.addCallback(self.check_bad, "test_check_not_enough_shares")
        return d

    def test_check_all_bad_sig(self):
        corrupt(None, self._storage, 1) # bad sig
        d = self._fn.check(Monitor())
        d.addCallback(self.check_bad, "test_check_all_bad_sig")
        return d

    def test_check_all_bad_blocks(self):
        corrupt(None, self._storage, "share_data", [9]) # bad blocks
        # the Checker won't notice this.. it doesn't look at actual data
        d = self._fn.check(Monitor())
        d.addCallback(self.check_good, "test_check_all_bad_blocks")
        return d

    def test_verify_good(self):
        d = self._fn.check(Monitor(), verify=True)
        d.addCallback(self.check_good, "test_verify_good")
        return d

    def test_verify_all_bad_sig(self):
        corrupt(None, self._storage, 1) # bad sig
        d = self._fn.check(Monitor(), verify=True)
        d.addCallback(self.check_bad, "test_verify_all_bad_sig")
        return d

    def test_verify_one_bad_sig(self):
        corrupt(None, self._storage, 1, [9]) # bad sig
        d = self._fn.check(Monitor(), verify=True)
        d.addCallback(self.check_bad, "test_verify_one_bad_sig")
        return d

    def test_verify_one_bad_block(self):
        corrupt(None, self._storage, "share_data", [9]) # bad blocks
        # the Verifier *will* notice this, since it examines every byte
        d = self._fn.check(Monitor(), verify=True)
        d.addCallback(self.check_bad, "test_verify_one_bad_block")
        d.addCallback(self.check_expected_failure,
                      CorruptShareError, "block hash tree failure",
                      "test_verify_one_bad_block")
        return d

    def test_verify_one_bad_sharehash(self):
        corrupt(None, self._storage, "share_hash_chain", [9], 5)
        d = self._fn.check(Monitor(), verify=True)
        d.addCallback(self.check_bad, "test_verify_one_bad_sharehash")
        d.addCallback(self.check_expected_failure,
                      CorruptShareError, "corrupt hashes",
                      "test_verify_one_bad_sharehash")
        return d

    def test_verify_one_bad_encprivkey(self):
        corrupt(None, self._storage, "enc_privkey", [9]) # bad privkey
        d = self._fn.check(Monitor(), verify=True)
        d.addCallback(self.check_bad, "test_verify_one_bad_encprivkey")
        d.addCallback(self.check_expected_failure,
                      CorruptShareError, "invalid privkey",
                      "test_verify_one_bad_encprivkey")
        return d

    def test_verify_one_bad_encprivkey_uncheckable(self):
        corrupt(None, self._storage, "enc_privkey", [9]) # bad privkey
        readonly_fn = self._fn.get_readonly()
        # a read-only node has no way to validate the privkey
        d = readonly_fn.check(Monitor(), verify=True)
        d.addCallback(self.check_good,
                      "test_verify_one_bad_encprivkey_uncheckable")
        return d

class Repair(unittest.TestCase, PublishMixin, ShouldFailMixin):

    def get_shares(self, s):
        all_shares = {} # maps (peerid, shnum) to share data
        for peerid in s._peers:
            shares = s._peers[peerid]
            for shnum in shares:
                data = shares[shnum]
                all_shares[ (peerid, shnum) ] = data
        return all_shares

    def copy_shares(self, ignored=None):
        self.old_shares.append(self.get_shares(self._storage))

    def test_repair_nop(self):
        self.old_shares = []
        d = self.publish_one()
        d.addCallback(self.copy_shares)
        d.addCallback(lambda res: self._fn.check(Monitor()))
        d.addCallback(lambda check_results: self._fn.repair(check_results))
        def _check_results(rres):
            self.failUnless(IRepairResults.providedBy(rres))
            # TODO: examine results

            self.copy_shares()

            initial_shares = self.old_shares[0]
            new_shares = self.old_shares[1]
            # TODO: this really shouldn't change anything. When we implement
            # a "minimal-bandwidth" repairer, change this test to assert:
            #self.failUnlessEqual(new_shares, initial_shares)

            # all shares should be in the same place as before
            self.failUnlessEqual(set(initial_shares.keys()),
                                 set(new_shares.keys()))
            # but they should all be at a newer seqnum. The IV will be
            # different, so the roothash will be too.
            for key in initial_shares:
                (version0,
                 seqnum0,
                 root_hash0,
                 IV0,
                 k0, N0, segsize0, datalen0,
                 o0) = unpack_header(initial_shares[key])
                (version1,
                 seqnum1,
                 root_hash1,
                 IV1,
                 k1, N1, segsize1, datalen1,
                 o1) = unpack_header(new_shares[key])
                self.failUnlessEqual(version0, version1)
                self.failUnlessEqual(seqnum0+1, seqnum1)
                self.failUnlessEqual(k0, k1)
                self.failUnlessEqual(N0, N1)
                self.failUnlessEqual(segsize0, segsize1)
                self.failUnlessEqual(datalen0, datalen1)
        d.addCallback(_check_results)
        return d
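
    # Note we added: a repair republishes at seqnum+1 with a fresh random
    # salt/IV, which changes the roothash; that is why the loop above checks
    # every header field for equality except the root hash and IV.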

    def failIfSharesChanged(self, ignored=None):
        old_shares = self.old_shares[-2]
        current_shares = self.old_shares[-1]
        self.failUnlessEqual(old_shares, current_shares)

    def test_merge(self):
        self.old_shares = []
        d = self.publish_multiple()
        # repair will refuse to merge multiple highest seqnums unless you
        # pass force=True
        d.addCallback(lambda res:
                      self._set_versions({0:3,2:3,4:3,6:3,8:3,
                                          1:4,3:4,5:4,7:4,9:4}))
        d.addCallback(self.copy_shares)
        d.addCallback(lambda res: self._fn.check(Monitor()))
        def _try_repair(check_results):
            ex = "There were multiple recoverable versions with identical seqnums, so force=True must be passed to the repair() operation"
            d2 = self.shouldFail(MustForceRepairError, "test_merge", ex,
                                 self._fn.repair, check_results)
            d2.addCallback(self.copy_shares)
            d2.addCallback(self.failIfSharesChanged)
            d2.addCallback(lambda res: check_results)
            return d2
        d.addCallback(_try_repair)
        d.addCallback(lambda check_results:
                      self._fn.repair(check_results, force=True))
        # this should give us 10 shares of the highest roothash
        def _check_repair_results(rres):
            pass # TODO: examine results
        d.addCallback(_check_repair_results)
        d.addCallback(lambda res: self._fn.get_servermap(MODE_CHECK))
        def _check_smap(smap):
            self.failUnlessEqual(len(smap.recoverable_versions()), 1)
            self.failIf(smap.unrecoverable_versions())
            # now, which should have won?
            roothash_s4a = self.get_roothash_for(3)
            roothash_s4b = self.get_roothash_for(4)
            if roothash_s4b > roothash_s4a:
                expected_contents = self.CONTENTS[4]
            else:
                expected_contents = self.CONTENTS[3]
            new_versionid = smap.best_recoverable_version()
            self.failUnlessEqual(new_versionid[0], 5) # seqnum 5
            d2 = self._fn.download_version(smap, new_versionid)
            d2.addCallback(self.failUnlessEqual, expected_contents)
            return d2
        d.addCallback(_check_smap)
        return d

    def test_non_merge(self):
        self.old_shares = []
        d = self.publish_multiple()
        # repair should not refuse a repair that doesn't need to merge. In
        # this case, we combine v2 with v3. The repair should ignore v2 and
        # copy v3 into a new v5.
        d.addCallback(lambda res:
                      self._set_versions({0:2,2:2,4:2,6:2,8:2,
                                          1:3,3:3,5:3,7:3,9:3}))
        d.addCallback(lambda res: self._fn.check(Monitor()))
        d.addCallback(lambda check_results: self._fn.repair(check_results))
        # this should give us 10 shares of v3
        def _check_repair_results(rres):
            pass # TODO: examine results
        d.addCallback(_check_repair_results)
        d.addCallback(lambda res: self._fn.get_servermap(MODE_CHECK))
        def _check_smap(smap):
            self.failUnlessEqual(len(smap.recoverable_versions()), 1)
            self.failIf(smap.unrecoverable_versions())
            # now, which should have won?
            roothash_s4a = self.get_roothash_for(3)
            expected_contents = self.CONTENTS[3]
            new_versionid = smap.best_recoverable_version()
            self.failUnlessEqual(new_versionid[0], 5) # seqnum 5
            d2 = self._fn.download_version(smap, new_versionid)
            d2.addCallback(self.failUnlessEqual, expected_contents)
            return d2
        d.addCallback(_check_smap)
        return d

    def get_roothash_for(self, index):
        # return the roothash for the first share we see in the saved set
        shares = self._copied_shares[index]
        for peerid in shares:
            for shnum in shares[peerid]:
                share = shares[peerid][shnum]
                (version, seqnum, root_hash, IV, k, N, segsize, datalen, o) = \
                          unpack_header(share)
                return root_hash

    def test_check_and_repair_readcap(self):
        # we can't currently repair from a mutable readcap: #625
        self.old_shares = []
        d = self.publish_one()
        d.addCallback(self.copy_shares)
        def _get_readcap(res):
            self._fn3 = self._fn.get_readonly()
            # also delete some shares
            for peerid,shares in self._storage._peers.items():
                shares.pop(0, None)
        d.addCallback(_get_readcap)
        d.addCallback(lambda res: self._fn3.check_and_repair(Monitor()))
        def _check_results(crr):
            self.failUnless(ICheckAndRepairResults.providedBy(crr))
            # we should detect the unhealthy, but skip over mutable-readcap
            # repairs until #625 is fixed
            self.failIf(crr.get_pre_repair_results().is_healthy())
            self.failIf(crr.get_repair_attempted())
            self.failIf(crr.get_post_repair_results().is_healthy())
        d.addCallback(_check_results)
        return d

class DevNullDictionary(dict):
    def __setitem__(self, key, value):
        return

class MultipleEncodings(unittest.TestCase):
    def setUp(self):
        self.CONTENTS = "New contents go here"
        self._storage = FakeStorage()
        self._nodemaker = make_nodemaker(self._storage, num_peers=20)
        self._storage_broker = self._nodemaker.storage_broker
        d = self._nodemaker.create_mutable_file(self.CONTENTS)
        def _created(node):
            self._fn = node
        d.addCallback(_created)
        return d

    def _encode(self, k, n, data):
        # encode 'data' into a peerid->shares dict.

        fn = self._fn
        # disable the nodecache, since for these tests we explicitly need
        # multiple nodes pointing at the same file
        self._nodemaker._node_cache = DevNullDictionary()
        fn2 = self._nodemaker.create_from_cap(fn.get_uri())
        # then we copy over other fields that are normally fetched from the
        # existing shares
        fn2._pubkey = fn._pubkey
        fn2._privkey = fn._privkey
        fn2._encprivkey = fn._encprivkey
        # and set the encoding parameters to something completely different
        fn2._required_shares = k
        fn2._total_shares = n

        s = self._storage
        s._peers = {} # clear existing storage
        p2 = Publish(fn2, self._storage_broker, None)
        d = p2.publish(data)
        def _published(res):
            shares = s._peers
            s._peers = {}
            return shares
        d.addCallback(_published)
        return d

    def make_servermap(self, mode=MODE_READ, oldmap=None):
        if oldmap is None:
            oldmap = ServerMap()
        smu = ServermapUpdater(self._fn, self._storage_broker, Monitor(),
                               oldmap, mode)
        d = smu.update()
        return d

    def test_multiple_encodings(self):
        # we encode the same file in two different ways (3-of-10 and 4-of-9),
        # then mix up the shares, to make sure that download survives seeing
        # a variety of encodings. This is actually kind of tricky to set up.

        contents1 = "Contents for encoding 1 (3-of-10) go here"
        contents2 = "Contents for encoding 2 (4-of-9) go here"
        contents3 = "Contents for encoding 3 (4-of-7) go here"

        # we make a retrieval object that doesn't know what encoding
        # parameters to use
        fn3 = self._nodemaker.create_from_cap(self._fn.get_uri())

        # now we upload a file through fn1, and grab its shares
        d = self._encode(3, 10, contents1)
        def _encoded_1(shares):
            self._shares1 = shares
        d.addCallback(_encoded_1)
        d.addCallback(lambda res: self._encode(4, 9, contents2))
        def _encoded_2(shares):
            self._shares2 = shares
        d.addCallback(_encoded_2)
        d.addCallback(lambda res: self._encode(4, 7, contents3))
        def _encoded_3(shares):
            self._shares3 = shares
        d.addCallback(_encoded_3)

        def _merge(res):
            log.msg("merging sharelists")
            # we merge the shares from the two sets, leaving each shnum in
            # its original location, but using a share from set1 or set2
            # according to the following sequence (reconstructed here from
            # the 'places' list below):
            #
            #   fetch    share  version
            #   [a]      0      s2 (4-of-9)
            #   [b]      1      s2 (4-of-9)
            #   [c]      2      s3 (4-of-7)
            #   [d]      3      s2 (4-of-9)
            #   [e]      4      s1 (3-of-10)
            #   [f]      5      s1 (3-of-10)
            #   [g]      6      s1 (3-of-10)
            #   [h]      7      s2 (4-of-9)
            #
            # so that neither form can be recovered until fetch [f], at which
            # point version-s1 (the 3-of-10 form) should be recoverable. If
            # the implementation latches on to the first version it sees,
            # then s2 will be recoverable at fetch [g].

            # Later, when we implement code that handles multiple versions,
            # we can use this framework to assert that all recoverable
            # versions are retrieved, and test that 'epsilon' does its job

            places = [2, 2, 3, 2, 1, 1, 1, 2]

            sharemap = {}
            sb = self._storage_broker

            for peerid in sorted(sb.get_all_serverids()):
                peerid_s = shortnodeid_b2a(peerid)
                for shnum in self._shares1.get(peerid, {}):
                    if shnum < len(places):
                        which = places[shnum]
                    else:
                        which = "x"
                    self._storage._peers[peerid] = peers = {}
                    in_1 = shnum in self._shares1[peerid]
                    in_2 = shnum in self._shares2.get(peerid, {})
                    in_3 = shnum in self._shares3.get(peerid, {})
                    #print peerid_s, shnum, which, in_1, in_2, in_3
                    if which == 1:
                        if in_1:
                            peers[shnum] = self._shares1[peerid][shnum]
                            sharemap[shnum] = peerid
                    elif which == 2:
                        if in_2:
                            peers[shnum] = self._shares2[peerid][shnum]
                            sharemap[shnum] = peerid
                    elif which == 3:
                        if in_3:
                            peers[shnum] = self._shares3[peerid][shnum]
                            sharemap[shnum] = peerid

            # we don't bother placing any other shares
            # now sort the sequence so that share 0 is returned first
            new_sequence = [sharemap[shnum]
                            for shnum in sorted(sharemap.keys())]
            self._storage._sequence = new_sequence
            log.msg("merge done")
        d.addCallback(_merge)
        d.addCallback(lambda res: fn3.download_best_version())
        def _retrieved(new_contents):
            # the current specified behavior is "first version recoverable"
            self.failUnlessEqual(new_contents, contents1)
        d.addCallback(_retrieved)
        return d
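
    # Note we added: new_sequence above feeds FakeStorage._sequence (defined
    # near the top of this file), so storage responses are released in shnum
    # order; that determinism is what lets the comment in _merge predict
    # exactly when each version becomes recoverable.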

class MultipleVersions(unittest.TestCase, PublishMixin, CheckerMixin):
    def setUp(self):
        return self.publish_multiple()

    def test_multiple_versions(self):
        # if we see a mix of versions in the grid, download_best_version
        # should get the latest one
        self._set_versions(dict([(i,2) for i in (0,2,4,6,8)]))
        d = self._fn.download_best_version()
        d.addCallback(lambda res: self.failUnlessEqual(res, self.CONTENTS[4]))
        # and the checker should report problems
        d.addCallback(lambda res: self._fn.check(Monitor()))
        d.addCallback(self.check_bad, "test_multiple_versions")

        # but if everything is at version 2, that's what we should download
        d.addCallback(lambda res:
                      self._set_versions(dict([(i,2) for i in range(10)])))
        d.addCallback(lambda res: self._fn.download_best_version())
        d.addCallback(lambda res: self.failUnlessEqual(res, self.CONTENTS[2]))
        # if exactly one share is at version 3, we should still get v2
        d.addCallback(lambda res:
                      self._set_versions({0:3}))
        d.addCallback(lambda res: self._fn.download_best_version())
        d.addCallback(lambda res: self.failUnlessEqual(res, self.CONTENTS[2]))
        # but the servermap should see the unrecoverable version. This
        # depends upon the single newer share being queried early.
        d.addCallback(lambda res: self._fn.get_servermap(MODE_READ))
        def _check_smap(smap):
            self.failUnlessEqual(len(smap.unrecoverable_versions()), 1)
            newer = smap.unrecoverable_newer_versions()
            self.failUnlessEqual(len(newer), 1)
            verinfo, health = newer.items()[0]
            self.failUnlessEqual(verinfo[0], 4)
            self.failUnlessEqual(health, (1,3))
            self.failIf(smap.needs_merge())
        d.addCallback(_check_smap)
        # if we have a mix of two parallel versions (s4a and s4b), we could
        # recover either
        d.addCallback(lambda res:
                      self._set_versions({0:3,2:3,4:3,6:3,8:3,
                                          1:4,3:4,5:4,7:4,9:4}))
        d.addCallback(lambda res: self._fn.get_servermap(MODE_READ))
        def _check_smap_mixed(smap):
            self.failUnlessEqual(len(smap.unrecoverable_versions()), 0)
            newer = smap.unrecoverable_newer_versions()
            self.failUnlessEqual(len(newer), 0)
            self.failUnless(smap.needs_merge())
        d.addCallback(_check_smap_mixed)
        d.addCallback(lambda res: self._fn.download_best_version())
        d.addCallback(lambda res: self.failUnless(res == self.CONTENTS[3] or
                                                  res == self.CONTENTS[4]))
        return d

    def test_replace(self):
        # if we see a mix of versions in the grid, we should be able to
        # replace them all with a newer version

        # if exactly one share is at version 3, we should download (and
        # replace) v2, and the result should be v4. Note that the index we
        # give to _set_versions is different than the sequence number.
        target = dict([(i,2) for i in range(10)]) # seqnum3
        target[0] = 3 # seqnum4
        self._set_versions(target)

        def _modify(oldversion, servermap, first_time):
            return oldversion + " modified"
        d = self._fn.modify(_modify)
        d.addCallback(lambda res: self._fn.download_best_version())
        expected = self.CONTENTS[2] + " modified"
        d.addCallback(lambda res: self.failUnlessEqual(res, expected))
        # and the servermap should indicate that the outlier was replaced too
        d.addCallback(lambda res: self._fn.get_servermap(MODE_CHECK))
        def _check_smap(smap):
            self.failUnlessEqual(smap.highest_seqnum(), 5)
            self.failUnlessEqual(len(smap.unrecoverable_versions()), 0)
            self.failUnlessEqual(len(smap.recoverable_versions()), 1)
        d.addCallback(_check_smap)
        return d


class Utils(unittest.TestCase):
    def _do_inside(self, c, x_start, x_length, y_start, y_length):
        # we compare this against sets of integers
        x = set(range(x_start, x_start+x_length))
        y = set(range(y_start, y_start+y_length))
        should_be_inside = x.issubset(y)
        self.failUnlessEqual(should_be_inside, c._inside(x_start, x_length,
                                                         y_start, y_length),
                             str((x_start, x_length, y_start, y_length)))

    def test_cache_inside(self):
        c = ResponseCache()
        x_start = 10
        x_length = 5
        for y_start in range(8, 17):
            for y_length in range(8):
                self._do_inside(c, x_start, x_length, y_start, y_length)

    def _do_overlap(self, c, x_start, x_length, y_start, y_length):
        # we compare this against sets of integers
        x = set(range(x_start, x_start+x_length))
        y = set(range(y_start, y_start+y_length))
        overlap = bool(x.intersection(y))
        self.failUnlessEqual(overlap, c._does_overlap(x_start, x_length,
                                                      y_start, y_length),
                             str((x_start, x_length, y_start, y_length)))

    def test_cache_overlap(self):
        c = ResponseCache()
        x_start = 10
        x_length = 5
        for y_start in range(8, 17):
            for y_length in range(8):
                self._do_overlap(c, x_start, x_length, y_start, y_length)
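
    # For reference, the set-based oracle above is equivalent to closed-form
    # interval arithmetic over half-open ranges. These helpers sketch what
    # ResponseCache._inside and ResponseCache._does_overlap are expected to
    # compute; they are illustrative and not the cache's implementation.
    def _example_inside_arith(self, x_start, x_length, y_start, y_length):
        # an empty x-range is inside everything, matching set.issubset
        return (x_length == 0 or
                (x_start >= y_start and
                 x_start + x_length <= y_start + y_length))

    def _example_overlap_arith(self, x_start, x_length, y_start, y_length):
        # two half-open ranges overlap iff both are non-empty and each one
        # starts before the other ends
        return (x_length > 0 and y_length > 0 and
                x_start < y_start + y_length and
                y_start < x_start + x_length)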

    def test_cache(self):
        c = ResponseCache()
        # xdata = base62.b2a(os.urandom(100))[:100]
        xdata = "1Ex4mdMaDyOl9YnGBM3I4xaBF97j8OQAg1K3RBR01F2PwTP4HohB3XpACuku8Xj4aTQjqJIR1f36mEj3BCNjXaJmPBEZnnHL0U9l"
        ydata = "4DCUQXvkEPnnr9Lufikq5t21JsnzZKhzxKBhLhrBB6iIcBOWRuT4UweDhjuKJUre8A4wOObJnl3Kiqmlj4vjSLSqUGAkUD87Y3vs"
        nope = (None, None)
        c.add("v1", 1, 0, xdata, "time0")
        c.add("v1", 1, 2000, ydata, "time1")
        self.failUnlessEqual(c.read("v2", 1, 10, 11), nope)
        self.failUnlessEqual(c.read("v1", 2, 10, 11), nope)
        self.failUnlessEqual(c.read("v1", 1, 0, 10), (xdata[:10], "time0"))
        self.failUnlessEqual(c.read("v1", 1, 90, 10), (xdata[90:], "time0"))
        self.failUnlessEqual(c.read("v1", 1, 300, 10), nope)
        self.failUnlessEqual(c.read("v1", 1, 2050, 5), (ydata[50:55], "time1"))
        self.failUnlessEqual(c.read("v1", 1, 0, 101), nope)
        self.failUnlessEqual(c.read("v1", 1, 99, 1), (xdata[99:100], "time0"))
        self.failUnlessEqual(c.read("v1", 1, 100, 1), nope)
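        # each of the following reads touches the uncached gap between
        # xdata (bytes 0..99) and ydata (bytes 2000..2099); a read is only
        # satisfied when its entire range lies within a single cached span,
        # so all of these must miss: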
        self.failUnlessEqual(c.read("v1", 1, 1990, 9), nope)
        self.failUnlessEqual(c.read("v1", 1, 1990, 10), nope)
        self.failUnlessEqual(c.read("v1", 1, 1990, 11), nope)
        self.failUnlessEqual(c.read("v1", 1, 1990, 15), nope)
        self.failUnlessEqual(c.read("v1", 1, 1990, 19), nope)
        self.failUnlessEqual(c.read("v1", 1, 1990, 20), nope)
        self.failUnlessEqual(c.read("v1", 1, 1990, 21), nope)
        self.failUnlessEqual(c.read("v1", 1, 1990, 25), nope)
        self.failUnlessEqual(c.read("v1", 1, 1999, 25), nope)

        # optional: join fragments (sketched after this class)
        c = ResponseCache()
        c.add("v1", 1, 0, xdata[:10], "time0")
        c.add("v1", 1, 10, xdata[10:20], "time1")
        #self.failUnlessEqual(c.read("v1", 1, 0, 20), (xdata[:20], "time0"))
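

# A sketch of the optional fragment-joining behavior hinted at by the
# commented-out assertion above: if add() coalesced spans that abut
# end-to-start, a read covering both fragments could be served from the
# cache. This helper is illustrative only and is not part of ResponseCache.
def _example_join_adjacent(spans):
    # spans: list of (start, data) tuples; returns a copy in which runs
    # that touch are merged into single spans
    joined = []
    for (start, data) in sorted(spans):
        if joined and joined[-1][0] + len(joined[-1][1]) == start:
            prev_start, prev_data = joined[-1]
            joined[-1] = (prev_start, prev_data + data)
        else:
            joined.append((start, data))
    return joined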


class Exceptions(unittest.TestCase):
    def test_repr(self):
        nmde = NeedMoreDataError(100, 50, 100)
        self.failUnless("NeedMoreDataError" in repr(nmde), repr(nmde))
        ucwe = UncoordinatedWriteError()
        self.failUnless("UncoordinatedWriteError" in repr(ucwe), repr(ucwe))


class SameKeyGenerator:
    def __init__(self, pubkey, privkey):
        self.pubkey = pubkey
        self.privkey = privkey
    def generate(self, keysize=None):
        return defer.succeed( (self.pubkey, self.privkey) )


class FirstServerGetsKilled:
    done = False
    def notify(self, retval, wrapper, methname):
        if not self.done:
            wrapper.broken = True
            self.done = True
        return retval


class FirstServerGetsDeleted:
    def __init__(self):
        self.silenced = None
    def notify(self, retval, wrapper, methname):
        if not self.silenced:
            # this query will work, but later queries should think the share
            # has been deleted
            self.silenced = wrapper
            return retval
        if wrapper == self.silenced:
            assert methname == "slot_testv_and_readv_and_writev"
            fake_answer = (True, {})
            return fake_answer
        return retval
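

# Usage sketch for the two notifier classes above (the Problems tests below
# wire this up inline): every fake storage server exposes a
# post_call_notifier hook, and pointing that hook at a notifier's notify
# method lets the notifier observe, break, or falsify each remote call.
def _example_attach_notifier(nm, notifier):
    for (serverid, ss) in nm.storage_broker.get_all_servers():
        ss.post_call_notifier = notifier.notify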


class Problems(GridTestMixin, unittest.TestCase, testutil.ShouldFailMixin):
    def test_publish_surprise(self):
        self.basedir = "mutable/Problems/test_publish_surprise"
        self.set_up_grid()
        nm = self.g.clients[0].nodemaker
        d = nm.create_mutable_file("contents 1")
        def _created(n):
            d = defer.succeed(None)
            d.addCallback(lambda res: n.get_servermap(MODE_WRITE))
            def _got_smap1(smap):
                # stash the old state of the file
                self.old_map = smap
            d.addCallback(_got_smap1)
            # then modify the file, leaving the old map untouched
            d.addCallback(lambda res: log.msg("starting winning write"))
            d.addCallback(lambda res: n.overwrite("contents 2"))
            # now attempt to modify the file with the old servermap. This
            # will look just like an uncoordinated write, in which every
            # single share got updated between our mapupdate and our publish
            d.addCallback(lambda res: log.msg("starting doomed write"))
            d.addCallback(lambda res:
                          self.shouldFail(UncoordinatedWriteError,
                                          "test_publish_surprise", None,
                                          n.upload,
                                          "contents 2a", self.old_map))
            return d
        d.addCallback(_created)
        return d

    def test_retrieve_surprise(self):
        self.basedir = "mutable/Problems/test_retrieve_surprise"
        self.set_up_grid()
        nm = self.g.clients[0].nodemaker
        d = nm.create_mutable_file("contents 1")
        def _created(n):
            d = defer.succeed(None)
            d.addCallback(lambda res: n.get_servermap(MODE_READ))
            def _got_smap1(smap):
                # stash the old state of the file
                self.old_map = smap
            d.addCallback(_got_smap1)
            # then modify the file, leaving the old map untouched
            d.addCallback(lambda res: log.msg("starting winning write"))
            d.addCallback(lambda res: n.overwrite("contents 2"))
            # now attempt to retrieve the old version with the old servermap.
            # This will look like someone has changed the file since we
            # updated the servermap.
            d.addCallback(lambda res: n._cache._clear())
            d.addCallback(lambda res: log.msg("starting doomed read"))
            d.addCallback(lambda res:
                          self.shouldFail(NotEnoughSharesError,
                                          "test_retrieve_surprise",
                                          "ran out of peers: have 0 shares (k=3)",
                                          n.download_version,
                                          self.old_map,
                                          self.old_map.best_recoverable_version(),
                                          ))
            return d
        d.addCallback(_created)
        return d

    def test_unexpected_shares(self):
        # upload the file, take a servermap, shut down one of the servers,
        # upload it again (causing shares to appear on a new server), then
        # upload using the old servermap. The last upload should fail with an
        # UncoordinatedWriteError, because of the shares that didn't appear
        # in the old servermap.
        self.basedir = "mutable/Problems/test_unexpected_shares"
        self.set_up_grid()
        nm = self.g.clients[0].nodemaker
        d = nm.create_mutable_file("contents 1")
        def _created(n):
            d = defer.succeed(None)
            d.addCallback(lambda res: n.get_servermap(MODE_WRITE))
            def _got_smap1(smap):
                # stash the old state of the file
                self.old_map = smap
                # now shut down one of the servers
                peer0 = list(smap.make_sharemap()[0])[0]
                self.g.remove_server(peer0)
                # then modify the file, leaving the old map untouched
                log.msg("starting winning write")
                return n.overwrite("contents 2")
            d.addCallback(_got_smap1)
            # now attempt to modify the file with the old servermap. This
            # will look just like an uncoordinated write, in which every
            # single share got updated between our mapupdate and our publish
            d.addCallback(lambda res: log.msg("starting doomed write"))
            d.addCallback(lambda res:
                          self.shouldFail(UncoordinatedWriteError,
                                          "test_unexpected_shares", None,
                                          n.upload,
                                          "contents 2a", self.old_map))
            return d
        d.addCallback(_created)
        return d

    def test_bad_server(self):
        # Break one server, then create the file: the initial publish should
        # complete with an alternate server. Breaking a second server should
        # not prevent an update from succeeding either.
        self.basedir = "mutable/Problems/test_bad_server"
        self.set_up_grid()
        nm = self.g.clients[0].nodemaker

        # to make sure that one of the initial peers is broken, we have to
        # get creative. We create an RSA key and compute its storage-index.
        # Then we make a KeyGenerator that always returns that one key, and
        # use it to create the mutable file. This will get easier when we can
        # use #467 static-server-selection to disable permutation and force
        # the choice of server for share[0].

        d = nm.key_generator.generate(522)
        def _got_key( (pubkey, privkey) ):
            nm.key_generator = SameKeyGenerator(pubkey, privkey)
            pubkey_s = pubkey.serialize()
            privkey_s = privkey.serialize()
            u = uri.WriteableSSKFileURI(ssk_writekey_hash(privkey_s),
                                        ssk_pubkey_fingerprint_hash(pubkey_s))
            self._storage_index = u.storage_index
        d.addCallback(_got_key)
        def _break_peer0(res):
            si = self._storage_index
            peerlist = nm.storage_broker.get_servers_for_index(si)
            peerid0, connection0 = peerlist[0]
            peerid1, connection1 = peerlist[1]
            connection0.broken = True
            self.connection1 = connection1
        d.addCallback(_break_peer0)
        # now "create" the file, using the pre-established key, and let the
        # initial publish finally happen
        d.addCallback(lambda res: nm.create_mutable_file("contents 1"))
        # that ought to work
        def _got_node(n):
            d = n.download_best_version()
            d.addCallback(lambda res: self.failUnlessEqual(res, "contents 1"))
            # now break the second peer
            def _break_peer1(res):
                self.connection1.broken = True
            d.addCallback(_break_peer1)
            d.addCallback(lambda res: n.overwrite("contents 2"))
            # that ought to work too
            d.addCallback(lambda res: n.download_best_version())
            d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2"))
            def _explain_error(f):
                print f
                if f.check(NotEnoughServersError):
                    print "first_error:", f.value.first_error
                return f
            d.addErrback(_explain_error)
            return d
        d.addCallback(_got_node)
        return d
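
    # The "creative" setup above, condensed: with a fixed key the writecap
    # (and therefore the storage index, and therefore the permuted server
    # list) is fully deterministic, so the server that will receive
    # share[0] can be identified and broken before the file exists. This
    # helper just restates the derivation from _got_key for reference:
    def _example_predict_storage_index(self, pubkey, privkey):
        u = uri.WriteableSSKFileURI(ssk_writekey_hash(privkey.serialize()),
                                    ssk_pubkey_fingerprint_hash(pubkey.serialize()))
        return u.storage_index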

    def test_bad_server_overlap(self):
        # like test_bad_server, but with no extra unused servers to fall back
        # upon. This means that we must re-use a server which we've already
        # used. If we don't remember the fact that we sent them one share
        # already, we'll mistakenly think we're experiencing an
        # UncoordinatedWriteError.

        # Break one server, then create the file: the initial publish should
        # complete with an alternate server. Breaking a second server should
        # not prevent an update from succeeding either.
        self.basedir = "mutable/Problems/test_bad_server_overlap"
        self.set_up_grid()
        nm = self.g.clients[0].nodemaker
        sb = nm.storage_broker

        peerids = [serverid for (serverid,ss) in sb.get_all_servers()]
        self.g.break_server(peerids[0])

        d = nm.create_mutable_file("contents 1")
        def _created(n):
            d = n.download_best_version()
            d.addCallback(lambda res: self.failUnlessEqual(res, "contents 1"))
            # now break one of the remaining servers
            def _break_second_server(res):
                self.g.break_server(peerids[1])
            d.addCallback(_break_second_server)
            d.addCallback(lambda res: n.overwrite("contents 2"))
            # that ought to work too
            d.addCallback(lambda res: n.download_best_version())
            d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2"))
            return d
        d.addCallback(_created)
        return d

    def test_publish_all_servers_bad(self):
        # Break all servers: the publish should fail
        self.basedir = "mutable/Problems/test_publish_all_servers_bad"
        self.set_up_grid()
        nm = self.g.clients[0].nodemaker
        for (serverid,ss) in nm.storage_broker.get_all_servers():
            ss.broken = True

        d = self.shouldFail(NotEnoughServersError,
                            "test_publish_all_servers_bad",
                            "Ran out of non-bad servers",
                            nm.create_mutable_file, "contents")
        return d

    def test_publish_no_servers(self):
        # no servers at all: the publish should fail
        self.basedir = "mutable/Problems/test_publish_no_servers"
        self.set_up_grid(num_servers=0)
        nm = self.g.clients[0].nodemaker

        d = self.shouldFail(NotEnoughServersError,
                            "test_publish_no_servers",
                            "Ran out of non-bad servers",
                            nm.create_mutable_file, "contents")
        return d
    test_publish_no_servers.timeout = 30

    def test_privkey_query_error(self):
        # when a servermap is updated with MODE_WRITE, it tries to get the
        # privkey. Something might go wrong during this query attempt.
        # Exercise the code in _privkey_query_failed which tries to handle
        # such an error.
        self.basedir = "mutable/Problems/test_privkey_query_error"
        self.set_up_grid(num_servers=20)
        nm = self.g.clients[0].nodemaker
        nm._node_cache = DevNullDictionary() # disable the nodecache

        # we need some contents that are large enough to push the privkey out
        # of the early part of the file
        LARGE = "These are Larger contents" * 2000 # about 50KB
        d = nm.create_mutable_file(LARGE)
        def _created(n):
            self.uri = n.get_uri()
            self.n2 = nm.create_from_cap(self.uri)

            # When a mapupdate is performed on a node that doesn't yet know
            # the privkey, a short read is sent to a batch of servers, to get
            # the verinfo and (hopefully, if the file is short enough) the
            # encprivkey. Our file is too large to let this first read
            # contain the encprivkey. Each non-encprivkey-bearing response
            # that arrives (until the node gets the encprivkey) will trigger
            # a second read to specifically read the encprivkey.
            #
            # So, to exercise this case:
            #  1. notice which server gets a read() call first
            #  2. tell that server to start throwing errors
            killer = FirstServerGetsKilled()
            for (serverid,ss) in nm.storage_broker.get_all_servers():
                ss.post_call_notifier = killer.notify
        d.addCallback(_created)

        # now we update a servermap from a new node (which doesn't have the
        # privkey yet, forcing it to use a separate privkey query). Note that
        # the map-update will succeed, since we'll just get a copy from one
        # of the other shares.
        d.addCallback(lambda res: self.n2.get_servermap(MODE_WRITE))

        return d
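
    # The trigger condition exercised above, sketched: the mapupdate's
    # first query reads only a fixed-size prefix of each share, so a
    # second, targeted query is needed exactly when the encrypted private
    # key lies beyond that prefix. Illustrative only: the real reader
    # derives the offsets from the share layout, and prefix_size here is a
    # made-up number.
    def _example_needs_privkey_fetch(self, privkey_offset, privkey_length,
                                     prefix_size=1000):
        return privkey_offset + privkey_length > prefix_size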

    def test_privkey_query_missing(self):
        # like test_privkey_query_error, but this time the first server's
        # shares appear to have been deleted by the time the second query
        # arrives, instead of the query raising an exception.
        self.basedir = "mutable/Problems/test_privkey_query_missing"
        self.set_up_grid(num_servers=20)
        nm = self.g.clients[0].nodemaker
        LARGE = "These are Larger contents" * 2000 # about 50KB
        nm._node_cache = DevNullDictionary() # disable the nodecache

        d = nm.create_mutable_file(LARGE)
        def _created(n):
            self.uri = n.get_uri()
            self.n2 = nm.create_from_cap(self.uri)
            deleter = FirstServerGetsDeleted()
            for (serverid,ss) in nm.storage_broker.get_all_servers():
                ss.post_call_notifier = deleter.notify
        d.addCallback(_created)
        d.addCallback(lambda res: self.n2.get_servermap(MODE_WRITE))
        return d