        # might produce a result.
        return None
+    def _do_read(self, ss, peerid, storage_index, shnums, readv):
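+        # factored out so that unit tests (FakePublish/FakeRetrieve in
+        # test_mutable.py) can substitute a local fake for the remote read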
+        d = ss.callRemote("slot_readv", storage_index, shnums, readv)
+        return d
+
    def _do_query(self, ss, peerid, storage_index, readsize):
        started = time.time()
        self._queries_outstanding.add(peerid)
-        d = ss.callRemote("slot_readv", storage_index, [], [(0, readsize)])
+        d = self._do_read(ss, peerid, storage_index, [], [(0, readsize)])
        d.addCallback(self._got_results, peerid, readsize, (ss, storage_index),
                      started)
        d.addErrback(self._query_failed, peerid)
import itertools, struct
+from cStringIO import StringIO
from twisted.trial import unittest
from twisted.internet import defer
from twisted.python import failure, log
        self.all_contents[self.get_uri()] = newdata
        return defer.succeed(None)
+class FakeStorage:
+    # This class replaces the collection of storage servers, allowing the
+    # tests to examine and manipulate the published shares. It also lets us
+    # control the order in which read queries are answered, to exercise more
+    # of the error-handling code in mutable.Retrieve.
+
+    def __init__(self):
+        self._peers = {}
+
+    def read(self, peerid, storage_index):
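+        # note: storage_index is ignored; this fake keeps a single dict of
+        # shares (shnum -> data string) per peer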
+        shares = self._peers.get(peerid, {})
+        return shares
+
+    def write(self, peerid, storage_index, shnum, offset, data):
+        if peerid not in self._peers:
+            self._peers[peerid] = {}
+        shares = self._peers[peerid]
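+        # overlay the new data at the requested offset, extending the
+        # existing share string if necessary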
+        f = StringIO()
+        f.write(shares.get(shnum, ""))
+        f.seek(offset)
+        f.write(data)
+        shares[shnum] = f.getvalue()
+
+
class FakePublish(mutable.Publish):
+
    def _do_read(self, ss, peerid, storage_index, shnums, readv):
        assert ss[0] == peerid
        assert shnums == []
-        shares = self._peers[peerid]
-        return defer.succeed(shares)
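+        # answer the read out of FakeStorage rather than a remote server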
+        return defer.succeed(self._storage.read(peerid, storage_index))
    def _do_testreadwrite(self, peerid, secrets,
                          tw_vectors, read_vector):
+        storage_index = self._node._uri.storage_index
        # always-pass: parrot the test vectors back to them.
        readv = {}
-        for shnum, (testv, datav, new_length) in tw_vectors.items():
+        for shnum, (testv, writev, new_length) in tw_vectors.items():
            for (offset, length, op, specimen) in testv:
                assert op in ("le", "eq", "ge")
            readv[shnum] = [ specimen
                             for (offset, length, op, specimen)
                             in testv ]
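+            # apply the write vectors to the fake storage so that later
+            # reads observe the newly published shares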
+            for (offset, data) in writev:
+                self._storage.write(peerid, storage_index, shnum, offset, data)
        answer = (True, readv)
        return defer.succeed(answer)
+
+
class FakeNewDirectoryNode(dirnode.NewDirectoryNode):
    filenode_class = FakeFilenode
    def setup_for_sharemap(self, num_peers):
        c = FakeClient(num_peers)
        fn = FakeFilenode(c)
+        s = FakeStorage()
        # .create usually returns a Deferred, but we happen to know it's
        # synchronous
        CONTENTS = "some initial contents"
        p._new_seqnum = 3
        p._read_size = 1000
        #r = mutable.Retrieve(fn)
-        p._peers = {}
-        for peerid in c._peerids:
-            p._peers[peerid] = {}
+        p._storage = s
        return c, p
    def shouldFail(self, expected_failure, which, call, *args, **kwargs):
        d.addCallback(_done)
        return d
+class FakeRetrieve(mutable.Retrieve):
+    def _do_read(self, ss, peerid, storage_index, shnums, readv):
+        shares = self._storage.read(peerid, storage_index)
+
+        response = {}
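+        # return only the requested share numbers (all of them when shnums
+        # is empty), slicing each read-vector range out of the stored share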
+        for shnum in shares:
+            if shnums and shnum not in shnums:
+                continue
+            vector = response[shnum] = []
+            for (offset, length) in readv:
+                vector.append(shares[shnum][offset:offset+length])
+        return defer.succeed(response)
+
+class Roundtrip(unittest.TestCase):
+
    def setup_for_publish(self, num_peers):
        c = FakeClient(num_peers)
        fn = FakeFilenode(c)
+        s = FakeStorage()
        # .create usually returns a Deferred, but we happen to know it's
        # synchronous
        fn.create("")
        p = FakePublish(fn)
-        p._peers = {}
-        for peerid in c._peerids:
-            p._peers[peerid] = {}
+        p._storage = s
        return c, fn, p
-    def test_publish(self):
+    def test_basic(self):
        c, fn, p = self.setup_for_publish(20)
-        # make sure the length of our contents string is not a multiple of k,
-        # to exercise the padding code.
-        d = p.publish("New contents of the mutable filenode.")
-        def _done(res):
+        contents = "New contents go here"
+        d = p.publish(contents)
+        def _published(res):
            # TODO: examine peers and check on their shares
-            pass
-        d.addCallback(_done)
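+            # retrieve from the same fake storage the publisher wrote to;
+            # the resulting Deferred fires with the recovered contents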
+            r = FakeRetrieve(fn)
+            r._storage = p._storage
+            return r.retrieve()
+        d.addCallback(_published)
+        def _retrieved(new_contents):
+            self.failUnlessEqual(contents, new_contents)
+        d.addCallback(_retrieved)
        return d
+