From: Brian Warner Date: Mon, 5 Sep 2011 19:04:08 +0000 (-0700) Subject: MDMFSlotReadProxy: remove the queue X-Git-Tag: allmydata-tahoe-1.9.0a2~56 X-Git-Url: https://git.rkrishnan.org/components/%22news.html/simplejson/nxhtml.html?a=commitdiff_plain;h=2b4f2b7fa3b8f6e0d304453dfe7ae917663df965;p=tahoe-lafs%2Ftahoe-lafs.git MDMFSlotReadProxy: remove the queue This is a neat trick to reduce Foolscap overhead, but the need for an explicit flush() complicates the Retrieve path and makes it prone to lost-progress bugs. Also change test_mutable.FakeStorageServer to tolerate multiple reads of the same share in a row, a limitation exposed by turning off the queue. --- diff --git a/src/allmydata/mutable/layout.py b/src/allmydata/mutable/layout.py index e2fa6d68..55f39685 100644 --- a/src/allmydata/mutable/layout.py +++ b/src/allmydata/mutable/layout.py @@ -3,7 +3,7 @@ import struct from allmydata.mutable.common import NeedMoreDataError, UnknownVersionError from allmydata.interfaces import HASH_SIZE, SALT_SIZE, SDMF_VERSION, \ MDMF_VERSION, IMutableSlotWriter -from allmydata.util import mathutil, observer +from allmydata.util import mathutil from twisted.python import failure from twisted.internet import defer from zope.interface import implements @@ -1212,10 +1212,6 @@ class MDMFSlotReadProxy: if self._data == None: self._data = "" - self._queue_observers = observer.ObserverList() - self._queue_errbacks = observer.ObserverList() - self._readvs = [] - def _maybe_fetch_offsets_and_header(self, force_remote=False): """ @@ -1353,7 +1349,7 @@ class MDMFSlotReadProxy: self._offsets['share_data'] = sharedata - def get_block_and_salt(self, segnum, queue=False): + def get_block_and_salt(self, segnum): """ I return (block, salt), where block is the block data and salt is the salt used to encrypt that segment. @@ -1381,8 +1377,7 @@ class MDMFSlotReadProxy: readvs = [(share_offset, data)] return readvs d.addCallback(_then) - d.addCallback(lambda readvs: - self._read(readvs, queue=queue)) + d.addCallback(lambda readvs: self._read(readvs)) def _process_results(results): assert self.shnum in results if self._version_number == 0: @@ -1408,7 +1403,7 @@ class MDMFSlotReadProxy: return d - def get_blockhashes(self, needed=None, queue=False, force_remote=False): + def get_blockhashes(self, needed=None, force_remote=False): """ I return the block hash tree @@ -1440,7 +1435,7 @@ class MDMFSlotReadProxy: return readvs d.addCallback(_then) d.addCallback(lambda readvs: - self._read(readvs, queue=queue, force_remote=force_remote)) + self._read(readvs, force_remote=force_remote)) def _build_block_hash_tree(results): assert self.shnum in results @@ -1452,7 +1447,7 @@ class MDMFSlotReadProxy: return d - def get_sharehashes(self, needed=None, queue=False, force_remote=False): + def get_sharehashes(self, needed=None, force_remote=False): """ I return the part of the share hash chain placed to validate this share. @@ -1479,7 +1474,7 @@ class MDMFSlotReadProxy: return readvs d.addCallback(_make_readvs) d.addCallback(lambda readvs: - self._read(readvs, queue=queue, force_remote=force_remote)) + self._read(readvs, force_remote=force_remote)) def _build_share_hash_chain(results): assert self.shnum in results @@ -1493,7 +1488,7 @@ class MDMFSlotReadProxy: return d - def get_encprivkey(self, queue=False): + def get_encprivkey(self): """ I return the encrypted private key. """ @@ -1508,8 +1503,7 @@ class MDMFSlotReadProxy: readvs = [(privkey_offset, privkey_length)] return readvs d.addCallback(_make_readvs) - d.addCallback(lambda readvs: - self._read(readvs, queue=queue)) + d.addCallback(lambda readvs: self._read(readvs)) def _process_results(results): assert self.shnum in results privkey = results[self.shnum][0] @@ -1518,7 +1512,7 @@ class MDMFSlotReadProxy: return d - def get_signature(self, queue=False): + def get_signature(self): """ I return the signature of my share. """ @@ -1533,8 +1527,7 @@ class MDMFSlotReadProxy: readvs = [(signature_offset, signature_length)] return readvs d.addCallback(_make_readvs) - d.addCallback(lambda readvs: - self._read(readvs, queue=queue)) + d.addCallback(lambda readvs: self._read(readvs)) def _process_results(results): assert self.shnum in results signature = results[self.shnum][0] @@ -1543,7 +1536,7 @@ class MDMFSlotReadProxy: return d - def get_verification_key(self, queue=False): + def get_verification_key(self): """ I return the verification key. """ @@ -1559,8 +1552,7 @@ class MDMFSlotReadProxy: readvs = [(vk_offset, vk_length)] return readvs d.addCallback(_make_readvs) - d.addCallback(lambda readvs: - self._read(readvs, queue=queue)) + d.addCallback(lambda readvs: self._read(readvs)) def _process_results(results): assert self.shnum in results verification_key = results[self.shnum][0] @@ -1712,23 +1704,7 @@ class MDMFSlotReadProxy: return d - def flush(self): - """ - I flush my queue of read vectors. - """ - d = self._read(self._readvs) - def _then(results): - self._readvs = [] - if isinstance(results, failure.Failure): - self._queue_errbacks.notify(results) - else: - self._queue_observers.notify(results) - self._queue_observers = observer.ObserverList() - self._queue_errbacks = observer.ObserverList() - d.addBoth(_then) - - - def _read(self, readvs, force_remote=False, queue=False): + def _read(self, readvs, force_remote=False): unsatisfiable = filter(lambda x: x[0] + x[1] > len(self._data), readvs) # TODO: It's entirely possible to tweak this so that it just # fulfills the requests that it can, and not demand that all @@ -1739,19 +1715,6 @@ class MDMFSlotReadProxy: results = {self.shnum: results} return defer.succeed(results) else: - if queue: - start = len(self._readvs) - self._readvs += readvs - end = len(self._readvs) - def _get_results(results, start, end): - if not self.shnum in results: - return {self._shnum: [""]} - return {self.shnum: results[self.shnum][start:end]} - d = defer.Deferred() - d.addCallback(_get_results, start, end) - self._queue_observers.subscribe(d.callback) - self._queue_errbacks.subscribe(d.errback) - return d return self._rref.callRemote("slot_readv", self._storage_index, [self.shnum], diff --git a/src/allmydata/mutable/retrieve.py b/src/allmydata/mutable/retrieve.py index 25930c8e..8d35707c 100644 --- a/src/allmydata/mutable/retrieve.py +++ b/src/allmydata/mutable/retrieve.py @@ -700,13 +700,12 @@ class Retrieve: ds = [] for reader in self._active_readers: started = time.time() - d = reader.get_block_and_salt(segnum, queue=True) + d = reader.get_block_and_salt(segnum) d2 = self._get_needed_hashes(reader, segnum) dl = defer.DeferredList([d, d2], consumeErrors=True) dl.addCallback(self._validate_block, segnum, reader, started) dl.addErrback(self._validation_or_decoding_failed, [reader]) ds.append(dl) - reader.flush() dl = defer.DeferredList(ds) if self._verify: dl.addCallback(lambda ignored: "") @@ -910,12 +909,12 @@ class Retrieve: #needed.discard(0) self.log("getting blockhashes for segment %d, share %d: %s" % \ (segnum, reader.shnum, str(needed))) - d1 = reader.get_blockhashes(needed, queue=True, force_remote=True) + d1 = reader.get_blockhashes(needed, force_remote=True) if self.share_hash_tree.needed_hashes(reader.shnum): need = self.share_hash_tree.needed_hashes(reader.shnum) self.log("also need sharehashes for share %d: %s" % (reader.shnum, str(need))) - d2 = reader.get_sharehashes(need, queue=True, force_remote=True) + d2 = reader.get_sharehashes(need, force_remote=True) else: d2 = defer.succeed({}) # the logic in the next method # expects a dict diff --git a/src/allmydata/mutable/servermap.py b/src/allmydata/mutable/servermap.py index cb93fc5d..18907141 100644 --- a/src/allmydata/mutable/servermap.py +++ b/src/allmydata/mutable/servermap.py @@ -676,7 +676,7 @@ class ServermapUpdater: # public key. We use this to validate the signature. if not self._node.get_pubkey(): # fetch and set the public key. - d = reader.get_verification_key(queue=True) + d = reader.get_verification_key() d.addCallback(lambda results, shnum=shnum, peerid=peerid: self._try_to_set_pubkey(results, peerid, shnum, lp)) # XXX: Make self._pubkey_query_failed? @@ -702,7 +702,7 @@ class ServermapUpdater: # to get the version information. In MDMF, this lives at # the end of the share, so unless the file is quite small, # we'll need to do a remote fetch to get it. - d3 = reader.get_signature(queue=True) + d3 = reader.get_signature() d3.addErrback(lambda error, shnum=shnum, peerid=peerid: self._got_corrupt_share(error, shnum, peerid, data, lp)) # Once we have all three of these responses, we can move on @@ -711,7 +711,7 @@ class ServermapUpdater: # Does the node already have a privkey? If not, we'll try to # fetch it here. if self._need_privkey: - d4 = reader.get_encprivkey(queue=True) + d4 = reader.get_encprivkey() d4.addCallback(lambda results, shnum=shnum, peerid=peerid: self._try_to_validate_privkey(results, peerid, shnum, lp)) d4.addErrback(lambda error, shnum=shnum, peerid=peerid: @@ -730,11 +730,9 @@ class ServermapUpdater: # make the two routines share the value without # introducing more roundtrips? ds.append(reader.get_verinfo()) - ds.append(reader.get_blockhashes(queue=True)) - ds.append(reader.get_block_and_salt(self.start_segment, - queue=True)) - ds.append(reader.get_block_and_salt(self.end_segment, - queue=True)) + ds.append(reader.get_blockhashes()) + ds.append(reader.get_block_and_salt(self.start_segment)) + ds.append(reader.get_block_and_salt(self.end_segment)) d5 = deferredutil.gatherResults(ds) d5.addCallback(self._got_update_results_one_share, shnum) else: @@ -742,7 +740,6 @@ class ServermapUpdater: dl = defer.DeferredList([d, d2, d3, d4, d5]) dl.addBoth(self._turn_barrier) - reader.flush() dl.addCallback(lambda results, shnum=shnum, peerid=peerid: self._got_signature_one_share(results, shnum, peerid, lp)) dl.addErrback(lambda error, shnum=shnum, data=data: diff --git a/src/allmydata/test/test_mutable.py b/src/allmydata/test/test_mutable.py index 7bda5069..14b62ee2 100644 --- a/src/allmydata/test/test_mutable.py +++ b/src/allmydata/test/test_mutable.py @@ -72,7 +72,9 @@ class FakeStorage: d = defer.Deferred() if not self._pending: self._pending_timer = reactor.callLater(1.0, self._fire_readers) - self._pending[peerid] = (d, shares) + if peerid not in self._pending: + self._pending[peerid] = [] + self._pending[peerid].append( (d, shares) ) return d def _fire_readers(self): @@ -81,10 +83,11 @@ class FakeStorage: self._pending = {} for peerid in self._sequence: if peerid in pending: - d, shares = pending.pop(peerid) + for (d, shares) in pending.pop(peerid): + eventually(d.callback, shares) + for peerid in pending: + for (d, shares) in pending[peerid]: eventually(d.callback, shares) - for (d, shares) in pending.values(): - eventually(d.callback, shares) def write(self, peerid, storage_index, shnum, offset, data): if peerid not in self._peers: diff --git a/src/allmydata/test/test_storage.py b/src/allmydata/test/test_storage.py index 549b839f..2765a913 100644 --- a/src/allmydata/test/test_storage.py +++ b/src/allmydata/test/test_storage.py @@ -2624,42 +2624,6 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin): return d - def test_reader_queue(self): - self.write_test_share_to_server('si1') - mr = MDMFSlotReadProxy(self.rref, "si1", 0) - d1 = mr.get_block_and_salt(0, queue=True) - d2 = mr.get_blockhashes(queue=True) - d3 = mr.get_sharehashes(queue=True) - d4 = mr.get_signature(queue=True) - d5 = mr.get_verification_key(queue=True) - dl = defer.DeferredList([d1, d2, d3, d4, d5]) - mr.flush() - def _print(results): - self.failUnlessEqual(len(results), 5) - # We have one read for version information and offsets, and - # one for everything else. - self.failUnlessEqual(self.rref.read_count, 2) - block, salt = results[0][1] # results[0] is a boolean that says - # whether or not the operation - # worked. - self.failUnlessEqual(self.block, block) - self.failUnlessEqual(self.salt, salt) - - blockhashes = results[1][1] - self.failUnlessEqual(self.block_hash_tree, blockhashes) - - sharehashes = results[2][1] - self.failUnlessEqual(self.share_hash_chain, sharehashes) - - signature = results[3][1] - self.failUnlessEqual(self.signature, signature) - - verification_key = results[4][1] - self.failUnlessEqual(self.verification_key, verification_key) - dl.addCallback(_print) - return dl - - def test_sdmf_writer(self): # Go through the motions of writing an SDMF share to the storage # server. Then read the storage server to see that the share got