import itertools, struct, re
from cStringIO import StringIO
from twisted.trial import unittest
-from twisted.internet import defer
-from twisted.python import failure, log
+from twisted.internet import defer, reactor
+from twisted.python import failure
from allmydata import mutable, uri, dirnode, download
from allmydata.util.hashutil import tagged_hash
from allmydata.encode import NotEnoughPeersError
from allmydata.interfaces import IURI, INewDirectoryURI, \
IMutableFileURI, IUploadable, IFileURI
from allmydata.filenode import LiteralFileNode
+from foolscap.eventual import eventually
+from foolscap.logging import log
import sha
#from allmydata.test.common import FakeMutableFileNode
def __init__(self):
self._peers = {}
+ # _sequence is used to cause the responses to occur in a specific
+ # order. If it is in use, then we will defer queries instead of
+ # answering them right away, accumulating the Deferreds in a dict. We
+ # don't know exactly how many queries we'll get, so exactly one
+ # second after the first query arrives, we will release them all (in
+ # order).
+ self._sequence = None
+ self._pending = {}
def read(self, peerid, storage_index):
shares = self._peers.get(peerid, {})
- return shares
+ if self._sequence is None:
+ return shares
+ d = defer.Deferred()
+ if not self._pending:
+ reactor.callLater(1.0, self._fire_readers)
+ self._pending[peerid] = (d, shares)
+ return d
+
+ def _fire_readers(self):
+ pending = self._pending
+ self._pending = {}
+ extra = []
+ for peerid in self._sequence:
+ if peerid in pending:
+ d, shares = pending.pop(peerid)
+ eventually(d.callback, shares)
+ for (d, shares) in pending.items():
+ eventually(d.callback, shares)
def write(self, peerid, storage_index, shnum, offset, data):
if peerid not in self._peers:
def _do_read(self, ss, peerid, storage_index, shnums, readv):
assert ss[0] == peerid
assert shnums == []
- return defer.succeed(self._storage.read(peerid, storage_index))
+ return defer.maybeDeferred(self._storage.read, peerid, storage_index)
def _do_testreadwrite(self, peerid, secrets,
tw_vectors, read_vector):
class FakeRetrieve(mutable.Retrieve):
def _do_read(self, ss, peerid, storage_index, shnums, readv):
- shares = self._storage.read(peerid, storage_index)
-
- response = {}
- for shnum in shares:
- if shnums and shnum not in shnums:
- continue
- vector = response[shnum] = []
- for (offset, length) in readv:
- vector.append(shares[shnum][offset:offset+length])
- return defer.succeed(response)
+ d = defer.maybeDeferred(self._storage.read, peerid, storage_index)
+ def _read(shares):
+ response = {}
+ for shnum in shares:
+ if shnums and shnum not in shnums:
+ continue
+ vector = response[shnum] = []
+ for (offset, length) in readv:
+ vector.append(shares[shnum][offset:offset+length])
+ return response
+ d.addCallback(_read)
+ return d
def _deserialize_pubkey(self, pubkey_s):
mo = re.search(r"^PUBKEY-(\d+)$", pubkey_s)
self.failUnlessEqual(contents, new_contents)
d.addCallback(_retrieved)
return d
+
+ def test_basic_sequenced(self):
+ c, s, fn, p, r = self.setup_for_publish(20)
+ s._sequence = c._peerids[:]
+ contents = "New contents go here"
+ d = p.publish(contents)
+ def _published(res):
+ return r.retrieve()
+ d.addCallback(_published)
+ def _retrieved(new_contents):
+ self.failUnlessEqual(contents, new_contents)
+ d.addCallback(_retrieved)
+ return d
+
+ def test_basic_pubkey_at_end(self):
+ # we corrupt the pubkey in all but the last 'k' shares, allowing the
+ # download to succeed but forcing a bunch of retries first. Note that
+ # this is rather pessimistic: our Retrieve process will throw away
+ # the whole share if the pubkey is bad, even though the rest of the
+ # share might be good.
+ c, s, fn, p, r = self.setup_for_publish(20)
+ s._sequence = c._peerids[:]
+ contents = "New contents go here"
+ d = p.publish(contents)
+ def _published(res):
+ r._pubkey = None
+ homes = [peerid for peerid in c._peerids
+ if s._peers.get(peerid, {})]
+ k = fn.get_required_shares()
+ homes_to_corrupt = homes[:-k]
+ for peerid in homes_to_corrupt:
+ shares = s._peers[peerid]
+ for shnum in shares:
+ data = shares[shnum]
+ (version,
+ seqnum,
+ root_hash,
+ IV,
+ k, N, segsize, datalen,
+ o) = mutable.unpack_header(data)
+ offset = 107 # pubkey
+ shares[shnum] = self.flip_bit(data, offset)
+ return r.retrieve()
+ d.addCallback(_published)
+ def _retrieved(new_contents):
+ self.failUnlessEqual(contents, new_contents)
+ d.addCallback(_retrieved)
+ return d
+
+ def OFF_test_multiple_encodings(self): # not finished yet
+ # we encode the same file in two different ways (3-of-10 and 4-of-9),
+ # then mix up the shares, to make sure that download survives seeing
+ # a variety of encodings. This is actually kind of tricky to set up.
+ c, s, fn, p, r = self.setup_for_publish(20)
+ # we ignore fn, p, and r
+
+ # force share retrieval to occur in this order
+ s._sequence = c._peerids[:]
+
+ fn1 = FakeFilenode(c)
+ fn1.DEFAULT_ENCODING = (3,10)
+ fn1.create("")
+ p1 = FakePublish(fn1)
+ p1._storage = s
+
+ # and we make a second filenode with the same key but different
+ # encoding
+ fn2 = FakeFilenode(c)
+ # init_from_uri populates _uri, _writekey, _readkey, _storage_index,
+ # and _fingerprint
+ fn2.init_from_uri(fn1.get_uri())
+ # then we copy over other fields that are normally fetched from the
+ # existing shares
+ fn2._pubkey = fn1._pubkey
+ fn2._privkey = fn1._privkey
+ fn2._encprivkey = fn1._encprivkey
+ fn2._current_seqnum = 0
+ fn2._current_roothash = "\x00" * 32
+ # and set the encoding parameters to something completely different
+ fn2._required_shares = 4
+ fn2._total_shares = 9
+
+ p2 = FakePublish(fn2)
+ p2._storage = s
+
+ # we make a retrieval object that doesn't know what encoding
+ # parameters to use
+ fn3 = FakeFilenode(c)
+ fn3.init_from_uri(fn1.get_uri())
+ r3 = FakeRetrieve(fn3)
+ r3._storage = s
+
+ # now we upload a file through fn1, and grab its shares
+ contents1 = "Contents for encoding 1 (3-of-10) go here"
+ contents2 = "Contents for encoding 2 (4-of-9) go here"
+ d = p1.publish(contents1)
+ def _published_1(res):
+ self._shares1 = s._peers
+ s._peers = {}
+ # and upload it through fn2
+ return p2.publish(contents2)
+ d.addCallback(_published_1)
+ def _published_2(res):
+ self._shares2 = s._peers
+ s._peers = {}
+ d.addCallback(_published_2)
+ def _merge(res):
+ log.msg("merging sharelists")
+ print len(self._shares1), len(self._shares2)
+ from allmydata.util import idlib
+ # we rearrange the shares, removing them from their original
+ # homes.
+ shares1 = self._shares1.values()
+ shares2 = self._shares2.values()
+
+ print len(shares1), len(shares2)
+ # then we place shares in the following order:
+ # 4-of-9 a s2
+ # 4-of-9 b s2
+ # 4-of-9 c s2
+ # 3-of-9 d s1
+ # 3-of-9 e s1
+ # 4-of-9 f s2
+ # 3-of-9 g s1
+ # so that neither form can be recovered until fetch [f]. Later,
+ # when we implement code that handles multiple versions, we can
+ # use this framework to assert that all recoverable versions are
+ # retrieved, and test that 'epsilon' does its job
+ places = [2, 2, 2, 1, 1, 2, 1]
+ for i in range(len(s._sequence)):
+ peerid = s._sequence[i]
+ if not places:
+ print idlib.shortnodeid_b2a(peerid), "-", "-"
+ break
+ which = places.pop(0)
+ if which == 1:
+ print idlib.shortnodeid_b2a(peerid), "1", "-"
+ s._peers[peerid] = shares1.pop(0)
+ else:
+ print idlib.shortnodeid_b2a(peerid), "-", "2"
+ s._peers[peerid] = shares2.pop(0)
+ # we don't bother placing any other shares
+ log.msg("merge done")
+ d.addCallback(_merge)
+ d.addCallback(lambda res: r3.retrieve())
+ def _retrieved(new_contents):
+ # the current specified behavior is "first version recoverable"
+ self.failUnlessEqual(new_contents, contents2)
+ d.addCallback(_retrieved)
+ return d
+