test_mutable: more test coverage, building up a framework to cause reads to occur...
authorBrian Warner <warner@allmydata.com>
Tue, 11 Mar 2008 05:15:43 +0000 (22:15 -0700)
committerBrian Warner <warner@allmydata.com>
Tue, 11 Mar 2008 05:15:43 +0000 (22:15 -0700)
src/allmydata/mutable.py
src/allmydata/test/test_mutable.py

index 8647b8c00b3ef2b57fa3ac647b2f626a557824b1..e24372b88ea173f017af505d5a79b1d1c9acf566 100644 (file)
@@ -1641,6 +1641,7 @@ class MutableFileNode:
     publish_class = Publish
     retrieve_class = Retrieve
     SIGNATURE_KEY_SIZE = 2048
+    DEFAULT_ENCODING = (3, 10)
 
     def __init__(self, client):
         self._client = client
@@ -1685,8 +1686,8 @@ class MutableFileNode:
         contents. Returns a Deferred that fires (with the MutableFileNode
         instance you should use) when it completes.
         """
-        self._required_shares = 3
-        self._total_shares = 10
+        self._required_shares, self._total_shares = self.DEFAULT_ENCODING
+
         d = defer.maybeDeferred(self._generate_pubprivkeys)
         def _generated( (pubkey, privkey) ):
             self._pubkey, self._privkey = pubkey, privkey
index 5f0a57869439c77f993c42338042b4697e17cf09..f0a0427395ef38d757b495f39622478fc576f5a7 100644 (file)
@@ -2,14 +2,16 @@
 import itertools, struct, re
 from cStringIO import StringIO
 from twisted.trial import unittest
-from twisted.internet import defer
-from twisted.python import failure, log
+from twisted.internet import defer, reactor
+from twisted.python import failure
 from allmydata import mutable, uri, dirnode, download
 from allmydata.util.hashutil import tagged_hash
 from allmydata.encode import NotEnoughPeersError
 from allmydata.interfaces import IURI, INewDirectoryURI, \
      IMutableFileURI, IUploadable, IFileURI
 from allmydata.filenode import LiteralFileNode
+from foolscap.eventual import eventually
+from foolscap.logging import log
 import sha
 
 #from allmydata.test.common import FakeMutableFileNode
@@ -59,10 +61,35 @@ class FakeStorage:
 
     def __init__(self):
         self._peers = {}
+        # _sequence is used to cause the responses to occur in a specific
+        # order. If it is in use, then we will defer queries instead of
+        # answering them right away, accumulating the Deferreds in a dict. We
+        # don't know exactly how many queries we'll get, so exactly one
+        # second after the first query arrives, we will release them all (in
+        # order).
+        self._sequence = None
+        self._pending = {}
 
     def read(self, peerid, storage_index):
         shares = self._peers.get(peerid, {})
-        return shares
+        if self._sequence is None:
+            return shares
+        d = defer.Deferred()
+        if not self._pending:
+            reactor.callLater(1.0, self._fire_readers)
+        self._pending[peerid] = (d, shares)
+        return d
+
+    def _fire_readers(self):
+        pending = self._pending
+        self._pending = {}
+        extra = []
+        for peerid in self._sequence:
+            if peerid in pending:
+                d, shares = pending.pop(peerid)
+                eventually(d.callback, shares)
+        for (d, shares) in pending.items():
+            eventually(d.callback, shares)
 
     def write(self, peerid, storage_index, shnum, offset, data):
         if peerid not in self._peers:
@@ -80,7 +107,7 @@ class FakePublish(mutable.Publish):
     def _do_read(self, ss, peerid, storage_index, shnums, readv):
         assert ss[0] == peerid
         assert shnums == []
-        return defer.succeed(self._storage.read(peerid, storage_index))
+        return defer.maybeDeferred(self._storage.read, peerid, storage_index)
 
     def _do_testreadwrite(self, peerid, secrets,
                           tw_vectors, read_vector):
@@ -432,16 +459,18 @@ class Publish(unittest.TestCase):
 
 class FakeRetrieve(mutable.Retrieve):
     def _do_read(self, ss, peerid, storage_index, shnums, readv):
-        shares = self._storage.read(peerid, storage_index)
-
-        response = {}
-        for shnum in shares:
-            if shnums and shnum not in shnums:
-                continue
-            vector = response[shnum] = []
-            for (offset, length) in readv:
-                vector.append(shares[shnum][offset:offset+length])
-        return defer.succeed(response)
+        d = defer.maybeDeferred(self._storage.read, peerid, storage_index)
+        def _read(shares):
+            response = {}
+            for shnum in shares:
+                if shnums and shnum not in shnums:
+                    continue
+                vector = response[shnum] = []
+                for (offset, length) in readv:
+                    vector.append(shares[shnum][offset:offset+length])
+            return response
+        d.addCallback(_read)
+        return d
 
     def _deserialize_pubkey(self, pubkey_s):
         mo = re.search(r"^PUBKEY-(\d+)$", pubkey_s)
@@ -626,3 +655,154 @@ class Roundtrip(unittest.TestCase):
             self.failUnlessEqual(contents, new_contents)
         d.addCallback(_retrieved)
         return d
+
+    def test_basic_sequenced(self):
+        c, s, fn, p, r = self.setup_for_publish(20)
+        s._sequence = c._peerids[:]
+        contents = "New contents go here"
+        d = p.publish(contents)
+        def _published(res):
+            return r.retrieve()
+        d.addCallback(_published)
+        def _retrieved(new_contents):
+            self.failUnlessEqual(contents, new_contents)
+        d.addCallback(_retrieved)
+        return d
+
+    def test_basic_pubkey_at_end(self):
+        # we corrupt the pubkey in all but the last 'k' shares, allowing the
+        # download to succeed but forcing a bunch of retries first. Note that
+        # this is rather pessimistic: our Retrieve process will throw away
+        # the whole share if the pubkey is bad, even though the rest of the
+        # share might be good.
+        c, s, fn, p, r = self.setup_for_publish(20)
+        s._sequence = c._peerids[:]
+        contents = "New contents go here"
+        d = p.publish(contents)
+        def _published(res):
+            r._pubkey = None
+            homes = [peerid for peerid in c._peerids
+                     if s._peers.get(peerid, {})]
+            k = fn.get_required_shares()
+            homes_to_corrupt = homes[:-k]
+            for peerid in homes_to_corrupt:
+                shares = s._peers[peerid]
+                for shnum in shares:
+                    data = shares[shnum]
+                    (version,
+                     seqnum,
+                     root_hash,
+                     IV,
+                     k, N, segsize, datalen,
+                     o) = mutable.unpack_header(data)
+                    offset = 107 # pubkey
+                    shares[shnum] = self.flip_bit(data, offset)
+            return r.retrieve()
+        d.addCallback(_published)
+        def _retrieved(new_contents):
+            self.failUnlessEqual(contents, new_contents)
+        d.addCallback(_retrieved)
+        return d
+
+    def OFF_test_multiple_encodings(self): # not finished yet
+        # we encode the same file in two different ways (3-of-10 and 4-of-9),
+        # then mix up the shares, to make sure that download survives seeing
+        # a variety of encodings. This is actually kind of tricky to set up.
+        c, s, fn, p, r = self.setup_for_publish(20)
+        # we ignore fn, p, and r
+
+        # force share retrieval to occur in this order
+        s._sequence = c._peerids[:]
+
+        fn1 = FakeFilenode(c)
+        fn1.DEFAULT_ENCODING = (3,10)
+        fn1.create("")
+        p1 = FakePublish(fn1)
+        p1._storage = s
+
+        # and we make a second filenode with the same key but different
+        # encoding
+        fn2 = FakeFilenode(c)
+        # init_from_uri populates _uri, _writekey, _readkey, _storage_index,
+        # and _fingerprint
+        fn2.init_from_uri(fn1.get_uri())
+        # then we copy over other fields that are normally fetched from the
+        # existing shares
+        fn2._pubkey = fn1._pubkey
+        fn2._privkey = fn1._privkey
+        fn2._encprivkey = fn1._encprivkey
+        fn2._current_seqnum = 0
+        fn2._current_roothash = "\x00" * 32
+        # and set the encoding parameters to something completely different
+        fn2._required_shares = 4
+        fn2._total_shares = 9
+
+        p2 = FakePublish(fn2)
+        p2._storage = s
+
+        # we make a retrieval object that doesn't know what encoding
+        # parameters to use
+        fn3 = FakeFilenode(c)
+        fn3.init_from_uri(fn1.get_uri())
+        r3 = FakeRetrieve(fn3)
+        r3._storage = s
+
+        # now we upload a file through fn1, and grab its shares
+        contents1 = "Contents for encoding 1 (3-of-10) go here"
+        contents2 = "Contents for encoding 2 (4-of-9) go here"
+        d = p1.publish(contents1)
+        def _published_1(res):
+            self._shares1 = s._peers
+            s._peers = {}
+            # and upload it through fn2
+            return p2.publish(contents2)
+        d.addCallback(_published_1)
+        def _published_2(res):
+            self._shares2 = s._peers
+            s._peers = {}
+        d.addCallback(_published_2)
+        def _merge(res):
+            log.msg("merging sharelists")
+            print len(self._shares1), len(self._shares2)
+            from allmydata.util import idlib
+            # we rearrange the shares, removing them from their original
+            # homes.
+            shares1 = self._shares1.values()
+            shares2 = self._shares2.values()
+
+            print len(shares1), len(shares2)
+            # then we place shares in the following order:
+            #  4-of-9  a  s2
+            #  4-of-9  b  s2
+            #  4-of-9  c  s2
+            #  3-of-9  d s1
+            #  3-of-9  e s1
+            #  4-of-9  f  s2
+            #  3-of-9  g s1
+            # so that neither form can be recovered until fetch [f]. Later,
+            # when we implement code that handles multiple versions, we can
+            # use this framework to assert that all recoverable versions are
+            # retrieved, and test that 'epsilon' does its job
+            places = [2, 2, 2, 1, 1, 2, 1]
+            for i in range(len(s._sequence)):
+                peerid = s._sequence[i]
+                if not places:
+                    print idlib.shortnodeid_b2a(peerid), "-", "-"
+                    break
+                which = places.pop(0)
+                if which == 1:
+                    print idlib.shortnodeid_b2a(peerid), "1", "-"
+                    s._peers[peerid] = shares1.pop(0)
+                else:
+                    print idlib.shortnodeid_b2a(peerid), "-", "2"
+                    s._peers[peerid] = shares2.pop(0)
+            # we don't bother placing any other shares
+            log.msg("merge done")
+        d.addCallback(_merge)
+        d.addCallback(lambda res: r3.retrieve())
+        def _retrieved(new_contents):
+            # the current specified behavior is "first version recoverable"
+            self.failUnlessEqual(new_contents, contents2)
+        d.addCallback(_retrieved)
+        return d
+