import struct
from cStringIO import StringIO
from twisted.trial import unittest
from twisted.internet import defer, reactor
from allmydata import uri, client
from allmydata.nodemaker import NodeMaker
from allmydata.immutable import download
from allmydata.util import base32
from allmydata.util.idlib import shortnodeid_b2a
from allmydata.util.hashutil import tagged_hash, ssk_writekey_hash, \
     ssk_pubkey_fingerprint_hash
from allmydata.interfaces import IRepairResults, ICheckAndRepairResults, \
     NotEnoughSharesError
from allmydata.monitor import Monitor
from allmydata.test.common import ShouldFailMixin
from allmydata.test.no_network import GridTestMixin
from foolscap.api import eventually, fireEventually
from foolscap.logging import log
from allmydata.storage_client import StorageFarmBroker

from allmydata.mutable.filenode import MutableFileNode, BackoffAgent
from allmydata.mutable.common import ResponseCache, \
     MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_READ, \
     NeedMoreDataError, UnrecoverableFileError, UncoordinatedWriteError, \
     NotEnoughServersError, CorruptShareError
from allmydata.mutable.retrieve import Retrieve
from allmydata.mutable.publish import Publish
from allmydata.mutable.servermap import ServerMap, ServermapUpdater
from allmydata.mutable.layout import unpack_header, unpack_share
from allmydata.mutable.repairer import MustForceRepairError

import common_util as testutil

# this "FakeStorage" exists to put the share data in RAM and avoid using real
# network connections, both to speed up the tests and to reduce the amount of
# non-mutable.py code being exercised.

class FakeStorage:
    # this class replaces the collection of storage servers, allowing the
    # tests to examine and manipulate the published shares. It also lets us
    # control the order in which read queries are answered, to exercise more
    # of the error-handling code in Retrieve.
    #
    # Note that we ignore the storage index: this FakeStorage instance can
    # only be used for a single storage index.

    def __init__(self):
        self._peers = {}
        # _sequence is used to cause the responses to occur in a specific
        # order. If it is in use, then we will defer queries instead of
        # answering them right away, accumulating the Deferreds in a dict. We
        # don't know exactly how many queries we'll get, so exactly one
        # second after the first query arrives, we will release them all (in
        # order).
        self._sequence = None
        self._pending = {}
        self._pending_timer = None

    def read(self, peerid, storage_index):
        shares = self._peers.get(peerid, {})
        if self._sequence is None:
            return defer.succeed(shares)
        d = defer.Deferred()
        if not self._pending:
            self._pending_timer = reactor.callLater(1.0, self._fire_readers)
        self._pending[peerid] = (d, shares)
        return d

    def _fire_readers(self):
        self._pending_timer = None
        pending = self._pending
        self._pending = {}
        for peerid in self._sequence:
            if peerid in pending:
                d, shares = pending.pop(peerid)
                eventually(d.callback, shares)
        for (d, shares) in pending.values():
            eventually(d.callback, shares)

    def write(self, peerid, storage_index, shnum, offset, data):
        if peerid not in self._peers:
            self._peers[peerid] = {}
        shares = self._peers[peerid]
        f = StringIO()
        f.write(shares.get(shnum, ""))
        f.seek(offset)
        f.write(data)
        shares[shnum] = f.getvalue()

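# Illustrative sketch (not part of the original suite) of the _sequence
# knob: when it is set, read() queues its answers, and _fire_readers()
# releases them about a second later, sequenced peers first. peer_a and
# peer_b are hypothetical peerids.
#
#   s = FakeStorage()
#   s.write(peer_a, "si", 0, 0, "share-A")
#   s.write(peer_b, "si", 0, 0, "share-B")
#   s._sequence = [peer_b, peer_a]  # answer peer_b's query before peer_a's
#   d1 = s.read(peer_a, "si")       # deferred until the timer fires
#   d2 = s.read(peer_b, "si")       # will be called back first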

class FakeStorageServer:
    def __init__(self, peerid, storage):
        self.peerid = peerid
        self.storage = storage
        self.queries = 0
    def callRemote(self, methname, *args, **kwargs):
        def _call():
            meth = getattr(self, methname)
            return meth(*args, **kwargs)
        d = fireEventually()
        d.addCallback(lambda res: _call())
        return d
    def callRemoteOnly(self, methname, *args, **kwargs):
        d = self.callRemote(methname, *args, **kwargs)
        d.addBoth(lambda ignore: None)

    def advise_corrupt_share(self, share_type, storage_index, shnum, reason):
        pass

    def slot_readv(self, storage_index, shnums, readv):
        d = self.storage.read(self.peerid, storage_index)
        def _read(shares):
            response = {}
            for shnum in shares:
                if shnums and shnum not in shnums:
                    continue
                vector = response[shnum] = []
                for (offset, length) in readv:
                    assert isinstance(offset, (int, long)), offset
                    assert isinstance(length, (int, long)), length
                    vector.append(shares[shnum][offset:offset+length])
            return response
        d.addCallback(_read)
        return d

    def slot_testv_and_readv_and_writev(self, storage_index, secrets,
                                        tw_vectors, read_vector):
        # always-pass: parrot the test vectors back to them.
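        # For reference, tw_vectors maps shnum -> (testv, writev, new_length);
        # an illustrative entry (values invented for this sketch):
        #   { 0: ( [(0, 4, "eq", "abcd")],  # testv: compare 4 bytes at offset 0
        #          [(0, "new data")],       # writev: write "new data" at offset 0
        #          None ) }                 # new_length: leave the share size alone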
        readv = {}
        for shnum, (testv, writev, new_length) in tw_vectors.items():
            for (offset, length, op, specimen) in testv:
                assert op in ("le", "eq", "ge")
            # TODO: this isn't right, the read is controlled by read_vector,
            # not by testv
            readv[shnum] = [ specimen
                             for (offset, length, op, specimen)
                             in testv ]
            for (offset, data) in writev:
                self.storage.write(self.peerid, storage_index, shnum,
                                   offset, data)
        answer = (True, readv)
        return fireEventually(answer)


def flip_bit(original, byte_offset):
    return (original[:byte_offset] +
            chr(ord(original[byte_offset]) ^ 0x01) +
            original[byte_offset+1:])
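
# Quick sanity check of flip_bit (illustrative, not part of the suite):
#   flip_bit("ABC", 1) == "ACC"   # 'B' is 0x42; 0x42 ^ 0x01 == 0x43 == 'C'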

def corrupt(res, s, offset, shnums_to_corrupt=None, offset_offset=0):
    # if shnums_to_corrupt is None, corrupt all shares. Otherwise it is a
    # list of shnums to corrupt.
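    # For readability, a summary of the offset resolution below: 'offset'
    # may be an absolute byte position, a field name from the share's
    # offset table (e.g. "signature", "share_hash_chain"), the special
    # name "pubkey" (fixed at byte 107), or a (name, delta) tuple;
    # offset_offset shifts the final position.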
    for peerid in s._peers:
        shares = s._peers[peerid]
        for shnum in shares:
            if (shnums_to_corrupt is not None
                and shnum not in shnums_to_corrupt):
                continue
            data = shares[shnum]
            (version,
             seqnum,
             root_hash,
             IV,
             k, N, segsize, datalen,
             o) = unpack_header(data)
            if isinstance(offset, tuple):
                offset1, offset2 = offset
            else:
                offset1 = offset
                offset2 = 0
            if offset1 == "pubkey":
                real_offset = 107
            elif offset1 in o:
                real_offset = o[offset1]
            else:
                real_offset = offset1
            real_offset = int(real_offset) + offset2 + offset_offset
            assert isinstance(real_offset, int), offset
            shares[shnum] = flip_bit(data, real_offset)
    return res

def make_storagebroker(s=None, num_peers=10):
    if not s:
        s = FakeStorage()
    peerids = [tagged_hash("peerid", "%d" % i)[:20]
               for i in range(num_peers)]
    storage_broker = StorageFarmBroker(None, True)
    for peerid in peerids:
        fss = FakeStorageServer(peerid, s)
        storage_broker.test_add_server(peerid, fss)
    return storage_broker

def make_nodemaker(s=None, num_peers=10):
    storage_broker = make_storagebroker(s, num_peers)
    sh = client.SecretHolder("lease secret")
    keygen = client.KeyGenerator()
    keygen.set_default_keysize(522)
    nodemaker = NodeMaker(storage_broker, sh, None,
                          None, None, None,
                          {"k": 3, "n": 10}, keygen)
    return nodemaker

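# Typical use of these helpers (a sketch mirroring the setUp() methods
# below, not an additional test):
#   s = FakeStorage()
#   nm = make_nodemaker(s)
#   d = nm.create_mutable_file("initial contents")
#   d.addCallback(lambda n: n.download_best_version())
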
class Filenode(unittest.TestCase, testutil.ShouldFailMixin):
    # OLD_MAX_SEGMENT_SIZE is the limit that used to be enforced in Publish;
    # the limit has since been removed, and some of these tests check that
    # the new code correctly allows files larger than it.
    OLD_MAX_SEGMENT_SIZE = 3500000
    def setUp(self):
        self._storage = s = FakeStorage()
        self.nodemaker = make_nodemaker(s)

    def test_create(self):
        d = self.nodemaker.create_mutable_file()
        def _created(n):
            self.failUnless(isinstance(n, MutableFileNode))
            self.failUnlessEqual(n.get_storage_index(), n._storage_index)
            sb = self.nodemaker.storage_broker
            peer0 = sorted(sb.get_all_serverids())[0]
            shnums = self._storage._peers[peer0].keys()
            self.failUnlessEqual(len(shnums), 1)
        d.addCallback(_created)
        return d

    def test_serialize(self):
        n = MutableFileNode(None, None, {"k": 3, "n": 10}, None)
        calls = []
        def _callback(*args, **kwargs):
            self.failUnlessEqual(args, (4,) )
            self.failUnlessEqual(kwargs, {"foo": 5})
            calls.append(1)
            return 6
        d = n._do_serialized(_callback, 4, foo=5)
        def _check_callback(res):
            self.failUnlessEqual(res, 6)
            self.failUnlessEqual(calls, [1])
        d.addCallback(_check_callback)

        def _errback():
            raise ValueError("heya")
        d.addCallback(lambda res:
                      self.shouldFail(ValueError, "_check_errback", "heya",
                                      n._do_serialized, _errback))
        return d

    def test_upload_and_download(self):
        d = self.nodemaker.create_mutable_file()
        def _created(n):
            d = defer.succeed(None)
            d.addCallback(lambda res: n.get_servermap(MODE_READ))
            d.addCallback(lambda smap: smap.dump(StringIO()))
            d.addCallback(lambda sio:
                          self.failUnless("3-of-10" in sio.getvalue()))
            d.addCallback(lambda res: n.overwrite("contents 1"))
            d.addCallback(lambda res: self.failUnlessIdentical(res, None))
            d.addCallback(lambda res: n.download_best_version())
            d.addCallback(lambda res: self.failUnlessEqual(res, "contents 1"))
            d.addCallback(lambda res: n.get_size_of_best_version())
            d.addCallback(lambda size:
                          self.failUnlessEqual(size, len("contents 1")))
            d.addCallback(lambda res: n.overwrite("contents 2"))
            d.addCallback(lambda res: n.download_best_version())
            d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2"))
            d.addCallback(lambda res: n.download(download.Data()))
            d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2"))
            d.addCallback(lambda res: n.get_servermap(MODE_WRITE))
            d.addCallback(lambda smap: n.upload("contents 3", smap))
            d.addCallback(lambda res: n.download_best_version())
            d.addCallback(lambda res: self.failUnlessEqual(res, "contents 3"))
            d.addCallback(lambda res: n.get_servermap(MODE_ANYTHING))
            d.addCallback(lambda smap:
                          n.download_version(smap,
                                             smap.best_recoverable_version()))
            d.addCallback(lambda res: self.failUnlessEqual(res, "contents 3"))
            # test a file that is large enough to overcome the
            # mapupdate-to-retrieve data caching (i.e. make the shares larger
            # than the default readsize, which is 2000 bytes). A 15kB file
            # will have 5kB shares.
            d.addCallback(lambda res: n.overwrite("large size file" * 1000))
            d.addCallback(lambda res: n.download_best_version())
            d.addCallback(lambda res:
                          self.failUnlessEqual(res, "large size file" * 1000))
            return d
        d.addCallback(_created)
        return d

    def test_create_with_initial_contents(self):
        d = self.nodemaker.create_mutable_file("contents 1")
        def _created(n):
            d = n.download_best_version()
            d.addCallback(lambda res: self.failUnlessEqual(res, "contents 1"))
            d.addCallback(lambda res: n.overwrite("contents 2"))
            d.addCallback(lambda res: n.download_best_version())
            d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2"))
            return d
        d.addCallback(_created)
        return d

    def test_create_with_too_large_contents(self):
        BIG = "a" * (self.OLD_MAX_SEGMENT_SIZE + 1)
        d = self.nodemaker.create_mutable_file(BIG)
        def _created(n):
            d = n.overwrite(BIG)
            return d
        d.addCallback(_created)
        return d

    def failUnlessCurrentSeqnumIs(self, n, expected_seqnum, which):
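        # verinfo is the 9-tuple (seqnum, root_hash, IV, segsize, datalen,
        # k, N, prefix, offsets_tuple); element [0] is the sequence number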
        d = n.get_servermap(MODE_READ)
        d.addCallback(lambda servermap: servermap.best_recoverable_version())
        d.addCallback(lambda verinfo:
                      self.failUnlessEqual(verinfo[0], expected_seqnum, which))
        return d

    def test_modify(self):
        def _modifier(old_contents, servermap, first_time):
            return old_contents + "line2"
        def _non_modifier(old_contents, servermap, first_time):
            return old_contents
        def _none_modifier(old_contents, servermap, first_time):
            return None
        def _error_modifier(old_contents, servermap, first_time):
            raise ValueError("oops")
        def _toobig_modifier(old_contents, servermap, first_time):
            return "b" * (self.OLD_MAX_SEGMENT_SIZE+1)
        calls = []
        def _ucw_error_modifier(old_contents, servermap, first_time):
            # simulate an UncoordinatedWriteError once
            calls.append(1)
            if len(calls) <= 1:
                raise UncoordinatedWriteError("simulated")
            return old_contents + "line3"
        def _ucw_error_non_modifier(old_contents, servermap, first_time):
            # simulate an UncoordinatedWriteError once, and don't actually
            # modify the contents on subsequent invocations
            calls.append(1)
            if len(calls) <= 1:
                raise UncoordinatedWriteError("simulated")
            return old_contents

        d = self.nodemaker.create_mutable_file("line1")
        def _created(n):
            d = n.modify(_modifier)
            d.addCallback(lambda res: n.download_best_version())
            d.addCallback(lambda res: self.failUnlessEqual(res, "line1line2"))
            d.addCallback(lambda res: self.failUnlessCurrentSeqnumIs(n, 2, "m"))

            d.addCallback(lambda res: n.modify(_non_modifier))
            d.addCallback(lambda res: n.download_best_version())
            d.addCallback(lambda res: self.failUnlessEqual(res, "line1line2"))
            d.addCallback(lambda res: self.failUnlessCurrentSeqnumIs(n, 2, "non"))

            d.addCallback(lambda res: n.modify(_none_modifier))
            d.addCallback(lambda res: n.download_best_version())
            d.addCallback(lambda res: self.failUnlessEqual(res, "line1line2"))
            d.addCallback(lambda res: self.failUnlessCurrentSeqnumIs(n, 2, "none"))

            d.addCallback(lambda res:
                          self.shouldFail(ValueError, "error_modifier", None,
                                          n.modify, _error_modifier))
            d.addCallback(lambda res: n.download_best_version())
            d.addCallback(lambda res: self.failUnlessEqual(res, "line1line2"))
            d.addCallback(lambda res: self.failUnlessCurrentSeqnumIs(n, 2, "err"))

            d.addCallback(lambda res: n.download_best_version())
            d.addCallback(lambda res: self.failUnlessEqual(res, "line1line2"))
            d.addCallback(lambda res: self.failUnlessCurrentSeqnumIs(n, 2, "big"))

            d.addCallback(lambda res: n.modify(_ucw_error_modifier))
            d.addCallback(lambda res: self.failUnlessEqual(len(calls), 2))
            d.addCallback(lambda res: n.download_best_version())
            d.addCallback(lambda res: self.failUnlessEqual(res,
                                                           "line1line2line3"))
            d.addCallback(lambda res: self.failUnlessCurrentSeqnumIs(n, 3, "ucw"))

            def _reset_ucw_error_modifier(res):
                calls[:] = []
                return res
            d.addCallback(_reset_ucw_error_modifier)

            # in practice, this n.modify call should publish twice: the first
            # one gets a UCWE, the second does not. But our test jig (in
            # which the modifier raises the UCWE) skips over the first one,
            # so in this test there will be only one publish, and the seqnum
            # will only be one larger than the previous test, not two (i.e. 4
            # instead of 5).
            d.addCallback(lambda res: n.modify(_ucw_error_non_modifier))
            d.addCallback(lambda res: self.failUnlessEqual(len(calls), 2))
            d.addCallback(lambda res: n.download_best_version())
            d.addCallback(lambda res: self.failUnlessEqual(res,
                                                           "line1line2line3"))
            d.addCallback(lambda res: self.failUnlessCurrentSeqnumIs(n, 4, "ucw"))
            d.addCallback(lambda res: n.modify(_toobig_modifier))
            return d
        d.addCallback(_created)
        return d

    def test_modify_backoffer(self):
        def _modifier(old_contents, servermap, first_time):
            return old_contents + "line2"
        calls = []
        def _ucw_error_modifier(old_contents, servermap, first_time):
            # simulate an UncoordinatedWriteError once
            calls.append(1)
            if len(calls) <= 1:
                raise UncoordinatedWriteError("simulated")
            return old_contents + "line3"
        def _always_ucw_error_modifier(old_contents, servermap, first_time):
            raise UncoordinatedWriteError("simulated")
        def _backoff_stopper(node, f):
            return f
        def _backoff_pauser(node, f):
            d = defer.Deferred()
            reactor.callLater(0.5, d.callback, None)
            return d

        # the give-up-er will hit its maximum retry count quickly
        giveuper = BackoffAgent()
        giveuper._delay = 0.1
        giveuper.factor = 1

        d = self.nodemaker.create_mutable_file("line1")
        def _created(n):
            d = n.modify(_modifier)
            d.addCallback(lambda res: n.download_best_version())
            d.addCallback(lambda res: self.failUnlessEqual(res, "line1line2"))
            d.addCallback(lambda res: self.failUnlessCurrentSeqnumIs(n, 2, "m"))

            d.addCallback(lambda res:
                          self.shouldFail(UncoordinatedWriteError,
                                          "_backoff_stopper", None,
                                          n.modify, _ucw_error_modifier,
                                          _backoff_stopper))
            d.addCallback(lambda res: n.download_best_version())
            d.addCallback(lambda res: self.failUnlessEqual(res, "line1line2"))
            d.addCallback(lambda res: self.failUnlessCurrentSeqnumIs(n, 2, "stop"))

            def _reset_ucw_error_modifier(res):
                calls[:] = []
                return res
            d.addCallback(_reset_ucw_error_modifier)
            d.addCallback(lambda res: n.modify(_ucw_error_modifier,
                                               _backoff_pauser))
            d.addCallback(lambda res: n.download_best_version())
            d.addCallback(lambda res: self.failUnlessEqual(res,
                                                           "line1line2line3"))
            d.addCallback(lambda res: self.failUnlessCurrentSeqnumIs(n, 3, "pause"))

            d.addCallback(lambda res:
                          self.shouldFail(UncoordinatedWriteError,
                                          "giveuper", None,
                                          n.modify, _always_ucw_error_modifier,
                                          giveuper.delay))
            d.addCallback(lambda res: n.download_best_version())
            d.addCallback(lambda res: self.failUnlessEqual(res,
                                                           "line1line2line3"))
            d.addCallback(lambda res: self.failUnlessCurrentSeqnumIs(n, 3, "giveup"))

            return d
        d.addCallback(_created)
        return d

    def test_upload_and_download_full_size_keys(self):
        self.nodemaker.key_generator = client.KeyGenerator()
        d = self.nodemaker.create_mutable_file()
        def _created(n):
            d = defer.succeed(None)
            d.addCallback(lambda res: n.get_servermap(MODE_READ))
            d.addCallback(lambda smap: smap.dump(StringIO()))
            d.addCallback(lambda sio:
                          self.failUnless("3-of-10" in sio.getvalue()))
            d.addCallback(lambda res: n.overwrite("contents 1"))
            d.addCallback(lambda res: self.failUnlessIdentical(res, None))
            d.addCallback(lambda res: n.download_best_version())
            d.addCallback(lambda res: self.failUnlessEqual(res, "contents 1"))
            d.addCallback(lambda res: n.overwrite("contents 2"))
            d.addCallback(lambda res: n.download_best_version())
            d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2"))
            d.addCallback(lambda res: n.download(download.Data()))
            d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2"))
            d.addCallback(lambda res: n.get_servermap(MODE_WRITE))
            d.addCallback(lambda smap: n.upload("contents 3", smap))
            d.addCallback(lambda res: n.download_best_version())
            d.addCallback(lambda res: self.failUnlessEqual(res, "contents 3"))
            d.addCallback(lambda res: n.get_servermap(MODE_ANYTHING))
            d.addCallback(lambda smap:
                          n.download_version(smap,
                                             smap.best_recoverable_version()))
            d.addCallback(lambda res: self.failUnlessEqual(res, "contents 3"))
            return d
        d.addCallback(_created)
        return d


class MakeShares(unittest.TestCase):
    def test_encrypt(self):
        nm = make_nodemaker()
        CONTENTS = "some initial contents"
        d = nm.create_mutable_file(CONTENTS)
        def _created(fn):
            p = Publish(fn, nm.storage_broker, None)
            p.salt = "SALT" * 4
            p.readkey = "\x00" * 16
            p.newdata = CONTENTS
            p.required_shares = 3
            p.total_shares = 10
            p.setup_encoding_parameters()
            return p._encrypt_and_encode()
        d.addCallback(_created)
        def _done(shares_and_shareids):
            (shares, share_ids) = shares_and_shareids
            self.failUnlessEqual(len(shares), 10)
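            # len(CONTENTS) == 21 and k == 3, so each erasure-coded share
            # carries 21/3 == 7 bytes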
            for sh in shares:
                self.failUnless(isinstance(sh, str))
                self.failUnlessEqual(len(sh), 7)
            self.failUnlessEqual(len(share_ids), 10)
        d.addCallback(_done)
        return d

    def test_generate(self):
        nm = make_nodemaker()
        CONTENTS = "some initial contents"
        d = nm.create_mutable_file(CONTENTS)
        def _created(fn):
            self._fn = fn
            p = Publish(fn, nm.storage_broker, None)
            self._p = p
            p.newdata = CONTENTS
            p.required_shares = 3
            p.total_shares = 10
            p.setup_encoding_parameters()
            p._new_seqnum = 3
            p.salt = "SALT" * 4
            # make some fake shares
            shares_and_ids = ( ["%07d" % i for i in range(10)], range(10) )
            p._privkey = fn.get_privkey()
            p._encprivkey = fn.get_encprivkey()
            p._pubkey = fn.get_pubkey()
            return p._generate_shares(shares_and_ids)
        d.addCallback(_created)
        def _generated(res):
            p = self._p
            final_shares = p.shares
            root_hash = p.root_hash
            self.failUnlessEqual(len(root_hash), 32)
            self.failUnless(isinstance(final_shares, dict))
            self.failUnlessEqual(len(final_shares), 10)
            self.failUnlessEqual(sorted(final_shares.keys()), range(10))
            for i,sh in final_shares.items():
                self.failUnless(isinstance(sh, str))
                # feed the share through the unpacker as a sanity-check
                pieces = unpack_share(sh)
                (u_seqnum, u_root_hash, IV, k, N, segsize, datalen,
                 pubkey, signature, share_hash_chain, block_hash_tree,
                 share_data, enc_privkey) = pieces
                self.failUnlessEqual(u_seqnum, 3)
                self.failUnlessEqual(u_root_hash, root_hash)
                self.failUnlessEqual(k, 3)
                self.failUnlessEqual(N, 10)
                self.failUnlessEqual(segsize, 21)
                self.failUnlessEqual(datalen, len(CONTENTS))
                self.failUnlessEqual(pubkey, p._pubkey.serialize())
                sig_material = struct.pack(">BQ32s16s BBQQ",
                                           0, p._new_seqnum, root_hash, IV,
                                           k, N, segsize, datalen)
                self.failUnless(p._pubkey.verify(sig_material, signature))
                #self.failUnlessEqual(signature, p._privkey.sign(sig_material))
                self.failUnless(isinstance(share_hash_chain, dict))
                self.failUnlessEqual(len(share_hash_chain), 4) # ceil(log2(10)) == 4
                for shnum,share_hash in share_hash_chain.items():
                    self.failUnless(isinstance(shnum, int))
                    self.failUnless(isinstance(share_hash, str))
                    self.failUnlessEqual(len(share_hash), 32)
                self.failUnless(isinstance(block_hash_tree, list))
                self.failUnlessEqual(len(block_hash_tree), 1) # very small tree
                self.failUnlessEqual(IV, "SALT"*4)
                self.failUnlessEqual(len(share_data), len("%07d" % 1))
                self.failUnlessEqual(enc_privkey, self._fn.get_encprivkey())
        d.addCallback(_generated)
        return d

    # TODO: when we publish to 20 peers, we should get one share per peer, on
    # 10 of them. When we publish to 3 peers, we should get either 3 or 4
    # shares per peer. When we publish to zero peers, we should get a
    # NotEnoughSharesError.

class PublishMixin:
    def publish_one(self):
        # publish a file and create shares, which can then be manipulated
        # later.
        self.CONTENTS = "New contents go here" * 1000
        num_peers = 20
        self._storage = FakeStorage()
        self._nodemaker = make_nodemaker(self._storage)
        self._storage_broker = self._nodemaker.storage_broker
        d = self._nodemaker.create_mutable_file(self.CONTENTS)
        def _created(node):
            self._fn = node
            self._fn2 = self._nodemaker.create_from_cap(node.get_uri())
        d.addCallback(_created)
        return d

    def publish_multiple(self):
        self.CONTENTS = ["Contents 0",
                         "Contents 1",
                         "Contents 2",
                         "Contents 3a",
                         "Contents 3b"]
        self._copied_shares = {}
        num_peers = 20
        self._storage = FakeStorage()
        self._nodemaker = make_nodemaker(self._storage)
        d = self._nodemaker.create_mutable_file(self.CONTENTS[0]) # seqnum=1
        def _created(node):
            self._fn = node
            # now create multiple versions of the same file, and accumulate
            # their shares, so we can mix and match them later.
            d = defer.succeed(None)
            d.addCallback(self._copy_shares, 0)
            d.addCallback(lambda res: node.overwrite(self.CONTENTS[1])) #s2
            d.addCallback(self._copy_shares, 1)
            d.addCallback(lambda res: node.overwrite(self.CONTENTS[2])) #s3
            d.addCallback(self._copy_shares, 2)
            d.addCallback(lambda res: node.overwrite(self.CONTENTS[3])) #s4a
            d.addCallback(self._copy_shares, 3)
            # now we replace all the shares with version s3, and upload a new
            # version to get s4b.
            rollback = dict([(i,2) for i in range(10)])
            d.addCallback(lambda res: self._set_versions(rollback))
            d.addCallback(lambda res: node.overwrite(self.CONTENTS[4])) #s4b
            d.addCallback(self._copy_shares, 4)
            # we leave the storage in state 4
            return d
        d.addCallback(_created)
        return d

    def _copy_shares(self, ignored, index):
        shares = self._storage._peers
        # we need a deep copy
        new_shares = {}
        for peerid in shares:
            new_shares[peerid] = {}
            for shnum in shares[peerid]:
                new_shares[peerid][shnum] = shares[peerid][shnum]
        self._copied_shares[index] = new_shares

    def _set_versions(self, versionmap):
        # versionmap maps shnums to which version (0,1,2,3,4) we want the
        # share to be at. Any shnum which is left out of the map will stay at
        # its current version.
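        # For example (values as used in publish_multiple above),
        #   self._set_versions(dict([(i, 2) for i in range(10)]))
        # rolls every share on every peer back to the copy saved for
        # version 2, while self._set_versions({0: 2, 1: 2}) would touch
        # only shares 0 and 1.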
        shares = self._storage._peers
        oldshares = self._copied_shares
        for peerid in shares:
            for shnum in shares[peerid]:
                if shnum in versionmap:
                    index = versionmap[shnum]
                    shares[peerid][shnum] = oldshares[index][peerid][shnum]


class Servermap(unittest.TestCase, PublishMixin):
    def setUp(self):
        return self.publish_one()

    def make_servermap(self, mode=MODE_CHECK, fn=None, sb=None):
        if fn is None:
            fn = self._fn
        if sb is None:
            sb = self._storage_broker
        smu = ServermapUpdater(fn, sb, Monitor(),
                               ServerMap(), mode)
        d = smu.update()
        return d

    def update_servermap(self, oldmap, mode=MODE_CHECK):
        smu = ServermapUpdater(self._fn, self._storage_broker, Monitor(),
                               oldmap, mode)
        d = smu.update()
        return d

    def failUnlessOneRecoverable(self, sm, num_shares):
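        # shares_available() maps verinfo to (num_distinct_shares, k, N)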
        self.failUnlessEqual(len(sm.recoverable_versions()), 1)
        self.failUnlessEqual(len(sm.unrecoverable_versions()), 0)
        best = sm.best_recoverable_version()
        self.failIfEqual(best, None)
        self.failUnlessEqual(sm.recoverable_versions(), set([best]))
        self.failUnlessEqual(len(sm.shares_available()), 1)
        self.failUnlessEqual(sm.shares_available()[best], (num_shares, 3, 10))
        shnum, peerids = sm.make_sharemap().items()[0]
        peerid = list(peerids)[0]
        self.failUnlessEqual(sm.version_on_peer(peerid, shnum), best)
        self.failUnlessEqual(sm.version_on_peer(peerid, 666), None)
        return sm

    def test_basic(self):
        d = defer.succeed(None)
        ms = self.make_servermap
        us = self.update_servermap

        d.addCallback(lambda res: ms(mode=MODE_CHECK))
        d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 10))
        d.addCallback(lambda res: ms(mode=MODE_WRITE))
        d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 10))
        d.addCallback(lambda res: ms(mode=MODE_READ))
        # this mode stops at k+epsilon, and epsilon=k, so 6 shares
        d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 6))
        d.addCallback(lambda res: ms(mode=MODE_ANYTHING))
        # this mode stops at 'k' shares
        d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 3))

        # and can we re-use the same servermap? Note that these are sorted in
        # increasing order of number of servers queried, since once a server
        # gets into the servermap, we'll always ask it for an update.
        d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 3))
        d.addCallback(lambda sm: us(sm, mode=MODE_READ))
        d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 6))
        d.addCallback(lambda sm: us(sm, mode=MODE_WRITE))
        d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 10))
        d.addCallback(lambda sm: us(sm, mode=MODE_CHECK))
        d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 10))
        d.addCallback(lambda sm: us(sm, mode=MODE_ANYTHING))
        d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 10))

        return d

    def test_fetch_privkey(self):
        d = defer.succeed(None)
        # use the sibling filenode (which hasn't been used yet), and make
        # sure it can fetch the privkey. The file is small, so the privkey
        # will be fetched on the first (query) pass.
        d.addCallback(lambda res: self.make_servermap(MODE_WRITE, self._fn2))
        d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 10))

        # create a new file, which is large enough to knock the privkey out
        # of the early part of the file
        LARGE = "These are Larger contents" * 200 # about 5KB
        d.addCallback(lambda res: self._nodemaker.create_mutable_file(LARGE))
        def _created(large_fn):
            large_fn2 = self._nodemaker.create_from_cap(large_fn.get_uri())
            return self.make_servermap(MODE_WRITE, large_fn2)
        d.addCallback(_created)
        d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 10))
        return d

    def test_mark_bad(self):
        d = defer.succeed(None)
        ms = self.make_servermap
        us = self.update_servermap

        d.addCallback(lambda res: ms(mode=MODE_READ))
        d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 6))
        def _made_map(sm):
            v = sm.best_recoverable_version()
            vm = sm.make_versionmap()
            shares = list(vm[v])
            self.failUnlessEqual(len(shares), 6)
            self._corrupted = set()
            # mark the first 5 shares as corrupt, then update the servermap.
            # The map should not have the marked shares in it any more, and
            # new shares should be found to replace the missing ones.
            for (shnum, peerid, timestamp) in shares:
                if shnum < 5:
                    self._corrupted.add( (peerid, shnum) )
                    sm.mark_bad_share(peerid, shnum, "")
            return self.update_servermap(sm, MODE_WRITE)
        d.addCallback(_made_map)
        def _check_map(sm):
            # this should find all 5 shares that weren't marked bad
            v = sm.best_recoverable_version()
            vm = sm.make_versionmap()
            shares = list(vm[v])
            for (peerid, shnum) in self._corrupted:
                peer_shares = sm.shares_on_peer(peerid)
                self.failIf(shnum in peer_shares,
                            "%d was in %s" % (shnum, peer_shares))
            self.failUnlessEqual(len(shares), 5)
        d.addCallback(_check_map)
        return d

    def failUnlessNoneRecoverable(self, sm):
        self.failUnlessEqual(len(sm.recoverable_versions()), 0)
        self.failUnlessEqual(len(sm.unrecoverable_versions()), 0)
        best = sm.best_recoverable_version()
        self.failUnlessEqual(best, None)
        self.failUnlessEqual(len(sm.shares_available()), 0)

    def test_no_shares(self):
        self._storage._peers = {} # delete all shares
        ms = self.make_servermap
        d = defer.succeed(None)

        d.addCallback(lambda res: ms(mode=MODE_CHECK))
        d.addCallback(lambda sm: self.failUnlessNoneRecoverable(sm))

        d.addCallback(lambda res: ms(mode=MODE_ANYTHING))
        d.addCallback(lambda sm: self.failUnlessNoneRecoverable(sm))

        d.addCallback(lambda res: ms(mode=MODE_WRITE))
        d.addCallback(lambda sm: self.failUnlessNoneRecoverable(sm))

        d.addCallback(lambda res: ms(mode=MODE_READ))
        d.addCallback(lambda sm: self.failUnlessNoneRecoverable(sm))

        return d

    def failUnlessNotQuiteEnough(self, sm):
        self.failUnlessEqual(len(sm.recoverable_versions()), 0)
        self.failUnlessEqual(len(sm.unrecoverable_versions()), 1)
        best = sm.best_recoverable_version()
        self.failUnlessEqual(best, None)
        self.failUnlessEqual(len(sm.shares_available()), 1)
        self.failUnlessEqual(sm.shares_available().values()[0], (2,3,10) )
        return sm

    def test_not_quite_enough_shares(self):
        s = self._storage
        ms = self.make_servermap
        num_shares = len(s._peers)
        for peerid in s._peers:
            s._peers[peerid] = {}
            num_shares -= 1
            if num_shares == 2:
                break
        # now there ought to be only two shares left
        assert len([peerid for peerid in s._peers if s._peers[peerid]]) == 2

        d = defer.succeed(None)

        d.addCallback(lambda res: ms(mode=MODE_CHECK))
        d.addCallback(lambda sm: self.failUnlessNotQuiteEnough(sm))
        d.addCallback(lambda sm:
                      self.failUnlessEqual(len(sm.make_sharemap()), 2))
        d.addCallback(lambda res: ms(mode=MODE_ANYTHING))
        d.addCallback(lambda sm: self.failUnlessNotQuiteEnough(sm))
        d.addCallback(lambda res: ms(mode=MODE_WRITE))
        d.addCallback(lambda sm: self.failUnlessNotQuiteEnough(sm))
        d.addCallback(lambda res: ms(mode=MODE_READ))
        d.addCallback(lambda sm: self.failUnlessNotQuiteEnough(sm))

        return d


class Roundtrip(unittest.TestCase, testutil.ShouldFailMixin, PublishMixin):
    def setUp(self):
        return self.publish_one()

    def make_servermap(self, mode=MODE_READ, oldmap=None, sb=None):
        if oldmap is None:
            oldmap = ServerMap()
        if sb is None:
            sb = self._storage_broker
        smu = ServermapUpdater(self._fn, sb, Monitor(), oldmap, mode)
        d = smu.update()
        return d

    def abbrev_verinfo(self, verinfo):
        if verinfo is None:
            return None
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = verinfo
        return "%d-%s" % (seqnum, base32.b2a(root_hash)[:4])

    def abbrev_verinfo_dict(self, verinfo_d):
        output = {}
        for verinfo,value in verinfo_d.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            output["%d-%s" % (seqnum, base32.b2a(root_hash)[:4])] = value
        return output

    def dump_servermap(self, servermap):
        print "SERVERMAP", servermap
        print "RECOVERABLE", [self.abbrev_verinfo(v)
                              for v in servermap.recoverable_versions()]
        print "BEST", self.abbrev_verinfo(servermap.best_recoverable_version())
        print "available", self.abbrev_verinfo_dict(servermap.shares_available())

    def do_download(self, servermap, version=None):
        if version is None:
            version = servermap.best_recoverable_version()
        r = Retrieve(self._fn, servermap, version)
        return r.download()
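
    # The usual mapupdate-then-retrieve flow, as exercised by test_basic
    # below:
    #   d = self.make_servermap()
    #   d.addCallback(self.do_download)
    #   d.addCallback(lambda contents: ...)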

    def test_basic(self):
        d = self.make_servermap()
        def _do_retrieve(servermap):
            self._smap = servermap
            #self.dump_servermap(servermap)
            self.failUnlessEqual(len(servermap.recoverable_versions()), 1)
            return self.do_download(servermap)
        d.addCallback(_do_retrieve)
        def _retrieved(new_contents):
            self.failUnlessEqual(new_contents, self.CONTENTS)
        d.addCallback(_retrieved)
        # we should be able to re-use the same servermap, both with and
        # without updating it.
        d.addCallback(lambda res: self.do_download(self._smap))
        d.addCallback(_retrieved)
        d.addCallback(lambda res: self.make_servermap(oldmap=self._smap))
        d.addCallback(lambda res: self.do_download(self._smap))
        d.addCallback(_retrieved)
        # clobbering the pubkey should make the servermap updater re-fetch it
        def _clobber_pubkey(res):
            self._fn._pubkey = None
        d.addCallback(_clobber_pubkey)
        d.addCallback(lambda res: self.make_servermap(oldmap=self._smap))
        d.addCallback(lambda res: self.do_download(self._smap))
        d.addCallback(_retrieved)
        return d

    def test_all_shares_vanished(self):
        d = self.make_servermap()
        def _remove_shares(servermap):
            for shares in self._storage._peers.values():
                shares.clear()
            d1 = self.shouldFail(NotEnoughSharesError,
                                 "test_all_shares_vanished",
                                 "ran out of peers",
                                 self.do_download, servermap)
            return d1
        d.addCallback(_remove_shares)
        return d

    def test_no_servers(self):
        sb2 = make_storagebroker(num_peers=0)
        # if there are no servers, then a MODE_READ servermap should come
        # back empty
        d = self.make_servermap(sb=sb2)
        def _check_servermap(servermap):
            self.failUnlessEqual(servermap.best_recoverable_version(), None)
            self.failIf(servermap.recoverable_versions())
            self.failIf(servermap.unrecoverable_versions())
            self.failIf(servermap.all_peers())
        d.addCallback(_check_servermap)
        return d
    test_no_servers.timeout = 15

    def test_no_servers_download(self):
        sb2 = make_storagebroker(num_peers=0)
        self._fn._storage_broker = sb2
        d = self.shouldFail(UnrecoverableFileError,
                            "test_no_servers_download",
                            "no recoverable versions",
                            self._fn.download_best_version)
        def _restore(res):
            # a failed download that occurs while we aren't connected to
            # anybody should not prevent a subsequent download from working.
            # This isn't quite the webapi-driven test that #463 wants, but it
            # should be close enough.
            self._fn._storage_broker = self._storage_broker
            return self._fn.download_best_version()
        def _retrieved(new_contents):
            self.failUnlessEqual(new_contents, self.CONTENTS)
        d.addCallback(_restore)
        d.addCallback(_retrieved)
        return d
    test_no_servers_download.timeout = 15

    def _test_corrupt_all(self, offset, substring,
                          should_succeed=False, corrupt_early=True,
                          failure_checker=None):
        d = defer.succeed(None)
        if corrupt_early:
            d.addCallback(corrupt, self._storage, offset)
        d.addCallback(lambda res: self.make_servermap())
        if not corrupt_early:
            d.addCallback(corrupt, self._storage, offset)
        def _do_retrieve(servermap):
            ver = servermap.best_recoverable_version()
            if ver is None and not should_succeed:
                # no recoverable versions == not succeeding. The problem
                # should be noted in the servermap's list of problems.
                if substring:
                    allproblems = [str(f) for f in servermap.problems]
                    self.failUnlessIn(substring, "".join(allproblems))
                return servermap
            if should_succeed:
                d1 = self._fn.download_version(servermap, ver)
                d1.addCallback(lambda new_contents:
                               self.failUnlessEqual(new_contents, self.CONTENTS))
            else:
                d1 = self.shouldFail(NotEnoughSharesError,
                                     "_corrupt_all(offset=%s)" % (offset,),
                                     substring,
                                     self._fn.download_version, servermap, ver)
            if failure_checker:
                d1.addCallback(failure_checker)
            d1.addCallback(lambda res: servermap)
            return d1
        d.addCallback(_do_retrieve)
        return d

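    # 'offset' here takes the same forms as in corrupt() above; the tests
    # below use all three, e.g.:
    #   self._test_corrupt_all(1, "signature is invalid")         # absolute byte
    #   self._test_corrupt_all("pubkey", "...")                   # named field
    #   self._test_corrupt_all(("share_hash_chain",4), "...")     # field + delta
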
    def test_corrupt_all_verbyte(self):
        # when the version byte is not 0, we hit an UnknownVersionError in
        # unpack_share().
        d = self._test_corrupt_all(0, "UnknownVersionError")
        def _check_servermap(servermap):
            # and the dump should mention the problems
            s = StringIO()
            dump = servermap.dump(s).getvalue()
            self.failUnless("10 PROBLEMS" in dump, dump)
        d.addCallback(_check_servermap)
        return d

    def test_corrupt_all_seqnum(self):
        # a corrupt sequence number will trigger a bad signature
        return self._test_corrupt_all(1, "signature is invalid")

    def test_corrupt_all_R(self):
        # a corrupt root hash will trigger a bad signature
        return self._test_corrupt_all(9, "signature is invalid")

    def test_corrupt_all_IV(self):
        # a corrupt salt/IV will trigger a bad signature
        return self._test_corrupt_all(41, "signature is invalid")

    def test_corrupt_all_k(self):
        # a corrupt 'k' will trigger a bad signature
        return self._test_corrupt_all(57, "signature is invalid")

    def test_corrupt_all_N(self):
        # a corrupt 'N' will trigger a bad signature
        return self._test_corrupt_all(58, "signature is invalid")

    def test_corrupt_all_segsize(self):
        # a corrupt segsize will trigger a bad signature
        return self._test_corrupt_all(59, "signature is invalid")

    def test_corrupt_all_datalen(self):
        # a corrupt data length will trigger a bad signature
        return self._test_corrupt_all(67, "signature is invalid")

    def test_corrupt_all_pubkey(self):
        # a corrupt pubkey won't match the URI's fingerprint. We need to
        # remove the pubkey from the filenode, or else it won't bother trying
        # to update it.
        self._fn._pubkey = None
        return self._test_corrupt_all("pubkey",
                                      "pubkey doesn't match fingerprint")

    def test_corrupt_all_sig(self):
        # a corrupt signature is a bad one
        # the signature runs from about [543:799], depending upon the length
        # of the pubkey
        return self._test_corrupt_all("signature", "signature is invalid")

    def test_corrupt_all_share_hash_chain_number(self):
        # a corrupt share hash chain entry will show up as a bad hash. If we
        # mangle the first byte, that will look like a bad hash number,
        # causing an IndexError
        return self._test_corrupt_all("share_hash_chain", "corrupt hashes")

    def test_corrupt_all_share_hash_chain_hash(self):
        # a corrupt share hash chain entry will show up as a bad hash. If we
        # mangle a few bytes in, that will look like a bad hash.
        return self._test_corrupt_all(("share_hash_chain",4), "corrupt hashes")

    def test_corrupt_all_block_hash_tree(self):
        return self._test_corrupt_all("block_hash_tree",
                                      "block hash tree failure")

    def test_corrupt_all_block(self):
        return self._test_corrupt_all("share_data", "block hash tree failure")

    def test_corrupt_all_encprivkey(self):
        # a corrupted privkey won't even be noticed by the reader, only by a
        # writer.
        return self._test_corrupt_all("enc_privkey", None, should_succeed=True)

    def test_corrupt_all_seqnum_late(self):
        # corrupting the seqnum between mapupdate and retrieve should result
        # in NotEnoughSharesError, since each share will look invalid
        def _check(res):
            f = res[0]
            self.failUnless(f.check(NotEnoughSharesError))
            self.failUnless("someone wrote to the data since we read the servermap" in str(f))
        return self._test_corrupt_all(1, "ran out of peers",
                                      corrupt_early=False,
                                      failure_checker=_check)

    def test_corrupt_all_block_hash_tree_late(self):
        def _check(res):
            f = res[0]
            self.failUnless(f.check(NotEnoughSharesError))
        return self._test_corrupt_all("block_hash_tree",
                                      "block hash tree failure",
                                      corrupt_early=False,
                                      failure_checker=_check)

    def test_corrupt_all_block_late(self):
        def _check(res):
            f = res[0]
            self.failUnless(f.check(NotEnoughSharesError))
        return self._test_corrupt_all("share_data", "block hash tree failure",
                                      corrupt_early=False,
                                      failure_checker=_check)

    def test_basic_pubkey_at_end(self):
        # we corrupt the pubkey in all but the last 'k' shares, allowing the
        # download to succeed but forcing a bunch of retries first. Note that
        # this is rather pessimistic: our Retrieve process will throw away
        # the whole share if the pubkey is bad, even though the rest of the
        # share might be good.

        self._fn._pubkey = None
        k = self._fn.get_required_shares()
        N = self._fn.get_total_shares()
        d = defer.succeed(None)
        d.addCallback(corrupt, self._storage, "pubkey",
                      shnums_to_corrupt=range(0, N-k))
        d.addCallback(lambda res: self.make_servermap())
        def _do_retrieve(servermap):
            self.failUnless(servermap.problems)
            self.failUnless("pubkey doesn't match fingerprint"
                            in str(servermap.problems[0]))
            ver = servermap.best_recoverable_version()
            r = Retrieve(self._fn, servermap, ver)
            return r.download()
        d.addCallback(_do_retrieve)
        d.addCallback(lambda new_contents:
                      self.failUnlessEqual(new_contents, self.CONTENTS))
        return d

    def test_corrupt_some(self):
        # corrupt the data of the first five shares (so the servermap thinks
        # they're good but retrieve marks them as bad), so that the
        # MODE_READ set of 6 will be insufficient, forcing node.download to
        # retry with more servers.
        corrupt(None, self._storage, "share_data", range(5))
        d = self.make_servermap()
        def _do_retrieve(servermap):
            ver = servermap.best_recoverable_version()
            self.failUnless(ver)
            return self._fn.download_best_version()
        d.addCallback(_do_retrieve)
        d.addCallback(lambda new_contents:
                      self.failUnlessEqual(new_contents, self.CONTENTS))
        return d
1145
1146     def test_download_fails(self):
1147         corrupt(None, self._storage, "signature")
1148         d = self.shouldFail(UnrecoverableFileError, "test_download_anyway",
1149                             "no recoverable versions",
1150                             self._fn.download_best_version)
1151         return d
1152
1153
1154 class CheckerMixin:
1155     def check_good(self, r, where):
1156         self.failUnless(r.is_healthy(), where)
1157         return r
1158
1159     def check_bad(self, r, where):
1160         self.failIf(r.is_healthy(), where)
1161         return r
1162
1163     def check_expected_failure(self, r, expected_exception, substring, where):
1164         for (peerid, storage_index, shnum, f) in r.problems:
1165             if f.check(expected_exception):
1166                 self.failUnless(substring in str(f),
1167                                 "%s: substring '%s' not in '%s'" %
1168                                 (where, substring, str(f)))
1169                 return
1170         self.fail("%s: didn't see expected exception %s in problems %s" %
1171                   (where, expected_exception, r.problems))
1172
1173
1174 class Checker(unittest.TestCase, CheckerMixin, PublishMixin):
1175     def setUp(self):
1176         return self.publish_one()
1177
1178
1179     def test_check_good(self):
1180         d = self._fn.check(Monitor())
1181         d.addCallback(self.check_good, "test_check_good")
1182         return d
1183
1184     def test_check_no_shares(self):
1185         for shares in self._storage._peers.values():
1186             shares.clear()
1187         d = self._fn.check(Monitor())
1188         d.addCallback(self.check_bad, "test_check_no_shares")
1189         return d
1190
1191     def test_check_not_enough_shares(self):
1192         for shares in self._storage._peers.values():
1193             for shnum in shares.keys():
1194                 if shnum > 0:
1195                     del shares[shnum]
1196         d = self._fn.check(Monitor())
1197         d.addCallback(self.check_bad, "test_check_not_enough_shares")
1198         return d
1199
1200     def test_check_all_bad_sig(self):
1201         corrupt(None, self._storage, 1) # bad sig
1202         d = self._fn.check(Monitor())
1203         d.addCallback(self.check_bad, "test_check_all_bad_sig")
1204         return d
1205
1206     def test_check_all_bad_blocks(self):
1207         corrupt(None, self._storage, "share_data", [9]) # bad blocks
1208         # the Checker won't notice this: it doesn't look at the actual data
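             # compare test_verify_one_bad_block below: with verify=True the
             # same check does fetch and hash every block, so it would catch
             # this corruption:
             #   d = self._fn.check(Monitor(), verify=True)
             #   d.addCallback(self.check_bad, "test_check_all_bad_blocks")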
1209         d = self._fn.check(Monitor())
1210         d.addCallback(self.check_good, "test_check_all_bad_blocks")
1211         return d
1212
1213     def test_verify_good(self):
1214         d = self._fn.check(Monitor(), verify=True)
1215         d.addCallback(self.check_good, "test_verify_good")
1216         return d
1217
1218     def test_verify_all_bad_sig(self):
1219         corrupt(None, self._storage, 1) # bad sig
1220         d = self._fn.check(Monitor(), verify=True)
1221         d.addCallback(self.check_bad, "test_verify_all_bad_sig")
1222         return d
1223
1224     def test_verify_one_bad_sig(self):
1225         corrupt(None, self._storage, 1, [9]) # bad sig
1226         d = self._fn.check(Monitor(), verify=True)
1227         d.addCallback(self.check_bad, "test_verify_one_bad_sig")
1228         return d
1229
1230     def test_verify_one_bad_block(self):
1231         corrupt(None, self._storage, "share_data", [9]) # bad blocks
1232         # the Verifier *will* notice this, since it examines every byte
1233         d = self._fn.check(Monitor(), verify=True)
1234         d.addCallback(self.check_bad, "test_verify_one_bad_block")
1235         d.addCallback(self.check_expected_failure,
1236                       CorruptShareError, "block hash tree failure",
1237                       "test_verify_one_bad_block")
1238         return d
1239
1240     def test_verify_one_bad_sharehash(self):
1241         corrupt(None, self._storage, "share_hash_chain", [9], 5)
1242         d = self._fn.check(Monitor(), verify=True)
1243         d.addCallback(self.check_bad, "test_verify_one_bad_sharehash")
1244         d.addCallback(self.check_expected_failure,
1245                       CorruptShareError, "corrupt hashes",
1246                       "test_verify_one_bad_sharehash")
1247         return d
1248
1249     def test_verify_one_bad_encprivkey(self):
1250         corrupt(None, self._storage, "enc_privkey", [9]) # bad privkey
1251         d = self._fn.check(Monitor(), verify=True)
1252         d.addCallback(self.check_bad, "test_verify_one_bad_encprivkey")
1253         d.addCallback(self.check_expected_failure,
1254                       CorruptShareError, "invalid privkey",
1255                       "test_verify_one_bad_encprivkey")
1256         return d
1257
1258     def test_verify_one_bad_encprivkey_uncheckable(self):
1259         corrupt(None, self._storage, "enc_privkey", [9]) # bad privkey
1260         readonly_fn = self._fn.get_readonly()
1261         # a read-only node has no way to validate the privkey
1262         d = readonly_fn.check(Monitor(), verify=True)
1263         d.addCallback(self.check_good,
1264                       "test_verify_one_bad_encprivkey_uncheckable")
1265         return d
1266
1267 class Repair(unittest.TestCase, PublishMixin, ShouldFailMixin):
1268
1269     def get_shares(self, s):
1270         all_shares = {} # maps (peerid, shnum) to share data
1271         for peerid in s._peers:
1272             shares = s._peers[peerid]
1273             for shnum in shares:
1274                 data = shares[shnum]
1275                 all_shares[ (peerid, shnum) ] = data
1276         return all_shares
1277
1278     def copy_shares(self, ignored=None):
1279         self.old_shares.append(self.get_shares(self._storage))
1280
1281     def test_repair_nop(self):
1282         self.old_shares = []
1283         d = self.publish_one()
1284         d.addCallback(self.copy_shares)
1285         d.addCallback(lambda res: self._fn.check(Monitor()))
1286         d.addCallback(lambda check_results: self._fn.repair(check_results))
1287         def _check_results(rres):
1288             self.failUnless(IRepairResults.providedBy(rres))
1289             # TODO: examine results
1290
1291             self.copy_shares()
1292
1293             initial_shares = self.old_shares[0]
1294             new_shares = self.old_shares[1]
1295             # TODO: this really shouldn't change anything. When we implement
1296             # a "minimal-bandwidth" repairer, change this test to assert:
1297             #self.failUnlessEqual(new_shares, initial_shares)
1298
1299             # all shares should be in the same place as before
1300             self.failUnlessEqual(set(initial_shares.keys()),
1301                                  set(new_shares.keys()))
1302             # but they should all be at a newer seqnum. The IV will be
1303             # different, so the roothash will be too.
1304             for key in initial_shares:
1305                 (version0,
1306                  seqnum0,
1307                  root_hash0,
1308                  IV0,
1309                  k0, N0, segsize0, datalen0,
1310                  o0) = unpack_header(initial_shares[key])
1311                 (version1,
1312                  seqnum1,
1313                  root_hash1,
1314                  IV1,
1315                  k1, N1, segsize1, datalen1,
1316                  o1) = unpack_header(new_shares[key])
1317                 self.failUnlessEqual(version0, version1)
1318                 self.failUnlessEqual(seqnum0+1, seqnum1)
1319                 self.failUnlessEqual(k0, k1)
1320                 self.failUnlessEqual(N0, N1)
1321                 self.failUnlessEqual(segsize0, segsize1)
1322                 self.failUnlessEqual(datalen0, datalen1)
1323         d.addCallback(_check_results)
1324         return d
1325
1326     def failIfSharesChanged(self, ignored=None):
1327         old_shares = self.old_shares[-2]
1328         current_shares = self.old_shares[-1]
1329         self.failUnlessEqual(old_shares, current_shares)
1330
1331     def test_merge(self):
1332         self.old_shares = []
1333         d = self.publish_multiple()
1334         # repair will refuse to merge multiple highest seqnums unless you
1335         # pass force=True
1336         d.addCallback(lambda res:
1337                       self._set_versions({0:3,2:3,4:3,6:3,8:3,
1338                                           1:4,3:4,5:4,7:4,9:4}))
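             # (even shnums now hold version-index 3 and odd shnums hold index
             #  4; judging by the seqnum checks below, these are the parallel
             #  s4a/s4b writes that share seqnum 4, which is what needs merging)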
1339         d.addCallback(self.copy_shares)
1340         d.addCallback(lambda res: self._fn.check(Monitor()))
1341         def _try_repair(check_results):
1342             ex = "There were multiple recoverable versions with identical seqnums, so force=True must be passed to the repair() operation"
1343             d2 = self.shouldFail(MustForceRepairError, "test_merge", ex,
1344                                  self._fn.repair, check_results)
1345             d2.addCallback(self.copy_shares)
1346             d2.addCallback(self.failIfSharesChanged)
1347             d2.addCallback(lambda res: check_results)
1348             return d2
1349         d.addCallback(_try_repair)
1350         d.addCallback(lambda check_results:
1351                       self._fn.repair(check_results, force=True))
1352         # this should give us 10 shares of the highest roothash
1353         def _check_repair_results(rres):
1354             pass # TODO
1355         d.addCallback(_check_repair_results)
1356         d.addCallback(lambda res: self._fn.get_servermap(MODE_CHECK))
1357         def _check_smap(smap):
1358             self.failUnlessEqual(len(smap.recoverable_versions()), 1)
1359             self.failIf(smap.unrecoverable_versions())
1360             # now, which should have won?
1361             roothash_s4a = self.get_roothash_for(3)
1362             roothash_s4b = self.get_roothash_for(4)
1363             if roothash_s4b > roothash_s4a:
1364                 expected_contents = self.CONTENTS[4]
1365             else:
1366                 expected_contents = self.CONTENTS[3]
1367             new_versionid = smap.best_recoverable_version()
1368             self.failUnlessEqual(new_versionid[0], 5) # seqnum 5
1369             d2 = self._fn.download_version(smap, new_versionid)
1370             d2.addCallback(self.failUnlessEqual, expected_contents)
1371             return d2
1372         d.addCallback(_check_smap)
1373         return d
1374
1375     def test_non_merge(self):
1376         self.old_shares = []
1377         d = self.publish_multiple()
1378         # repair should not refuse a repair that doesn't need to merge. In
1379         # this case, we combine v2 with v3. The repair should ignore v2 and
1380         # copy v3 into a new v5.
1381         d.addCallback(lambda res:
1382                       self._set_versions({0:2,2:2,4:2,6:2,8:2,
1383                                           1:3,3:3,5:3,7:3,9:3}))
1384         d.addCallback(lambda res: self._fn.check(Monitor()))
1385         d.addCallback(lambda check_results: self._fn.repair(check_results))
1386         # this should give us 10 shares of v3
1387         def _check_repair_results(rres):
1388             pass # TODO
1389         d.addCallback(_check_repair_results)
1390         d.addCallback(lambda res: self._fn.get_servermap(MODE_CHECK))
1391         def _check_smap(smap):
1392             self.failUnlessEqual(len(smap.recoverable_versions()), 1)
1393             self.failIf(smap.unrecoverable_versions())
1394             # v3 is the only recoverable version, so it should have won;
1395             # check that we actually got its contents back
1396             expected_contents = self.CONTENTS[3]
1397             new_versionid = smap.best_recoverable_version()
1398             self.failUnlessEqual(new_versionid[0], 5) # seqnum 5
1399             d2 = self._fn.download_version(smap, new_versionid)
1400             d2.addCallback(self.failUnlessEqual, expected_contents)
1401             return d2
1402         d.addCallback(_check_smap)
1403         return d
1404
1405     def get_roothash_for(self, index):
1406         # return the roothash for the first share we see in the saved set
1407         shares = self._copied_shares[index]
1408         for peerid in shares:
1409             for shnum in shares[peerid]:
1410                 share = shares[peerid][shnum]
1411                 (version, seqnum, root_hash, IV, k, N, segsize, datalen, o) = \
1412                           unpack_header(share)
1413                 return root_hash
1414
1415     def test_check_and_repair_readcap(self):
1416         # we can't currently repair from a mutable readcap: #625
1417         self.old_shares = []
1418         d = self.publish_one()
1419         d.addCallback(self.copy_shares)
1420         def _get_readcap(res):
1421             self._fn3 = self._fn.get_readonly()
1422             # also delete some shares
1423             for peerid,shares in self._storage._peers.items():
1424                 shares.pop(0, None)
1425         d.addCallback(_get_readcap)
1426         d.addCallback(lambda res: self._fn3.check_and_repair(Monitor()))
1427         def _check_results(crr):
1428             self.failUnless(ICheckAndRepairResults.providedBy(crr))
1429             # we should detect the unhealthy, but skip over mutable-readcap
1430             # repairs until #625 is fixed
1431             self.failIf(crr.get_pre_repair_results().is_healthy())
1432             self.failIf(crr.get_repair_attempted())
1433             self.failIf(crr.get_post_repair_results().is_healthy())
1434         d.addCallback(_check_results)
1435         return d
1436
1437 class DevNullDictionary(dict):
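         # swallows every write: installed as a nodemaker's _node_cache below,
         # it makes create_from_cap hand back a brand-new node each time
         # instead of a cached one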
1438     def __setitem__(self, key, value):
1439         return
1440
1441 class MultipleEncodings(unittest.TestCase):
1442     def setUp(self):
1443         self.CONTENTS = "New contents go here"
1444         self._storage = FakeStorage()
1445         self._nodemaker = make_nodemaker(self._storage, num_peers=20)
1446         self._storage_broker = self._nodemaker.storage_broker
1447         d = self._nodemaker.create_mutable_file(self.CONTENTS)
1448         def _created(node):
1449             self._fn = node
1450         d.addCallback(_created)
1451         return d
1452
1453     def _encode(self, k, n, data):
1454         # encode 'data' into a peerid->shares dict.
1455
1456         fn = self._fn
1457         # disable the nodecache, since for these tests we explicitly need
1458         # multiple nodes pointing at the same file
1459         self._nodemaker._node_cache = DevNullDictionary()
1460         fn2 = self._nodemaker.create_from_cap(fn.get_uri())
1461         # then we copy over other fields that are normally fetched from the
1462         # existing shares
1463         fn2._pubkey = fn._pubkey
1464         fn2._privkey = fn._privkey
1465         fn2._encprivkey = fn._encprivkey
1466         # and set the encoding parameters to something completely different
1467         fn2._required_shares = k
1468         fn2._total_shares = n
1469
1470         s = self._storage
1471         s._peers = {} # clear existing storage
1472         p2 = Publish(fn2, self._storage_broker, None)
1473         d = p2.publish(data)
1474         def _published(res):
1475             shares = s._peers
1476             s._peers = {}
1477             return shares
1478         d.addCallback(_published)
1479         return d
1480
1481     def make_servermap(self, mode=MODE_READ, oldmap=None):
1482         if oldmap is None:
1483             oldmap = ServerMap()
1484         smu = ServermapUpdater(self._fn, self._storage_broker, Monitor(),
1485                                oldmap, mode)
1486         d = smu.update()
1487         return d
1488
1489     def test_multiple_encodings(self):
1490         # we encode the same file in three ways (3-of-10, 4-of-9, 4-of-7),
1491         # then mix up the shares, to make sure that download survives seeing
1492         # a variety of encodings. This is actually kind of tricky to set up.
1493
1494         contents1 = "Contents for encoding 1 (3-of-10) go here"
1495         contents2 = "Contents for encoding 2 (4-of-9) go here"
1496         contents3 = "Contents for encoding 3 (4-of-7) go here"
1497
1498         # we make a retrieval object that doesn't know what encoding
1499         # parameters to use
1500         fn3 = self._nodemaker.create_from_cap(self._fn.get_uri())
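             # (unlike the fn2 clones in _encode(), fn3 gets no copied pubkey
             #  or encoding parameters; it has to learn them from whichever
             #  shares it fetches first)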
1501
1502         # now we upload a file through fn1, and grab its shares
1503         d = self._encode(3, 10, contents1)
1504         def _encoded_1(shares):
1505             self._shares1 = shares
1506         d.addCallback(_encoded_1)
1507         d.addCallback(lambda res: self._encode(4, 9, contents2))
1508         def _encoded_2(shares):
1509             self._shares2 = shares
1510         d.addCallback(_encoded_2)
1511         d.addCallback(lambda res: self._encode(4, 7, contents3))
1512         def _encoded_3(shares):
1513             self._shares3 = shares
1514         d.addCallback(_encoded_3)
1515
1516         def _merge(res):
1517             log.msg("merging sharelists")
1518             # we merge the shares from the three sets, leaving each shnum
1519             # in its original location, but using a share from set1, set2,
1520             # or set3 according to the following sequence:
1521             #
1522             #  4-of-9  a  s2
1523             #  4-of-9  b  s2
1524             #  4-of-7  c   s3
1525             #  4-of-9  d  s2
1526             #  3-of-10 e s1
1527             #  3-of-10 f s1
1528             #  3-of-10 g s1
1529             #  4-of-9  h  s2
1530             #
1531             # so that no version can be recovered until fetch [g], at which
1532             # point version-s1 (the 3-of-10 form) should be recoverable. If
1533             # the implementation latches on to the first version it sees,
1534             # then s2 will be recoverable at fetch [h].
1535
1536             # Later, when we implement code that handles multiple versions,
1537             # we can use this framework to assert that all recoverable
1538             # versions are retrieved, and test that 'epsilon' does its job
1539
1540             places = [2, 2, 3, 2, 1, 1, 1, 2]
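                 # decoded: places[shnum] names the source set for shnums 0..7,
                 # so shares 0,1,3,7 come from s2 (4-of-9), share 2 from s3
                 # (4-of-7), and shares 4,5,6 from s1 (3-of-10); shnums 8 and 9
                 # fall through to "x" below and are never placed.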
1541
1542             sharemap = {}
1543             sb = self._storage_broker
1544
1545             for peerid in sorted(sb.get_all_serverids()):
1546                 peerid_s = shortnodeid_b2a(peerid)
1547                 for shnum in self._shares1.get(peerid, {}):
1548                     if shnum < len(places):
1549                         which = places[shnum]
1550                     else:
1551                         which = "x"
1552                     self._storage._peers[peerid] = peers = {}
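                         # (re-creating the peer's dict on every pass is safe
                         #  only because each server should hold at most one
                         #  shnum per encoding: 20 servers, at most 10 shares)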
1553                     in_1 = shnum in self._shares1[peerid]
1554                     in_2 = shnum in self._shares2.get(peerid, {})
1555                     in_3 = shnum in self._shares3.get(peerid, {})
1556                     #print peerid_s, shnum, which, in_1, in_2, in_3
1557                     if which == 1:
1558                         if in_1:
1559                             peers[shnum] = self._shares1[peerid][shnum]
1560                             sharemap[shnum] = peerid
1561                     elif which == 2:
1562                         if in_2:
1563                             peers[shnum] = self._shares2[peerid][shnum]
1564                             sharemap[shnum] = peerid
1565                     elif which == 3:
1566                         if in_3:
1567                             peers[shnum] = self._shares3[peerid][shnum]
1568                             sharemap[shnum] = peerid
1569
1570             # we don't bother placing any other shares
1571             # now sort the sequence so that share 0 is returned first
1572             new_sequence = [sharemap[shnum]
1573                             for shnum in sorted(sharemap.keys())]
1574             self._storage._sequence = new_sequence
1575             log.msg("merge done")
1576         d.addCallback(_merge)
1577         d.addCallback(lambda res: fn3.download_best_version())
1578         def _retrieved(new_contents):
1579             # the current specified behavior is "first version recoverable"
1580             self.failUnlessEqual(new_contents, contents1)
1581         d.addCallback(_retrieved)
1582         return d
1583
1584
1585 class MultipleVersions(unittest.TestCase, PublishMixin, CheckerMixin):
1586
1587     def setUp(self):
1588         return self.publish_multiple()
1589
1590     def test_multiple_versions(self):
1591         # if we see a mix of versions in the grid, download_best_version
1592         # should get the latest one
1593         self._set_versions(dict([(i,2) for i in (0,2,4,6,8)]))
1594         d = self._fn.download_best_version()
1595         d.addCallback(lambda res: self.failUnlessEqual(res, self.CONTENTS[4]))
1596         # and the checker should report problems
1597         d.addCallback(lambda res: self._fn.check(Monitor()))
1598         d.addCallback(self.check_bad, "test_multiple_versions")
1599
1600         # but if everything is at version 2, that's what we should download
1601         d.addCallback(lambda res:
1602                       self._set_versions(dict([(i,2) for i in range(10)])))
1603         d.addCallback(lambda res: self._fn.download_best_version())
1604         d.addCallback(lambda res: self.failUnlessEqual(res, self.CONTENTS[2]))
1605         # if exactly one share is at version 3, we should still get v2
1606         d.addCallback(lambda res:
1607                       self._set_versions({0:3}))
1608         d.addCallback(lambda res: self._fn.download_best_version())
1609         d.addCallback(lambda res: self.failUnlessEqual(res, self.CONTENTS[2]))
1610         # but the servermap should see the unrecoverable version. This
1611         # depends upon the single newer share being queried early.
1612         d.addCallback(lambda res: self._fn.get_servermap(MODE_READ))
1613         def _check_smap(smap):
1614             self.failUnlessEqual(len(smap.unrecoverable_versions()), 1)
1615             newer = smap.unrecoverable_newer_versions()
1616             self.failUnlessEqual(len(newer), 1)
1617             verinfo, health = newer.items()[0]
1618             self.failUnlessEqual(verinfo[0], 4)
1619             self.failUnlessEqual(health, (1,3))
1620             self.failIf(smap.needs_merge())
1621         d.addCallback(_check_smap)
1622         # if we have a mix of two parallel versions (s4a and s4b), we could
1623         # recover either
1624         d.addCallback(lambda res:
1625                       self._set_versions({0:3,2:3,4:3,6:3,8:3,
1626                                           1:4,3:4,5:4,7:4,9:4}))
1627         d.addCallback(lambda res: self._fn.get_servermap(MODE_READ))
1628         def _check_smap_mixed(smap):
1629             self.failUnlessEqual(len(smap.unrecoverable_versions()), 0)
1630             newer = smap.unrecoverable_newer_versions()
1631             self.failUnlessEqual(len(newer), 0)
1632             self.failUnless(smap.needs_merge())
1633         d.addCallback(_check_smap_mixed)
1634         d.addCallback(lambda res: self._fn.download_best_version())
1635         d.addCallback(lambda res: self.failUnless(res == self.CONTENTS[3] or
1636                                                   res == self.CONTENTS[4]))
1637         return d
1638
1639     def test_replace(self):
1640         # if we see a mix of versions in the grid, we should be able to
1641         # replace them all with a newer version
1642
1643         # if exactly one share is at version 3, we should download (and
1644         # replace) v2, and the result should be v4. Note that the index we
1645         # give to _set_versions is different from the sequence number.
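             # (judging by the seqnum annotations below and the s4a/s4b tests
             #  above, version-index i carries seqnum i+1, except that index 4
             #  is the parallel "s4b" write which also sits at seqnum 4)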
1646         target = dict([(i,2) for i in range(10)]) # seqnum3
1647         target[0] = 3 # seqnum4
1648         self._set_versions(target)
1649
1650         def _modify(oldversion, servermap, first_time):
1651             return oldversion + " modified"
1652         d = self._fn.modify(_modify)
1653         d.addCallback(lambda res: self._fn.download_best_version())
1654         expected = self.CONTENTS[2] + " modified"
1655         d.addCallback(lambda res: self.failUnlessEqual(res, expected))
1656         # and the servermap should indicate that the outlier was replaced too
1657         d.addCallback(lambda res: self._fn.get_servermap(MODE_CHECK))
1658         def _check_smap(smap):
1659             self.failUnlessEqual(smap.highest_seqnum(), 5)
1660             self.failUnlessEqual(len(smap.unrecoverable_versions()), 0)
1661             self.failUnlessEqual(len(smap.recoverable_versions()), 1)
1662         d.addCallback(_check_smap)
1663         return d
1664
1665
1666 class Utils(unittest.TestCase):
1667     def _do_inside(self, c, x_start, x_length, y_start, y_length):
1668         # we compare this against sets of integers
1669         x = set(range(x_start, x_start+x_length))
1670         y = set(range(y_start, y_start+y_length))
1671         should_be_inside = x.issubset(y)
1672         self.failUnlessEqual(should_be_inside, c._inside(x_start, x_length,
1673                                                          y_start, y_length),
1674                              str((x_start, x_length, y_start, y_length)))
1675
1676     def test_cache_inside(self):
1677         c = ResponseCache()
1678         x_start = 10
1679         x_length = 5
1680         for y_start in range(8, 17):
1681             for y_length in range(8):
1682                 self._do_inside(c, x_start, x_length, y_start, y_length)
1683
1684     def _do_overlap(self, c, x_start, x_length, y_start, y_length):
1685         # we compare this against sets of integers
1686         x = set(range(x_start, x_start+x_length))
1687         y = set(range(y_start, y_start+y_length))
1688         overlap = bool(x.intersection(y))
1689         self.failUnlessEqual(overlap, c._does_overlap(x_start, x_length,
1690                                                       y_start, y_length),
1691                              str((x_start, x_length, y_start, y_length)))
1692
1693     def test_cache_overlap(self):
1694         c = ResponseCache()
1695         x_start = 10
1696         x_length = 5
1697         for y_start in range(8, 17):
1698             for y_length in range(8):
1699                 self._do_overlap(c, x_start, x_length, y_start, y_length)
1700
1701     def test_cache(self):
1702         c = ResponseCache()
1703         # xdata = base62.b2a(os.urandom(100))[:100]
1704         xdata = "1Ex4mdMaDyOl9YnGBM3I4xaBF97j8OQAg1K3RBR01F2PwTP4HohB3XpACuku8Xj4aTQjqJIR1f36mEj3BCNjXaJmPBEZnnHL0U9l"
1705         ydata = "4DCUQXvkEPnnr9Lufikq5t21JsnzZKhzxKBhLhrBB6iIcBOWRuT4UweDhjuKJUre8A4wOObJnl3Kiqmlj4vjSLSqUGAkUD87Y3vs"
1706         nope = (None, None)
1707         c.add("v1", 1, 0, xdata, "time0")
1708         c.add("v1", 1, 2000, ydata, "time1")
1709         self.failUnlessEqual(c.read("v2", 1, 10, 11), nope)
1710         self.failUnlessEqual(c.read("v1", 2, 10, 11), nope)
1711         self.failUnlessEqual(c.read("v1", 1, 0, 10), (xdata[:10], "time0"))
1712         self.failUnlessEqual(c.read("v1", 1, 90, 10), (xdata[90:], "time0"))
1713         self.failUnlessEqual(c.read("v1", 1, 300, 10), nope)
1714         self.failUnlessEqual(c.read("v1", 1, 2050, 5), (ydata[50:55], "time1"))
1715         self.failUnlessEqual(c.read("v1", 1, 0, 101), nope)
1716         self.failUnlessEqual(c.read("v1", 1, 99, 1), (xdata[99:100], "time0"))
1717         self.failUnlessEqual(c.read("v1", 1, 100, 1), nope)
1718         self.failUnlessEqual(c.read("v1", 1, 1990, 9), nope)
1719         self.failUnlessEqual(c.read("v1", 1, 1990, 10), nope)
1720         self.failUnlessEqual(c.read("v1", 1, 1990, 11), nope)
1721         self.failUnlessEqual(c.read("v1", 1, 1990, 15), nope)
1722         self.failUnlessEqual(c.read("v1", 1, 1990, 19), nope)
1723         self.failUnlessEqual(c.read("v1", 1, 1990, 20), nope)
1724         self.failUnlessEqual(c.read("v1", 1, 1990, 21), nope)
1725         self.failUnlessEqual(c.read("v1", 1, 1990, 25), nope)
1726         self.failUnlessEqual(c.read("v1", 1, 1999, 25), nope)
1727
1728         # optional: join fragments
1729         c = ResponseCache()
1730         c.add("v1", 1, 0, xdata[:10], "time0")
1731         c.add("v1", 1, 10, xdata[10:20], "time1")
1732         #self.failUnlessEqual(c.read("v1", 1, 0, 20), (xdata[:20], "time0"))
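             # (the fragments are apparently kept separate today, so a read
             #  spanning both of them misses; the commented assertion records
             #  what a fragment-joining cache would return instead)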
1733
1734 class Exceptions(unittest.TestCase):
1735     def test_repr(self):
1736         nmde = NeedMoreDataError(100, 50, 100)
1737         self.failUnless("NeedMoreDataError" in repr(nmde), repr(nmde))
1738         ucwe = UncoordinatedWriteError()
1739         self.failUnless("UncoordinatedWriteError" in repr(ucwe), repr(ucwe))
1740
1741 class SameKeyGenerator:
1742     def __init__(self, pubkey, privkey):
1743         self.pubkey = pubkey
1744         self.privkey = privkey
1745     def generate(self, keysize=None):
1746         return defer.succeed( (self.pubkey, self.privkey) )
1747
1748 class FirstServerGetsKilled:
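         # a post_call_notifier hook: the first completed call marks that
         # server's wrapper as broken, so every later call to it fails while
         # the shares it already accepted stay put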
1749     done = False
1750     def notify(self, retval, wrapper, methname):
1751         if not self.done:
1752             wrapper.broken = True
1753             self.done = True
1754         return retval
1755
1756 class FirstServerGetsDeleted:
1757     def __init__(self):
1758         self.done = False
1759         self.silenced = None
1760     def notify(self, retval, wrapper, methname):
1761         if not self.done:
1762             # this query will work, but later queries should think the share
1763             # has been deleted
1764             self.done = True
1765             self.silenced = wrapper
1766             return retval
1767         if wrapper == self.silenced:
1768             assert methname == "slot_testv_and_readv_and_writev"
1769             return (True, {})
1770         return retval
1771
1772 class Problems(GridTestMixin, unittest.TestCase, testutil.ShouldFailMixin):
1773     def test_publish_surprise(self):
1774         self.basedir = "mutable/Problems/test_publish_surprise"
1775         self.set_up_grid()
1776         nm = self.g.clients[0].nodemaker
1777         d = nm.create_mutable_file("contents 1")
1778         def _created(n):
1779             d = defer.succeed(None)
1780             d.addCallback(lambda res: n.get_servermap(MODE_WRITE))
1781             def _got_smap1(smap):
1782                 # stash the old state of the file
1783                 self.old_map = smap
1784             d.addCallback(_got_smap1)
1785             # then modify the file, leaving the old map untouched
1786             d.addCallback(lambda res: log.msg("starting winning write"))
1787             d.addCallback(lambda res: n.overwrite("contents 2"))
1788             # now attempt to modify the file with the old servermap. This
1789             # will look just like an uncoordinated write, in which every
1790             # single share got updated between our mapupdate and our publish
1791             d.addCallback(lambda res: log.msg("starting doomed write"))
1792             d.addCallback(lambda res:
1793                           self.shouldFail(UncoordinatedWriteError,
1794                                           "test_publish_surprise", None,
1795                                           n.upload,
1796                                           "contents 2a", self.old_map))
1797             return d
1798         d.addCallback(_created)
1799         return d
1800
1801     def test_retrieve_surprise(self):
1802         self.basedir = "mutable/Problems/test_retrieve_surprise"
1803         self.set_up_grid()
1804         nm = self.g.clients[0].nodemaker
1805         d = nm.create_mutable_file("contents 1")
1806         def _created(n):
1807             d = defer.succeed(None)
1808             d.addCallback(lambda res: n.get_servermap(MODE_READ))
1809             def _got_smap1(smap):
1810                 # stash the old state of the file
1811                 self.old_map = smap
1812             d.addCallback(_got_smap1)
1813             # then modify the file, leaving the old map untouched
1814             d.addCallback(lambda res: log.msg("starting winning write"))
1815             d.addCallback(lambda res: n.overwrite("contents 2"))
1816             # now attempt to retrieve the old version with the old servermap.
1817             # This will look like someone has changed the file since we
1818             # updated the servermap.
1819             d.addCallback(lambda res: n._cache._clear())
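                 # (without the cache clear, the doomed read below could be
                 #  served out of the ResponseCache and never notice that the
                 #  servers have moved on)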
1820             d.addCallback(lambda res: log.msg("starting doomed read"))
1821             d.addCallback(lambda res:
1822                           self.shouldFail(NotEnoughSharesError,
1823                                           "test_retrieve_surprise",
1824                                           "ran out of peers: have 0 shares (k=3)",
1825                                           n.download_version,
1826                                           self.old_map,
1827                                           self.old_map.best_recoverable_version(),
1828                                           ))
1829             return d
1830         d.addCallback(_created)
1831         return d
1832
1833     def test_unexpected_shares(self):
1834         # upload the file, take a servermap, shut down one of the servers,
1835         # upload it again (causing shares to appear on a new server), then
1836         # upload using the old servermap. The last upload should fail with an
1837         # UncoordinatedWriteError, because of the shares that didn't appear
1838         # in the servermap.
1839         self.basedir = "mutable/Problems/test_unexpected_shares"
1840         self.set_up_grid()
1841         nm = self.g.clients[0].nodemaker
1842         d = nm.create_mutable_file("contents 1")
1843         def _created(n):
1844             d = defer.succeed(None)
1845             d.addCallback(lambda res: n.get_servermap(MODE_WRITE))
1846             def _got_smap1(smap):
1847                 # stash the old state of the file
1848                 self.old_map = smap
1849                 # now shut down one of the servers
1850                 peer0 = list(smap.make_sharemap()[0])[0]
1851                 self.g.remove_server(peer0)
1852                 # then modify the file, leaving the old map untouched
1853                 log.msg("starting winning write")
1854                 return n.overwrite("contents 2")
1855             d.addCallback(_got_smap1)
1856             # now attempt to modify the file with the old servermap. This
1857             # will look just like an uncoordinated write, in which every
1858             # single share got updated between our mapupdate and our publish
1859             d.addCallback(lambda res: log.msg("starting doomed write"))
1860             d.addCallback(lambda res:
1861                           self.shouldFail(UncoordinatedWriteError,
1862                                           "test_unexpected_shares", None,
1863                                           n.upload,
1864                                           "contents 2a", self.old_map))
1865             return d
1866         d.addCallback(_created)
1867         return d
1868
1869     def test_bad_server(self):
1870         # Break one server, then create the file: the initial publish should
1871         # complete with an alternate server. Breaking a second server should
1872         # not prevent an update from succeeding either.
1873         self.basedir = "mutable/Problems/test_bad_server"
1874         self.set_up_grid()
1875         nm = self.g.clients[0].nodemaker
1876
1877         # to make sure that one of the initial peers is broken, we have to
1878         # get creative. We create an RSA key and compute its storage-index.
1879         # Then we make a KeyGenerator that always returns that one key, and
1880         # use it to create the mutable file. This will get easier when we can
1881         # use #467 static-server-selection to disable permutation and force
1882         # the choice of server for share[0].
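             # (the chain being exploited: the writekey is a hash of the
             #  private key, the storage index is derived from the writekey,
             #  and peer selection permutes servers by storage index, so
             #  pinning the key pins which server sees share[0])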
1883
1884         d = nm.key_generator.generate(522)
1885         def _got_key( (pubkey, privkey) ):
1886             nm.key_generator = SameKeyGenerator(pubkey, privkey)
1887             pubkey_s = pubkey.serialize()
1888             privkey_s = privkey.serialize()
1889             u = uri.WriteableSSKFileURI(ssk_writekey_hash(privkey_s),
1890                                         ssk_pubkey_fingerprint_hash(pubkey_s))
1891             self._storage_index = u.storage_index
1892         d.addCallback(_got_key)
1893         def _break_peer0(res):
1894             si = self._storage_index
1895             peerlist = nm.storage_broker.get_servers_for_index(si)
1896             peerid0, connection0 = peerlist[0]
1897             peerid1, connection1 = peerlist[1]
1898             connection0.broken = True
1899             self.connection1 = connection1
1900         d.addCallback(_break_peer0)
1901         # now "create" the file, using the pre-established key, and let the
1902         # initial publish finally happen
1903         d.addCallback(lambda res: nm.create_mutable_file("contents 1"))
1904         # that ought to work
1905         def _got_node(n):
1906             d = n.download_best_version()
1907             d.addCallback(lambda res: self.failUnlessEqual(res, "contents 1"))
1908             # now break the second peer
1909             def _break_peer1(res):
1910                 self.connection1.broken = True
1911             d.addCallback(_break_peer1)
1912             d.addCallback(lambda res: n.overwrite("contents 2"))
1913             # that ought to work too
1914             d.addCallback(lambda res: n.download_best_version())
1915             d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2"))
1916             def _explain_error(f):
1917                 print f
1918                 if f.check(NotEnoughServersError):
1919                     print "first_error:", f.value.first_error
1920                 return f
1921             d.addErrback(_explain_error)
1922             return d
1923         d.addCallback(_got_node)
1924         return d
1925
1926     def test_bad_server_overlap(self):
1927         # like test_bad_server, but with no extra unused servers to fall back
1928         # upon. This means that we must re-use a server which we've already
1929         # used. If we don't remember the fact that we sent them one share
1930         # already, we'll mistakenly think we're experiencing an
1931         # UncoordinatedWriteError.
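             # (the arithmetic, assuming the default 10-server grid and N=10
             #  shares: with one server broken, 10 shares must land on 9
             #  servers, so some server necessarily takes two)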
1932
1933         # Break one server, then create the file: the initial publish should
1934         # complete with an alternate server. Breaking a second server should
1935         # not prevent an update from succeeding either.
1936         self.basedir = "mutable/Problems/test_bad_server_overlap"
1937         self.set_up_grid()
1938         nm = self.g.clients[0].nodemaker
1939         sb = nm.storage_broker
1940
1941         peerids = [serverid for (serverid,ss) in sb.get_all_servers()]
1942         self.g.break_server(peerids[0])
1943
1944         d = nm.create_mutable_file("contents 1")
1945         def _created(n):
1946             d = n.download_best_version()
1947             d.addCallback(lambda res: self.failUnlessEqual(res, "contents 1"))
1948             # now break one of the remaining servers
1949             def _break_second_server(res):
1950                 self.g.break_server(peerids[1])
1951             d.addCallback(_break_second_server)
1952             d.addCallback(lambda res: n.overwrite("contents 2"))
1953             # that ought to work too
1954             d.addCallback(lambda res: n.download_best_version())
1955             d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2"))
1956             return d
1957         d.addCallback(_created)
1958         return d
1959
1960     def test_publish_all_servers_bad(self):
1961         # Break all servers: the publish should fail
1962         self.basedir = "mutable/Problems/test_publish_all_servers_bad"
1963         self.set_up_grid()
1964         nm = self.g.clients[0].nodemaker
1965         for (serverid,ss) in nm.storage_broker.get_all_servers():
1966             ss.broken = True
1967
1968         d = self.shouldFail(NotEnoughServersError,
1969                             "test_publish_all_servers_bad",
1970                             "Ran out of non-bad servers",
1971                             nm.create_mutable_file, "contents")
1972         return d
1973
1974     def test_publish_no_servers(self):
1975         # no servers at all: the publish should fail
1976         self.basedir = "mutable/Problems/test_publish_no_servers"
1977         self.set_up_grid(num_servers=0)
1978         nm = self.g.clients[0].nodemaker
1979
1980         d = self.shouldFail(NotEnoughServersError,
1981                             "test_publish_no_servers",
1982                             "Ran out of non-bad servers",
1983                             nm.create_mutable_file, "contents")
1984         return d
1985     test_publish_no_servers.timeout = 30
1986
1987
1988     def test_privkey_query_error(self):
1989         # when a servermap is updated with MODE_WRITE, it tries to get the
1990         # privkey. Something might go wrong during this query attempt.
1991         # Exercise the code in _privkey_query_failed which tries to handle
1992         # such an error.
1993         self.basedir = "mutable/Problems/test_privkey_query_error"
1994         self.set_up_grid(num_servers=20)
1995         nm = self.g.clients[0].nodemaker
1996         nm._node_cache = DevNullDictionary() # disable the nodecache
1997
1998         # we need some contents that are large enough to push the privkey out
1999         # of the early part of the file
2000         LARGE = "These are Larger contents" * 2000 # about 50KB
2001         d = nm.create_mutable_file(LARGE)
2002         def _created(n):
2003             self.uri = n.get_uri()
2004             self.n2 = nm.create_from_cap(self.uri)
2005
2006             # When a mapupdate is performed on a node that doesn't yet know
2007             # the privkey, a short read is sent to a batch of servers, to get
2008             # the verinfo and (hopefully, if the file is short enough) the
2009             # encprivkey. Our file is too large to let this first read
2010             # contain the encprivkey. Each non-encprivkey-bearing response
2011             # that arrives (until the node gets the encprivkey) will trigger
2012             # a second read to specifically read the encprivkey.
2013             #
2014             # So, to exercise this case:
2015             #  1. notice which server gets a read() call first
2016             #  2. tell that server to start throwing errors
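                 # (FirstServerGetsKilled trips right after that first read
                 #  completes, so the dedicated encprivkey query then hits a
                 #  broken server and exercises _privkey_query_failed)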
2017             killer = FirstServerGetsKilled()
2018             for (serverid,ss) in nm.storage_broker.get_all_servers():
2019                 ss.post_call_notifier = killer.notify
2020         d.addCallback(_created)
2021
2022         # now we update a servermap from a new node (which doesn't have the
2023         # privkey yet, forcing it to use a separate privkey query). Note that
2024         # the map-update will succeed, since we'll just get a copy from one
2025         # of the other shares.
2026         d.addCallback(lambda res: self.n2.get_servermap(MODE_WRITE))
2027
2028         return d
2029
2030     def test_privkey_query_missing(self):
2031         # like test_privkey_query_error, but instead of raising an
2032         # exception, the second query finds that the shares have vanished.
2033         self.basedir = "mutable/Problems/test_privkey_query_missing"
2034         self.set_up_grid(num_servers=20)
2035         nm = self.g.clients[0].nodemaker
2036         LARGE = "These are Larger contents" * 2000 # about 50KB
2037         nm._node_cache = DevNullDictionary() # disable the nodecache
2038
2039         d = nm.create_mutable_file(LARGE)
2040         def _created(n):
2041             self.uri = n.get_uri()
2042             self.n2 = nm.create_from_cap(self.uri)
2043             deleter = FirstServerGetsDeleted()
2044             for (serverid,ss) in nm.storage_broker.get_all_servers():
2045                 ss.post_call_notifier = deleter.notify
2046         d.addCallback(_created)
2047         d.addCallback(lambda res: self.n2.get_servermap(MODE_WRITE))
2048         return d