
import struct
import sha
from cStringIO import StringIO
from twisted.trial import unittest
from twisted.internet import defer, reactor
from twisted.python import failure
from allmydata import uri, download
from allmydata.util import base32
from allmydata.util.idlib import shortnodeid_b2a
from allmydata.util.hashutil import tagged_hash
from allmydata.encode import NotEnoughSharesError
from allmydata.interfaces import IURI, IMutableFileURI, IUploadable
from foolscap.eventual import eventually, fireEventually
from foolscap.logging import log

from allmydata.mutable.node import MutableFileNode
from allmydata.mutable.common import DictOfSets, ResponseCache, \
     MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_READ, UnrecoverableFileError
from allmydata.mutable.retrieve import Retrieve
from allmydata.mutable.publish import Publish
from allmydata.mutable.servermap import ServerMap, ServermapUpdater
from allmydata.mutable.layout import unpack_header, unpack_share

# this "FastMutableFileNode" exists solely to speed up tests by using smaller
# public/private keys. Once we switch to fast DSA-based keys, we can get rid
# of this.
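# (The 522-bit SIGNATURE_KEY_SIZE below is assumed to be close to the
# smallest modulus the underlying RSA implementation will accept; production
# keys are much larger and far slower to generate.)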

class FastMutableFileNode(MutableFileNode):
    SIGNATURE_KEY_SIZE = 522

# this "FakeStorage" exists to put the share data in RAM and avoid using real
# network connections, both to speed up the tests and to reduce the amount of
# non-mutable.py code being exercised.

class FakeStorage:
    # this class replaces the collection of storage servers, allowing the
    # tests to examine and manipulate the published shares. It also lets us
    # control the order in which read queries are answered, to exercise more
    # of the error-handling code in Retrieve.
    #
    # Note that we ignore the storage index: this FakeStorage instance can
    # only be used for a single storage index.

    def __init__(self):
        self._peers = {}
        # _sequence is used to cause the responses to occur in a specific
        # order. If it is in use, then we will defer queries instead of
        # answering them right away, accumulating the Deferreds in a dict. We
        # don't know exactly how many queries we'll get, so exactly one
        # second after the first query arrives, we will release them all (in
        # order).
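        # (Tests drive this ordering by assigning to storage._sequence
        # directly; see test_multiple_encodings below.)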
        self._sequence = None
        self._pending = {}

    def read(self, peerid, storage_index):
        shares = self._peers.get(peerid, {})
        if self._sequence is None:
            return defer.succeed(shares)
        d = defer.Deferred()
        if not self._pending:
            reactor.callLater(1.0, self._fire_readers)
        self._pending[peerid] = (d, shares)
        return d

    def _fire_readers(self):
        pending = self._pending
        self._pending = {}
        for peerid in self._sequence:
            if peerid in pending:
                d, shares = pending.pop(peerid)
                eventually(d.callback, shares)
        for (d, shares) in pending.values():
            eventually(d.callback, shares)

    def write(self, peerid, storage_index, shnum, offset, data):
        if peerid not in self._peers:
            self._peers[peerid] = {}
        shares = self._peers[peerid]
        f = StringIO()
        f.write(shares.get(shnum, ""))
        f.seek(offset)
        f.write(data)
        shares[shnum] = f.getvalue()


class FakeStorageServer:
    def __init__(self, peerid, storage):
        self.peerid = peerid
        self.storage = storage
        self.queries = 0

    def callRemote(self, methname, *args, **kwargs):
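        # fireEventually() makes every fake remote call complete in a later
        # reactor turn, the way a real network round trip would.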
        def _call():
            meth = getattr(self, methname)
            return meth(*args, **kwargs)
        d = fireEventually()
        d.addCallback(lambda res: _call())
        return d

    def slot_readv(self, storage_index, shnums, readv):
        d = self.storage.read(self.peerid, storage_index)
        def _read(shares):
            response = {}
            for shnum in shares:
                if shnums and shnum not in shnums:
                    continue
                vector = response[shnum] = []
                for (offset, length) in readv:
                    assert isinstance(offset, (int, long)), offset
                    assert isinstance(length, (int, long)), length
                    vector.append(shares[shnum][offset:offset+length])
            return response
        d.addCallback(_read)
        return d

    def slot_testv_and_readv_and_writev(self, storage_index, secrets,
                                        tw_vectors, read_vector):
        # always-pass: parrot the test vectors back to them.
        readv = {}
        for shnum, (testv, writev, new_length) in tw_vectors.items():
            for (offset, length, op, specimen) in testv:
                assert op in ("le", "eq", "ge")
            # TODO: this isn't right, the read is controlled by read_vector,
            # not by testv
            readv[shnum] = [ specimen
                             for (offset, length, op, specimen)
                             in testv ]
            for (offset, data) in writev:
                self.storage.write(self.peerid, storage_index, shnum,
                                   offset, data)
        answer = (True, readv)
        return fireEventually(answer)


# our "FakeClient" has just enough functionality of the real Client to let
# the tests run.

class FakeClient:
    mutable_file_node_class = FastMutableFileNode

    def __init__(self, num_peers=10):
        self._storage = FakeStorage()
        self._num_peers = num_peers
        self._peerids = [tagged_hash("peerid", "%d" % i)[:20]
                         for i in range(self._num_peers)]
        self._connections = dict([(peerid, FakeStorageServer(peerid,
                                                             self._storage))
                                  for peerid in self._peerids])
        self.nodeid = "fakenodeid"

    def log(self, msg, **kw):
        return log.msg(msg, **kw)

    def get_renewal_secret(self):
        return "I hereby permit you to renew my files"
    def get_cancel_secret(self):
        return "I hereby permit you to cancel my leases"

    def create_mutable_file(self, contents=""):
        n = self.mutable_file_node_class(self)
        d = n.create(contents)
        d.addCallback(lambda res: n)
        return d

    def notify_retrieve(self, r):
        pass
    def notify_publish(self, p):
        pass
    def notify_mapupdate(self, u):
        pass

    def create_node_from_uri(self, u):
        u = IURI(u)
        assert IMutableFileURI.providedBy(u), u
        res = self.mutable_file_node_class(self).init_from_uri(u)
        return res

    def get_permuted_peers(self, service_name, key):
        """
        @return: list of (peerid, connection,)
        """
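        # peers are sorted by sha(key + peerid), approximating the permuted
        # peer ring the real client uses to choose servers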
        results = []
        for (peerid, connection) in self._connections.items():
            assert isinstance(peerid, str)
            permuted = sha.new(key + peerid).digest()
            results.append((permuted, peerid, connection))
        results.sort()
        results = [ (r[1],r[2]) for r in results]
        return results

    def upload(self, uploadable):
        assert IUploadable.providedBy(uploadable)
        d = uploadable.get_size()
        d.addCallback(lambda length: uploadable.read(length))
        #d.addCallback(self.create_mutable_file)
        def _got_data(datav):
            data = "".join(datav)
            #newnode = FastMutableFileNode(self)
            return uri.LiteralFileURI(data)
        d.addCallback(_got_data)
        return d


def flip_bit(original, byte_offset):
    return (original[:byte_offset] +
            chr(ord(original[byte_offset]) ^ 0x01) +
            original[byte_offset+1:])

def corrupt(res, s, offset, shnums_to_corrupt=None):
    # if shnums_to_corrupt is None, corrupt all shares. Otherwise it is a
    # list of shnums to corrupt.
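    # e.g. corrupt(None, storage, "signature") flips one bit in every share's
    # signature field; a (name, delta) tuple like ("share_hash_chain", 4)
    # flips a bit four bytes into that field.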
    for peerid in s._peers:
        shares = s._peers[peerid]
        for shnum in shares:
            if (shnums_to_corrupt is not None
                and shnum not in shnums_to_corrupt):
                continue
            data = shares[shnum]
            (version,
             seqnum,
             root_hash,
             IV,
             k, N, segsize, datalen,
             o) = unpack_header(data)
            if isinstance(offset, tuple):
                offset1, offset2 = offset
            else:
                offset1 = offset
                offset2 = 0
            if offset1 == "pubkey":
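                # the pubkey is not listed in the offsets table: it is
                # assumed to start right after the fixed-size header, at
                # byte 107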
                real_offset = 107
            elif offset1 in o:
                real_offset = o[offset1]
            else:
                real_offset = offset1
            real_offset = int(real_offset) + offset2
            assert isinstance(real_offset, int), offset
            shares[shnum] = flip_bit(data, real_offset)
    return res

class Filenode(unittest.TestCase):
    def setUp(self):
        self.client = FakeClient()

    def test_create(self):
        d = self.client.create_mutable_file()
        def _created(n):
            self.failUnless(isinstance(n, FastMutableFileNode))
            peer0 = self.client._peerids[0]
            shnums = self.client._storage._peers[peer0].keys()
            self.failUnlessEqual(len(shnums), 1)
        d.addCallback(_created)
        return d

    def test_upload_and_download(self):
        d = self.client.create_mutable_file()
        def _created(n):
            d = defer.succeed(None)
            d.addCallback(lambda res: n.update_servermap())
            d.addCallback(lambda smap: smap.dump(StringIO()))
            d.addCallback(lambda sio:
                          self.failUnless("3-of-10" in sio.getvalue()))
            d.addCallback(lambda res: n.overwrite("contents 1"))
            d.addCallback(lambda res: self.failUnlessIdentical(res, None))
            d.addCallback(lambda res: n.download_to_data())
            d.addCallback(lambda res: self.failUnlessEqual(res, "contents 1"))
            d.addCallback(lambda res: n.overwrite("contents 2"))
            d.addCallback(lambda res: n.download_to_data())
            d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2"))
            d.addCallback(lambda res: n.download(download.Data()))
            d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2"))
            d.addCallback(lambda res: n.update("contents 3"))
            d.addCallback(lambda res: n.download_to_data())
            d.addCallback(lambda res: self.failUnlessEqual(res, "contents 3"))
            return d
        d.addCallback(_created)
        return d

    def test_create_with_initial_contents(self):
        d = self.client.create_mutable_file("contents 1")
        def _created(n):
            d = n.download_to_data()
            d.addCallback(lambda res: self.failUnlessEqual(res, "contents 1"))
            d.addCallback(lambda res: n.overwrite("contents 2"))
            d.addCallback(lambda res: n.download_to_data())
            d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2"))
            return d
        d.addCallback(_created)
        return d

    def test_upload_and_download_full_size_keys(self):
        self.client.mutable_file_node_class = MutableFileNode
        d = self.client.create_mutable_file()
        def _created(n):
            d = defer.succeed(None)
            d.addCallback(lambda res: n.update_servermap())
            d.addCallback(lambda smap: smap.dump(StringIO()))
            d.addCallback(lambda sio:
                          self.failUnless("3-of-10" in sio.getvalue()))
            d.addCallback(lambda res: n.overwrite("contents 1"))
            d.addCallback(lambda res: self.failUnlessIdentical(res, None))
            d.addCallback(lambda res: n.download_to_data())
            d.addCallback(lambda res: self.failUnlessEqual(res, "contents 1"))
            d.addCallback(lambda res: n.overwrite("contents 2"))
            d.addCallback(lambda res: n.download_to_data())
            d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2"))
            d.addCallback(lambda res: n.download(download.Data()))
            d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2"))
            d.addCallback(lambda res: n.update("contents 3"))
            d.addCallback(lambda res: n.download_to_data())
            d.addCallback(lambda res: self.failUnlessEqual(res, "contents 3"))
            return d
        d.addCallback(_created)
        return d

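# test_generate() below references FakePubKey/FakePrivKey, which this file
# never defines. These minimal stubs are an assumed reconstruction, shaped
# only by how the test uses them: Publish signs with the private key, and the
# test verifies that signature with the matching public key.
class FakePubKey:
    def __init__(self, count):
        self.count = count
    def serialize(self):
        return "PUBKEY-%d" % self.count
    def verify(self, msg, signature):
        # accept exactly the signatures FakePrivKey.sign() produces
        return signature == "sign-" + msg

class FakePrivKey:
    def __init__(self, count):
        self.count = count
    def serialize(self):
        return "PRIVKEY-%d" % self.count
    def sign(self, data):
        return "sign-" + data
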
class MakeShares(unittest.TestCase):
    def test_encrypt(self):
        c = FakeClient()
        fn = FastMutableFileNode(c)
        CONTENTS = "some initial contents"
        d = fn.create(CONTENTS)
        def _created(res):
            p = Publish(fn, None)
            p.salt = "SALT" * 4
            p.readkey = "\x00" * 16
            p.newdata = CONTENTS
            p.required_shares = 3
            p.total_shares = 10
            p.setup_encoding_parameters()
            return p._encrypt_and_encode()
        d.addCallback(_created)
        def _done(shares_and_shareids):
            (shares, share_ids) = shares_and_shareids
            self.failUnlessEqual(len(shares), 10)
            for sh in shares:
                self.failUnless(isinstance(sh, str))
                self.failUnlessEqual(len(sh), 7)
            self.failUnlessEqual(len(share_ids), 10)
        d.addCallback(_done)
        return d

    def test_generate(self):
        c = FakeClient()
        fn = FastMutableFileNode(c)
        CONTENTS = "some initial contents"
        d = fn.create(CONTENTS)
        p = Publish(fn, None)
        # make some fake shares
        shares_and_ids = ( ["%07d" % i for i in range(10)], range(10) )
        target_info = None
        p._privkey = FakePrivKey(0)
        p._encprivkey = "encprivkey"
        p._pubkey = FakePubKey(0)
        d.addCallback(lambda res:
                      p._generate_shares((shares_and_ids,
                                          3, 10,
                                          21, # segsize
                                          len(CONTENTS),
                                          target_info),
                                         3, # seqnum
                                         "IV"*8))
        def _done( (seqnum, root_hash, final_shares, target_info2) ):
            self.failUnlessEqual(seqnum, 3)
            self.failUnlessEqual(len(root_hash), 32)
            self.failUnless(isinstance(final_shares, dict))
            self.failUnlessEqual(len(final_shares), 10)
            self.failUnlessEqual(sorted(final_shares.keys()), range(10))
            for i,sh in final_shares.items():
                self.failUnless(isinstance(sh, str))
                # feed the share through the unpacker as a sanity-check
                pieces = unpack_share(sh)
                (u_seqnum, u_root_hash, IV, k, N, segsize, datalen,
                 pubkey, signature, share_hash_chain, block_hash_tree,
                 share_data, enc_privkey) = pieces
                self.failUnlessEqual(u_seqnum, 3)
                self.failUnlessEqual(u_root_hash, root_hash)
                self.failUnlessEqual(k, 3)
                self.failUnlessEqual(N, 10)
                self.failUnlessEqual(segsize, 21)
                self.failUnlessEqual(datalen, len(CONTENTS))
                self.failUnlessEqual(pubkey, p._pubkey.serialize())
                sig_material = struct.pack(">BQ32s16s BBQQ",
                                           0, p._new_seqnum, root_hash, IV,
                                           k, N, segsize, datalen)
                self.failUnless(p._pubkey.verify(sig_material, signature))
                #self.failUnlessEqual(signature, p._privkey.sign(sig_material))
                self.failUnless(isinstance(share_hash_chain, dict))
                self.failUnlessEqual(len(share_hash_chain), 4) # ln2(10)++
                for shnum,share_hash in share_hash_chain.items():
                    self.failUnless(isinstance(shnum, int))
                    self.failUnless(isinstance(share_hash, str))
                    self.failUnlessEqual(len(share_hash), 32)
                self.failUnless(isinstance(block_hash_tree, list))
                self.failUnlessEqual(len(block_hash_tree), 1) # very small tree
                self.failUnlessEqual(IV, "SALT"*4)
                self.failUnlessEqual(len(share_data), len("%07d" % 1))
                self.failUnlessEqual(enc_privkey, fn.get_encprivkey())
        d.addCallback(_done)
        return d

    # TODO: when we publish to 20 peers, we should get one share per peer on
    # 10 of them; when we publish to 3 peers, we should get either 3 or 4
    # shares per peer; when we publish to zero peers, we should get a
    # NotEnoughSharesError

class Servermap(unittest.TestCase):
    def setUp(self):
        # publish a file and create shares, which can then be manipulated
        # later.
        num_peers = 20
        self._client = FakeClient(num_peers)
        self._storage = self._client._storage
        d = self._client.create_mutable_file("New contents go here")
        def _created(node):
            self._fn = node
        d.addCallback(_created)
        return d

    def make_servermap(self, mode=MODE_CHECK):
        smu = ServermapUpdater(self._fn, ServerMap(), mode)
        d = smu.update()
        return d

    def update_servermap(self, oldmap, mode=MODE_CHECK):
        smu = ServermapUpdater(self._fn, oldmap, mode)
        d = smu.update()
        return d

    def failUnlessOneRecoverable(self, sm, num_shares):
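        # asserts: exactly one recoverable version, with num_shares of its
        # k=3 shares seen (the file was published 3-of-10)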
        self.failUnlessEqual(len(sm.recoverable_versions()), 1)
        self.failUnlessEqual(len(sm.unrecoverable_versions()), 0)
        best = sm.best_recoverable_version()
        self.failIfEqual(best, None)
        self.failUnlessEqual(sm.recoverable_versions(), set([best]))
        self.failUnlessEqual(len(sm.shares_available()), 1)
        self.failUnlessEqual(sm.shares_available()[best], (num_shares, 3))
        return sm

    def test_basic(self):
        d = defer.succeed(None)
        ms = self.make_servermap
        us = self.update_servermap

        d.addCallback(lambda res: ms(mode=MODE_CHECK))
        d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 10))
        d.addCallback(lambda res: ms(mode=MODE_WRITE))
        d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 10))
        d.addCallback(lambda res: ms(mode=MODE_READ))
        # this mode stops at k+epsilon, and epsilon=k, so 6 shares
        d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 6))
        d.addCallback(lambda res: ms(mode=MODE_ANYTHING))
        # this mode stops at 'k' shares
        d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 3))

        # and can we re-use the same servermap? Note that these are sorted in
        # increasing order of number of servers queried, since once a server
        # gets into the servermap, we'll always ask it for an update.
        d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 3))
        d.addCallback(lambda sm: us(sm, mode=MODE_READ))
        d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 6))
        d.addCallback(lambda sm: us(sm, mode=MODE_WRITE))
        d.addCallback(lambda sm: us(sm, mode=MODE_CHECK))
        d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 10))
        d.addCallback(lambda sm: us(sm, mode=MODE_ANYTHING))
        d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 10))

        return d

    def test_mark_bad(self):
        d = defer.succeed(None)
        ms = self.make_servermap
        us = self.update_servermap

        d.addCallback(lambda res: ms(mode=MODE_READ))
        d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 6))
        def _made_map(sm):
            v = sm.best_recoverable_version()
            vm = sm.make_versionmap()
            shares = list(vm[v])
            self.failUnlessEqual(len(shares), 6)
            self._corrupted = set()
            # mark the first 5 shares as corrupt, then update the servermap.
            # The map should not have the marked shares in it any more, and
            # new shares should be found to replace the missing ones.
            for (shnum, peerid, timestamp) in shares:
                if shnum < 5:
                    self._corrupted.add( (peerid, shnum) )
                    sm.mark_bad_share(peerid, shnum)
            return self.update_servermap(sm, MODE_WRITE)
        d.addCallback(_made_map)
        def _check_map(sm):
            # this should find all 5 shares that weren't marked bad
            v = sm.best_recoverable_version()
            vm = sm.make_versionmap()
            shares = list(vm[v])
            for (peerid, shnum) in self._corrupted:
                peer_shares = sm.shares_on_peer(peerid)
                self.failIf(shnum in peer_shares,
                            "%d was in %s" % (shnum, peer_shares))
            self.failUnlessEqual(len(shares), 5)
        d.addCallback(_check_map)
        return d

    def failUnlessNoneRecoverable(self, sm):
        self.failUnlessEqual(len(sm.recoverable_versions()), 0)
        self.failUnlessEqual(len(sm.unrecoverable_versions()), 0)
        best = sm.best_recoverable_version()
        self.failUnlessEqual(best, None)
        self.failUnlessEqual(len(sm.shares_available()), 0)

    def test_no_shares(self):
        self._client._storage._peers = {} # delete all shares
        ms = self.make_servermap
        d = defer.succeed(None)

        d.addCallback(lambda res: ms(mode=MODE_CHECK))
        d.addCallback(lambda sm: self.failUnlessNoneRecoverable(sm))

        d.addCallback(lambda res: ms(mode=MODE_ANYTHING))
        d.addCallback(lambda sm: self.failUnlessNoneRecoverable(sm))

        d.addCallback(lambda res: ms(mode=MODE_WRITE))
        d.addCallback(lambda sm: self.failUnlessNoneRecoverable(sm))

        d.addCallback(lambda res: ms(mode=MODE_READ))
        d.addCallback(lambda sm: self.failUnlessNoneRecoverable(sm))

        return d

    def failUnlessNotQuiteEnough(self, sm):
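        # asserts: exactly one version was seen, but only 2 of the k=3 shares
        # it needs are present, so nothing is recoverable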
        self.failUnlessEqual(len(sm.recoverable_versions()), 0)
        self.failUnlessEqual(len(sm.unrecoverable_versions()), 1)
        best = sm.best_recoverable_version()
        self.failUnlessEqual(best, None)
        self.failUnlessEqual(len(sm.shares_available()), 1)
        self.failUnlessEqual(sm.shares_available().values()[0], (2,3) )

    def test_not_quite_enough_shares(self):
        s = self._client._storage
        ms = self.make_servermap
        num_shares = len(s._peers)
        for peerid in s._peers:
            s._peers[peerid] = {}
            num_shares -= 1
            if num_shares == 2:
                break
        # now there ought to be only two shares left
        assert len([peerid for peerid in s._peers if s._peers[peerid]]) == 2

        d = defer.succeed(None)

        d.addCallback(lambda res: ms(mode=MODE_CHECK))
        d.addCallback(lambda sm: self.failUnlessNotQuiteEnough(sm))
        d.addCallback(lambda res: ms(mode=MODE_ANYTHING))
        d.addCallback(lambda sm: self.failUnlessNotQuiteEnough(sm))
        d.addCallback(lambda res: ms(mode=MODE_WRITE))
        d.addCallback(lambda sm: self.failUnlessNotQuiteEnough(sm))
        d.addCallback(lambda res: ms(mode=MODE_READ))
        d.addCallback(lambda sm: self.failUnlessNotQuiteEnough(sm))

        return d


class Roundtrip(unittest.TestCase):
    def setUp(self):
        # publish a file and create shares, which can then be manipulated
        # later.
        self.CONTENTS = "New contents go here"
        num_peers = 20
        self._client = FakeClient(num_peers)
        self._storage = self._client._storage
        d = self._client.create_mutable_file(self.CONTENTS)
        def _created(node):
            self._fn = node
        d.addCallback(_created)
        return d

    def make_servermap(self, mode=MODE_READ, oldmap=None):
        if oldmap is None:
            oldmap = ServerMap()
        smu = ServermapUpdater(self._fn, oldmap, mode)
        d = smu.update()
        return d

    def abbrev_verinfo(self, verinfo):
        if verinfo is None:
            return None
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = verinfo
        return "%d-%s" % (seqnum, base32.b2a(root_hash)[:4])

    def abbrev_verinfo_dict(self, verinfo_d):
        output = {}
        for verinfo,value in verinfo_d.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            output["%d-%s" % (seqnum, base32.b2a(root_hash)[:4])] = value
        return output

    def dump_servermap(self, servermap):
        print "SERVERMAP", servermap
        print "RECOVERABLE", [self.abbrev_verinfo(v)
                              for v in servermap.recoverable_versions()]
        print "BEST", self.abbrev_verinfo(servermap.best_recoverable_version())
        print "available", self.abbrev_verinfo_dict(servermap.shares_available())

    def do_download(self, servermap, version=None):
        if version is None:
            version = servermap.best_recoverable_version()
        r = Retrieve(self._fn, servermap, version)
        return r.download()

    def test_basic(self):
        d = self.make_servermap()
        def _do_retrieve(servermap):
            self._smap = servermap
            #self.dump_servermap(servermap)
            self.failUnlessEqual(len(servermap.recoverable_versions()), 1)
            return self.do_download(servermap)
        d.addCallback(_do_retrieve)
        def _retrieved(new_contents):
            self.failUnlessEqual(new_contents, self.CONTENTS)
        d.addCallback(_retrieved)
        # we should be able to re-use the same servermap, both with and
        # without updating it.
        d.addCallback(lambda res: self.do_download(self._smap))
        d.addCallback(_retrieved)
        d.addCallback(lambda res: self.make_servermap(oldmap=self._smap))
        d.addCallback(lambda res: self.do_download(self._smap))
        d.addCallback(_retrieved)
        # clobbering the pubkey should make the servermap updater re-fetch it
        def _clobber_pubkey(res):
            self._fn._pubkey = None
        d.addCallback(_clobber_pubkey)
        d.addCallback(lambda res: self.make_servermap(oldmap=self._smap))
        d.addCallback(lambda res: self.do_download(self._smap))
        d.addCallback(_retrieved)
        return d

    def shouldFail(self, expected_failure, which, substring,
                   callable, *args, **kwargs):
        assert substring is None or isinstance(substring, str)
        d = defer.maybeDeferred(callable, *args, **kwargs)
        def done(res):
            if isinstance(res, failure.Failure):
                res.trap(expected_failure)
                if substring:
                    self.failUnless(substring in str(res),
                                    "substring '%s' not in '%s'"
                                    % (substring, str(res)))
            else:
                self.fail("%s was supposed to raise %s, not get '%s'" %
                          (which, expected_failure, res))
        d.addBoth(done)
        return d

    def _test_corrupt_all(self, offset, substring,
                          should_succeed=False, corrupt_early=True):
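        # corrupt_early=True damages the shares before the servermap is
        # built, so the problem surfaces during the map update;
        # corrupt_early=False damages them afterwards, so Retrieve is the
        # first to notice.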
        d = defer.succeed(None)
        if corrupt_early:
            d.addCallback(corrupt, self._storage, offset)
        d.addCallback(lambda res: self.make_servermap())
        if not corrupt_early:
            d.addCallback(corrupt, self._storage, offset)
        def _do_retrieve(servermap):
            ver = servermap.best_recoverable_version()
            if ver is None and not should_succeed:
                # no recoverable versions == not succeeding. The problem
                # should be noted in the servermap's list of problems.
                if substring:
                    allproblems = [str(f) for f in servermap.problems]
                    self.failUnless(substring in "".join(allproblems))
                return
            if should_succeed:
                d1 = self._fn.download_to_data()
                d1.addCallback(lambda new_contents:
                               self.failUnlessEqual(new_contents, self.CONTENTS))
                return d1
            else:
                return self.shouldFail(NotEnoughSharesError,
                                       "_corrupt_all(offset=%s)" % (offset,),
                                       substring,
                                       self._fn.download_to_data)
        d.addCallback(_do_retrieve)
        return d

    def test_corrupt_all_verbyte(self):
        # when the version byte is not 0, we hit an assertion error in
        # unpack_share().
        return self._test_corrupt_all(0, "AssertionError")

    def test_corrupt_all_seqnum(self):
        # a corrupt sequence number will trigger a bad signature
        return self._test_corrupt_all(1, "signature is invalid")

    def test_corrupt_all_R(self):
        # a corrupt root hash will trigger a bad signature
        return self._test_corrupt_all(9, "signature is invalid")

    def test_corrupt_all_IV(self):
        # a corrupt salt/IV will trigger a bad signature
        return self._test_corrupt_all(41, "signature is invalid")

    def test_corrupt_all_k(self):
        # a corrupt 'k' will trigger a bad signature
        return self._test_corrupt_all(57, "signature is invalid")

    def test_corrupt_all_N(self):
        # a corrupt 'N' will trigger a bad signature
        return self._test_corrupt_all(58, "signature is invalid")

    def test_corrupt_all_segsize(self):
        # a corrupt segsize will trigger a bad signature
        return self._test_corrupt_all(59, "signature is invalid")

    def test_corrupt_all_datalen(self):
        # a corrupt data length will trigger a bad signature
        return self._test_corrupt_all(67, "signature is invalid")

    def test_corrupt_all_pubkey(self):
        # a corrupt pubkey won't match the URI's fingerprint. We need to
        # remove the pubkey from the filenode, or else it won't bother trying
        # to update it.
        self._fn._pubkey = None
        return self._test_corrupt_all("pubkey",
                                      "pubkey doesn't match fingerprint")

    def test_corrupt_all_sig(self):
        # a corrupt signature is a bad one
        # the signature runs from about [543:799], depending upon the length
        # of the pubkey
        return self._test_corrupt_all("signature", "signature is invalid")

    def test_corrupt_all_share_hash_chain_number(self):
        # a corrupt share hash chain entry will show up as a bad hash. If we
        # mangle the first byte, that will look like a bad hash number,
        # causing an IndexError
        return self._test_corrupt_all("share_hash_chain", "corrupt hashes")

    def test_corrupt_all_share_hash_chain_hash(self):
        # a corrupt share hash chain entry will show up as a bad hash. If we
        # mangle a few bytes in, that will look like a bad hash.
        return self._test_corrupt_all(("share_hash_chain",4), "corrupt hashes")

    def test_corrupt_all_block_hash_tree(self):
        return self._test_corrupt_all("block_hash_tree",
                                      "block hash tree failure")

    def test_corrupt_all_block(self):
        return self._test_corrupt_all("share_data", "block hash tree failure")

    def test_corrupt_all_encprivkey(self):
        # a corrupted privkey won't even be noticed by the reader, only by a
        # writer.
        return self._test_corrupt_all("enc_privkey", None, should_succeed=True)

    def test_basic_pubkey_at_end(self):
        # we corrupt the pubkey in all but the last 'k' shares, allowing the
        # download to succeed but forcing a bunch of retries first. Note that
        # this is rather pessimistic: our Retrieve process will throw away
        # the whole share if the pubkey is bad, even though the rest of the
        # share might be good.

        self._fn._pubkey = None
        k = self._fn.get_required_shares()
        N = self._fn.get_total_shares()
        d = defer.succeed(None)
        d.addCallback(corrupt, self._storage, "pubkey",
                      shnums_to_corrupt=range(0, N-k))
        d.addCallback(lambda res: self.make_servermap())
        def _do_retrieve(servermap):
            self.failUnless(servermap.problems)
            self.failUnless("pubkey doesn't match fingerprint"
                            in str(servermap.problems[0]))
            ver = servermap.best_recoverable_version()
            r = Retrieve(self._fn, servermap, ver)
            return r.download()
        d.addCallback(_do_retrieve)
        d.addCallback(lambda new_contents:
                      self.failUnlessEqual(new_contents, self.CONTENTS))
        return d

    def test_corrupt_some(self):
        # corrupt the data of first five shares (so the servermap thinks
        # they're good but retrieve marks them as bad), so that the
        # MODE_READ set of 6 will be insufficient, forcing node.download to
        # retry with more servers.
        corrupt(None, self._storage, "share_data", range(5))
        d = self.make_servermap()
        def _do_retrieve(servermap):
            ver = servermap.best_recoverable_version()
            self.failUnless(ver)
            return self._fn.download_to_data()
        d.addCallback(_do_retrieve)
        d.addCallback(lambda new_contents:
                      self.failUnlessEqual(new_contents, self.CONTENTS))
        return d

    def test_download_fails(self):
        corrupt(None, self._storage, "signature")
        d = self.shouldFail(UnrecoverableFileError, "test_download_anyway",
                            "no recoverable versions",
                            self._fn.download_to_data)
        return d


class MultipleEncodings(unittest.TestCase):
    def setUp(self):
        self.CONTENTS = "New contents go here"
        num_peers = 20
        self._client = FakeClient(num_peers)
        self._storage = self._client._storage
        d = self._client.create_mutable_file(self.CONTENTS)
        def _created(node):
            self._fn = node
        d.addCallback(_created)
        return d

    def _encode(self, k, n, data):
        # encode 'data' into a peerid->shares dict.
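        # (fn2 is a clone of self._fn: it shares the same write-cap but uses
        # different k/n encoding parameters)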

        fn2 = FastMutableFileNode(self._client)
        # init_from_uri populates _uri, _writekey, _readkey, _storage_index,
        # and _fingerprint
        fn = self._fn
        fn2.init_from_uri(fn.get_uri())
        # then we copy over other fields that are normally fetched from the
        # existing shares
        fn2._pubkey = fn._pubkey
        fn2._privkey = fn._privkey
        fn2._encprivkey = fn._encprivkey
        fn2._current_seqnum = 0
        fn2._current_roothash = "\x00" * 32
        # and set the encoding parameters to something completely different
        fn2._required_shares = k
        fn2._total_shares = n

        s = self._client._storage
        s._peers = {} # clear existing storage
        p2 = Publish(fn2, None)
        d = p2.publish(data)
        def _published(res):
            shares = s._peers
            s._peers = {}
            return shares
        d.addCallback(_published)
        return d

    def make_servermap(self, mode=MODE_READ, oldmap=None):
        if oldmap is None:
            oldmap = ServerMap()
        smu = ServermapUpdater(self._fn, oldmap, mode)
        d = smu.update()
        return d

    def test_multiple_encodings(self):
        # we encode the same file in three different ways (3-of-10, 4-of-9,
        # and 4-of-7), then mix up the shares, to make sure that download
        # survives seeing a variety of encodings. This is actually kind of
        # tricky to set up.

        contents1 = "Contents for encoding 1 (3-of-10) go here"
        contents2 = "Contents for encoding 2 (4-of-9) go here"
        contents3 = "Contents for encoding 3 (4-of-7) go here"

        # we make a retrieval object that doesn't know what encoding
        # parameters to use
        fn3 = FastMutableFileNode(self._client)
        fn3.init_from_uri(self._fn.get_uri())

        # now we upload a file through fn1, and grab its shares
        d = self._encode(3, 10, contents1)
        def _encoded_1(shares):
            self._shares1 = shares
        d.addCallback(_encoded_1)
        d.addCallback(lambda res: self._encode(4, 9, contents2))
        def _encoded_2(shares):
            self._shares2 = shares
        d.addCallback(_encoded_2)
        d.addCallback(lambda res: self._encode(4, 7, contents3))
        def _encoded_3(shares):
            self._shares3 = shares
        d.addCallback(_encoded_3)

        def _merge(res):
            log.msg("merging sharelists")
            # we merge the shares from the three sets, leaving each shnum in
            # its original location, but using a share from set1, set2, or
            # set3 according to the following sequence:
            #
            #  4-of-9   a  s2
            #  4-of-9   b  s2
            #  4-of-7   c   s3
            #  4-of-9   d  s2
            #  3-of-10  e s1
            #  3-of-10  f s1
            #  3-of-10  g s1
            #  4-of-9   h  s2
            #
            # so that neither form can be recovered until fetch [f], at which
            # point version-s1 (the 3-of-10 form) should be recoverable. If
            # the implementation latches on to the first version it sees,
            # then s2 will be recoverable at fetch [g].

            # Later, when we implement code that handles multiple versions,
            # we can use this framework to assert that all recoverable
            # versions are retrieved, and test that 'epsilon' does its job

            places = [2, 2, 3, 2, 1, 1, 1, 2]

            sharemap = {}

            for i,peerid in enumerate(self._client._peerids):
                peerid_s = shortnodeid_b2a(peerid)
                # reset this peer's bucket once (not once per share), so a
                # peer holding several shnums keeps all of them
                self._client._storage._peers[peerid] = peers = {}
                for shnum in self._shares1.get(peerid, {}):
                    if shnum < len(places):
                        which = places[shnum]
                    else:
                        which = "x"
                    in_1 = shnum in self._shares1[peerid]
                    in_2 = shnum in self._shares2.get(peerid, {})
                    in_3 = shnum in self._shares3.get(peerid, {})
                    #print peerid_s, shnum, which, in_1, in_2, in_3
                    if which == 1:
                        if in_1:
                            peers[shnum] = self._shares1[peerid][shnum]
                            sharemap[shnum] = peerid
                    elif which == 2:
                        if in_2:
                            peers[shnum] = self._shares2[peerid][shnum]
                            sharemap[shnum] = peerid
                    elif which == 3:
                        if in_3:
                            peers[shnum] = self._shares3[peerid][shnum]
                            sharemap[shnum] = peerid

            # we don't bother placing any other shares
            # now sort the sequence so that share 0 is returned first
            new_sequence = [sharemap[shnum]
                            for shnum in sorted(sharemap.keys())]
            self._client._storage._sequence = new_sequence
            log.msg("merge done")
        d.addCallback(_merge)
        d.addCallback(lambda res: fn3.download_to_data())
        def _retrieved(new_contents):
            # the current specified behavior is "first version recoverable"
            self.failUnlessEqual(new_contents, contents1)
        d.addCallback(_retrieved)
        return d


class Utils(unittest.TestCase):
    def test_dict_of_sets(self):
        ds = DictOfSets()
        ds.add(1, "a")
        ds.add(2, "b")
        ds.add(2, "b")
        ds.add(2, "c")
        self.failUnlessEqual(ds[1], set(["a"]))
        self.failUnlessEqual(ds[2], set(["b", "c"]))
        ds.discard(3, "d") # should not raise an exception
        ds.discard(2, "b")
        self.failUnlessEqual(ds[2], set(["c"]))
        ds.discard(2, "c")
        self.failIf(2 in ds)

    def _do_inside(self, c, x_start, x_length, y_start, y_length):
        # we compare this against sets of integers
        x = set(range(x_start, x_start+x_length))
        y = set(range(y_start, y_start+y_length))
        should_be_inside = x.issubset(y)
        self.failUnlessEqual(should_be_inside, c._inside(x_start, x_length,
                                                         y_start, y_length),
                             str((x_start, x_length, y_start, y_length)))

    def test_cache_inside(self):
        c = ResponseCache()
        x_start = 10
        x_length = 5
        for y_start in range(8, 17):
            for y_length in range(8):
                self._do_inside(c, x_start, x_length, y_start, y_length)

    def _do_overlap(self, c, x_start, x_length, y_start, y_length):
        # we compare this against sets of integers
        x = set(range(x_start, x_start+x_length))
        y = set(range(y_start, y_start+y_length))
        overlap = bool(x.intersection(y))
        self.failUnlessEqual(overlap, c._does_overlap(x_start, x_length,
                                                      y_start, y_length),
                             str((x_start, x_length, y_start, y_length)))

    def test_cache_overlap(self):
        c = ResponseCache()
        x_start = 10
        x_length = 5
        for y_start in range(8, 17):
            for y_length in range(8):
                self._do_overlap(c, x_start, x_length, y_start, y_length)

    def test_cache(self):
        c = ResponseCache()
        # xdata = base62.b2a(os.urandom(100))[:100]
        xdata = "1Ex4mdMaDyOl9YnGBM3I4xaBF97j8OQAg1K3RBR01F2PwTP4HohB3XpACuku8Xj4aTQjqJIR1f36mEj3BCNjXaJmPBEZnnHL0U9l"
        ydata = "4DCUQXvkEPnnr9Lufikq5t21JsnzZKhzxKBhLhrBB6iIcBOWRuT4UweDhjuKJUre8A4wOObJnl3Kiqmlj4vjSLSqUGAkUD87Y3vs"
        nope = (None, None)
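        # read() is expected to return (None, None) unless the requested span
        # lies entirely within a single cached fragment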
        c.add("v1", 1, 0, xdata, "time0")
        c.add("v1", 1, 2000, ydata, "time1")
        self.failUnlessEqual(c.read("v2", 1, 10, 11), nope)
        self.failUnlessEqual(c.read("v1", 2, 10, 11), nope)
        self.failUnlessEqual(c.read("v1", 1, 0, 10), (xdata[:10], "time0"))
        self.failUnlessEqual(c.read("v1", 1, 90, 10), (xdata[90:], "time0"))
        self.failUnlessEqual(c.read("v1", 1, 300, 10), nope)
        self.failUnlessEqual(c.read("v1", 1, 2050, 5), (ydata[50:55], "time1"))
        self.failUnlessEqual(c.read("v1", 1, 0, 101), nope)
        self.failUnlessEqual(c.read("v1", 1, 99, 1), (xdata[99:100], "time0"))
        self.failUnlessEqual(c.read("v1", 1, 100, 1), nope)
        self.failUnlessEqual(c.read("v1", 1, 1990, 9), nope)
        self.failUnlessEqual(c.read("v1", 1, 1990, 10), nope)
        self.failUnlessEqual(c.read("v1", 1, 1990, 11), nope)
        self.failUnlessEqual(c.read("v1", 1, 1990, 15), nope)
        self.failUnlessEqual(c.read("v1", 1, 1990, 19), nope)
        self.failUnlessEqual(c.read("v1", 1, 1990, 20), nope)
        self.failUnlessEqual(c.read("v1", 1, 1990, 21), nope)
        self.failUnlessEqual(c.read("v1", 1, 1990, 25), nope)
        self.failUnlessEqual(c.read("v1", 1, 1999, 25), nope)

        # optional: join fragments
        c = ResponseCache()
        c.add("v1", 1, 0, xdata[:10], "time0")
        c.add("v1", 1, 10, xdata[10:20], "time1")
        #self.failUnlessEqual(c.read("v1", 1, 0, 20), (xdata[:20], "time0"))