From 57bd23f35f859e4f6c8892561f586ae554a04d45 Mon Sep 17 00:00:00 2001 From: Brian Warner Date: Mon, 10 Mar 2008 22:15:43 -0700 Subject: [PATCH] test_mutable: more test coverage, building up a framework to cause reads to occur in a specific order --- src/allmydata/mutable.py | 5 +- src/allmydata/test/test_mutable.py | 208 +++++++++++++++++++++++++++-- 2 files changed, 197 insertions(+), 16 deletions(-) diff --git a/src/allmydata/mutable.py b/src/allmydata/mutable.py index 8647b8c0..e24372b8 100644 --- a/src/allmydata/mutable.py +++ b/src/allmydata/mutable.py @@ -1641,6 +1641,7 @@ class MutableFileNode: publish_class = Publish retrieve_class = Retrieve SIGNATURE_KEY_SIZE = 2048 + DEFAULT_ENCODING = (3, 10) def __init__(self, client): self._client = client @@ -1685,8 +1686,8 @@ class MutableFileNode: contents. Returns a Deferred that fires (with the MutableFileNode instance you should use) when it completes. """ - self._required_shares = 3 - self._total_shares = 10 + self._required_shares, self._total_shares = self.DEFAULT_ENCODING + d = defer.maybeDeferred(self._generate_pubprivkeys) def _generated( (pubkey, privkey) ): self._pubkey, self._privkey = pubkey, privkey diff --git a/src/allmydata/test/test_mutable.py b/src/allmydata/test/test_mutable.py index 5f0a5786..f0a04273 100644 --- a/src/allmydata/test/test_mutable.py +++ b/src/allmydata/test/test_mutable.py @@ -2,14 +2,16 @@ import itertools, struct, re from cStringIO import StringIO from twisted.trial import unittest -from twisted.internet import defer -from twisted.python import failure, log +from twisted.internet import defer, reactor +from twisted.python import failure from allmydata import mutable, uri, dirnode, download from allmydata.util.hashutil import tagged_hash from allmydata.encode import NotEnoughPeersError from allmydata.interfaces import IURI, INewDirectoryURI, \ IMutableFileURI, IUploadable, IFileURI from allmydata.filenode import LiteralFileNode +from foolscap.eventual import eventually +from foolscap.logging import log import sha #from allmydata.test.common import FakeMutableFileNode @@ -59,10 +61,35 @@ class FakeStorage: def __init__(self): self._peers = {} + # _sequence is used to cause the responses to occur in a specific + # order. If it is in use, then we will defer queries instead of + # answering them right away, accumulating the Deferreds in a dict. We + # don't know exactly how many queries we'll get, so exactly one + # second after the first query arrives, we will release them all (in + # order). + self._sequence = None + self._pending = {} def read(self, peerid, storage_index): shares = self._peers.get(peerid, {}) - return shares + if self._sequence is None: + return shares + d = defer.Deferred() + if not self._pending: + reactor.callLater(1.0, self._fire_readers) + self._pending[peerid] = (d, shares) + return d + + def _fire_readers(self): + pending = self._pending + self._pending = {} + extra = [] + for peerid in self._sequence: + if peerid in pending: + d, shares = pending.pop(peerid) + eventually(d.callback, shares) + for (d, shares) in pending.items(): + eventually(d.callback, shares) def write(self, peerid, storage_index, shnum, offset, data): if peerid not in self._peers: @@ -80,7 +107,7 @@ class FakePublish(mutable.Publish): def _do_read(self, ss, peerid, storage_index, shnums, readv): assert ss[0] == peerid assert shnums == [] - return defer.succeed(self._storage.read(peerid, storage_index)) + return defer.maybeDeferred(self._storage.read, peerid, storage_index) def _do_testreadwrite(self, peerid, secrets, tw_vectors, read_vector): @@ -432,16 +459,18 @@ class Publish(unittest.TestCase): class FakeRetrieve(mutable.Retrieve): def _do_read(self, ss, peerid, storage_index, shnums, readv): - shares = self._storage.read(peerid, storage_index) - - response = {} - for shnum in shares: - if shnums and shnum not in shnums: - continue - vector = response[shnum] = [] - for (offset, length) in readv: - vector.append(shares[shnum][offset:offset+length]) - return defer.succeed(response) + d = defer.maybeDeferred(self._storage.read, peerid, storage_index) + def _read(shares): + response = {} + for shnum in shares: + if shnums and shnum not in shnums: + continue + vector = response[shnum] = [] + for (offset, length) in readv: + vector.append(shares[shnum][offset:offset+length]) + return response + d.addCallback(_read) + return d def _deserialize_pubkey(self, pubkey_s): mo = re.search(r"^PUBKEY-(\d+)$", pubkey_s) @@ -626,3 +655,154 @@ class Roundtrip(unittest.TestCase): self.failUnlessEqual(contents, new_contents) d.addCallback(_retrieved) return d + + def test_basic_sequenced(self): + c, s, fn, p, r = self.setup_for_publish(20) + s._sequence = c._peerids[:] + contents = "New contents go here" + d = p.publish(contents) + def _published(res): + return r.retrieve() + d.addCallback(_published) + def _retrieved(new_contents): + self.failUnlessEqual(contents, new_contents) + d.addCallback(_retrieved) + return d + + def test_basic_pubkey_at_end(self): + # we corrupt the pubkey in all but the last 'k' shares, allowing the + # download to succeed but forcing a bunch of retries first. Note that + # this is rather pessimistic: our Retrieve process will throw away + # the whole share if the pubkey is bad, even though the rest of the + # share might be good. + c, s, fn, p, r = self.setup_for_publish(20) + s._sequence = c._peerids[:] + contents = "New contents go here" + d = p.publish(contents) + def _published(res): + r._pubkey = None + homes = [peerid for peerid in c._peerids + if s._peers.get(peerid, {})] + k = fn.get_required_shares() + homes_to_corrupt = homes[:-k] + for peerid in homes_to_corrupt: + shares = s._peers[peerid] + for shnum in shares: + data = shares[shnum] + (version, + seqnum, + root_hash, + IV, + k, N, segsize, datalen, + o) = mutable.unpack_header(data) + offset = 107 # pubkey + shares[shnum] = self.flip_bit(data, offset) + return r.retrieve() + d.addCallback(_published) + def _retrieved(new_contents): + self.failUnlessEqual(contents, new_contents) + d.addCallback(_retrieved) + return d + + def OFF_test_multiple_encodings(self): # not finished yet + # we encode the same file in two different ways (3-of-10 and 4-of-9), + # then mix up the shares, to make sure that download survives seeing + # a variety of encodings. This is actually kind of tricky to set up. + c, s, fn, p, r = self.setup_for_publish(20) + # we ignore fn, p, and r + + # force share retrieval to occur in this order + s._sequence = c._peerids[:] + + fn1 = FakeFilenode(c) + fn1.DEFAULT_ENCODING = (3,10) + fn1.create("") + p1 = FakePublish(fn1) + p1._storage = s + + # and we make a second filenode with the same key but different + # encoding + fn2 = FakeFilenode(c) + # init_from_uri populates _uri, _writekey, _readkey, _storage_index, + # and _fingerprint + fn2.init_from_uri(fn1.get_uri()) + # then we copy over other fields that are normally fetched from the + # existing shares + fn2._pubkey = fn1._pubkey + fn2._privkey = fn1._privkey + fn2._encprivkey = fn1._encprivkey + fn2._current_seqnum = 0 + fn2._current_roothash = "\x00" * 32 + # and set the encoding parameters to something completely different + fn2._required_shares = 4 + fn2._total_shares = 9 + + p2 = FakePublish(fn2) + p2._storage = s + + # we make a retrieval object that doesn't know what encoding + # parameters to use + fn3 = FakeFilenode(c) + fn3.init_from_uri(fn1.get_uri()) + r3 = FakeRetrieve(fn3) + r3._storage = s + + # now we upload a file through fn1, and grab its shares + contents1 = "Contents for encoding 1 (3-of-10) go here" + contents2 = "Contents for encoding 2 (4-of-9) go here" + d = p1.publish(contents1) + def _published_1(res): + self._shares1 = s._peers + s._peers = {} + # and upload it through fn2 + return p2.publish(contents2) + d.addCallback(_published_1) + def _published_2(res): + self._shares2 = s._peers + s._peers = {} + d.addCallback(_published_2) + def _merge(res): + log.msg("merging sharelists") + print len(self._shares1), len(self._shares2) + from allmydata.util import idlib + # we rearrange the shares, removing them from their original + # homes. + shares1 = self._shares1.values() + shares2 = self._shares2.values() + + print len(shares1), len(shares2) + # then we place shares in the following order: + # 4-of-9 a s2 + # 4-of-9 b s2 + # 4-of-9 c s2 + # 3-of-9 d s1 + # 3-of-9 e s1 + # 4-of-9 f s2 + # 3-of-9 g s1 + # so that neither form can be recovered until fetch [f]. Later, + # when we implement code that handles multiple versions, we can + # use this framework to assert that all recoverable versions are + # retrieved, and test that 'epsilon' does its job + places = [2, 2, 2, 1, 1, 2, 1] + for i in range(len(s._sequence)): + peerid = s._sequence[i] + if not places: + print idlib.shortnodeid_b2a(peerid), "-", "-" + break + which = places.pop(0) + if which == 1: + print idlib.shortnodeid_b2a(peerid), "1", "-" + s._peers[peerid] = shares1.pop(0) + else: + print idlib.shortnodeid_b2a(peerid), "-", "2" + s._peers[peerid] = shares2.pop(0) + # we don't bother placing any other shares + log.msg("merge done") + d.addCallback(_merge) + d.addCallback(lambda res: r3.retrieve()) + def _retrieved(new_contents): + # the current specified behavior is "first version recoverable" + self.failUnlessEqual(new_contents, contents2) + d.addCallback(_retrieved) + return d + -- 2.45.2