From 2b4f2b7fa3b8f6e0d304453dfe7ae917663df965 Mon Sep 17 00:00:00 2001
From: Brian Warner <warner@lothar.com>
Date: Mon, 5 Sep 2011 12:04:08 -0700
Subject: [PATCH] MDMFSlotReadProxy: remove the queue

This is a neat trick to reduce Foolscap overhead, but the need for an
explicit flush() complicates the Retrieve path and makes it prone to
lost-progress bugs.

Also change test_mutable.FakeStorageServer to tolerate multiple reads of the
same share in a row, a limitation exposed by turning off the queue.
---
 src/allmydata/mutable/layout.py    | 65 +++++++-----------------------
 src/allmydata/mutable/retrieve.py  |  7 ++--
 src/allmydata/mutable/servermap.py | 15 +++----
 src/allmydata/test/test_mutable.py | 11 +++--
 src/allmydata/test/test_storage.py | 36 -----------------
 5 files changed, 30 insertions(+), 104 deletions(-)

diff --git a/src/allmydata/mutable/layout.py b/src/allmydata/mutable/layout.py
index e2fa6d68..55f39685 100644
--- a/src/allmydata/mutable/layout.py
+++ b/src/allmydata/mutable/layout.py
@@ -3,7 +3,7 @@ import struct
 from allmydata.mutable.common import NeedMoreDataError, UnknownVersionError
 from allmydata.interfaces import HASH_SIZE, SALT_SIZE, SDMF_VERSION, \
                                  MDMF_VERSION, IMutableSlotWriter
-from allmydata.util import mathutil, observer
+from allmydata.util import mathutil
 from twisted.python import failure
 from twisted.internet import defer
 from zope.interface import implements
@@ -1212,10 +1212,6 @@ class MDMFSlotReadProxy:
         if self._data == None:
             self._data = ""
 
-        self._queue_observers = observer.ObserverList()
-        self._queue_errbacks = observer.ObserverList()
-        self._readvs = []
-
 
     def _maybe_fetch_offsets_and_header(self, force_remote=False):
         """
@@ -1353,7 +1349,7 @@ class MDMFSlotReadProxy:
             self._offsets['share_data'] = sharedata
 
 
-    def get_block_and_salt(self, segnum, queue=False):
+    def get_block_and_salt(self, segnum):
         """
         I return (block, salt), where block is the block data and
         salt is the salt used to encrypt that segment.
@@ -1381,8 +1377,7 @@ class MDMFSlotReadProxy:
             readvs = [(share_offset, data)]
             return readvs
         d.addCallback(_then)
-        d.addCallback(lambda readvs:
-            self._read(readvs, queue=queue))
+        d.addCallback(lambda readvs: self._read(readvs))
         def _process_results(results):
             assert self.shnum in results
             if self._version_number == 0:
@@ -1408,7 +1403,7 @@ class MDMFSlotReadProxy:
         return d
 
 
-    def get_blockhashes(self, needed=None, queue=False, force_remote=False):
+    def get_blockhashes(self, needed=None, force_remote=False):
         """
         I return the block hash tree
 
@@ -1440,7 +1435,7 @@ class MDMFSlotReadProxy:
             return readvs
         d.addCallback(_then)
         d.addCallback(lambda readvs:
-            self._read(readvs, queue=queue, force_remote=force_remote))
+            self._read(readvs, force_remote=force_remote))
         def _build_block_hash_tree(results):
             assert self.shnum in results
 
@@ -1452,7 +1447,7 @@ class MDMFSlotReadProxy:
         return d
 
 
-    def get_sharehashes(self, needed=None, queue=False, force_remote=False):
+    def get_sharehashes(self, needed=None, force_remote=False):
         """
         I return the part of the share hash chain placed to validate
         this share.
@@ -1479,7 +1474,7 @@ class MDMFSlotReadProxy:
             return readvs
         d.addCallback(_make_readvs)
         d.addCallback(lambda readvs:
-            self._read(readvs, queue=queue, force_remote=force_remote))
+            self._read(readvs, force_remote=force_remote))
         def _build_share_hash_chain(results):
             assert self.shnum in results
 
@@ -1493,7 +1488,7 @@ class MDMFSlotReadProxy:
         return d
 
 
-    def get_encprivkey(self, queue=False):
+    def get_encprivkey(self):
         """
         I return the encrypted private key.
         """
@@ -1508,8 +1503,7 @@ class MDMFSlotReadProxy:
             readvs = [(privkey_offset, privkey_length)]
             return readvs
         d.addCallback(_make_readvs)
-        d.addCallback(lambda readvs:
-            self._read(readvs, queue=queue))
+        d.addCallback(lambda readvs: self._read(readvs))
         def _process_results(results):
             assert self.shnum in results
             privkey = results[self.shnum][0]
@@ -1518,7 +1512,7 @@ class MDMFSlotReadProxy:
         return d
 
 
-    def get_signature(self, queue=False):
+    def get_signature(self):
         """
         I return the signature of my share.
         """
@@ -1533,8 +1527,7 @@ class MDMFSlotReadProxy:
             readvs = [(signature_offset, signature_length)]
             return readvs
         d.addCallback(_make_readvs)
-        d.addCallback(lambda readvs:
-            self._read(readvs, queue=queue))
+        d.addCallback(lambda readvs: self._read(readvs))
         def _process_results(results):
             assert self.shnum in results
             signature = results[self.shnum][0]
@@ -1543,7 +1536,7 @@ class MDMFSlotReadProxy:
         return d
 
 
-    def get_verification_key(self, queue=False):
+    def get_verification_key(self):
         """
         I return the verification key.
         """
@@ -1559,8 +1552,7 @@ class MDMFSlotReadProxy:
             readvs = [(vk_offset, vk_length)]
             return readvs
         d.addCallback(_make_readvs)
-        d.addCallback(lambda readvs:
-            self._read(readvs, queue=queue))
+        d.addCallback(lambda readvs: self._read(readvs))
         def _process_results(results):
             assert self.shnum in results
             verification_key = results[self.shnum][0]
@@ -1712,23 +1704,7 @@ class MDMFSlotReadProxy:
         return d
 
 
-    def flush(self):
-        """
-        I flush my queue of read vectors.
-        """
-        d = self._read(self._readvs)
-        def _then(results):
-            self._readvs = []
-            if isinstance(results, failure.Failure):
-                self._queue_errbacks.notify(results)
-            else:
-                self._queue_observers.notify(results)
-            self._queue_observers = observer.ObserverList()
-            self._queue_errbacks = observer.ObserverList()
-        d.addBoth(_then)
-
-
-    def _read(self, readvs, force_remote=False, queue=False):
+    def _read(self, readvs, force_remote=False):
         unsatisfiable = filter(lambda x: x[0] + x[1] > len(self._data), readvs)
         # TODO: It's entirely possible to tweak this so that it just
         # fulfills the requests that it can, and not demand that all
@@ -1739,19 +1715,6 @@ class MDMFSlotReadProxy:
             results = {self.shnum: results}
             return defer.succeed(results)
         else:
-            if queue:
-                start = len(self._readvs)
-                self._readvs += readvs
-                end = len(self._readvs)
-                def _get_results(results, start, end):
-                    if not self.shnum in results:
-                        return {self._shnum: [""]}
-                    return {self.shnum: results[self.shnum][start:end]}
-                d = defer.Deferred()
-                d.addCallback(_get_results, start, end)
-                self._queue_observers.subscribe(d.callback)
-                self._queue_errbacks.subscribe(d.errback)
-                return d
             return self._rref.callRemote("slot_readv",
                                          self._storage_index,
                                          [self.shnum],
diff --git a/src/allmydata/mutable/retrieve.py b/src/allmydata/mutable/retrieve.py
index 25930c8e..8d35707c 100644
--- a/src/allmydata/mutable/retrieve.py
+++ b/src/allmydata/mutable/retrieve.py
@@ -700,13 +700,12 @@ class Retrieve:
         ds = []
         for reader in self._active_readers:
             started = time.time()
-            d = reader.get_block_and_salt(segnum, queue=True)
+            d = reader.get_block_and_salt(segnum)
             d2 = self._get_needed_hashes(reader, segnum)
             dl = defer.DeferredList([d, d2], consumeErrors=True)
             dl.addCallback(self._validate_block, segnum, reader, started)
             dl.addErrback(self._validation_or_decoding_failed, [reader])
             ds.append(dl)
-            reader.flush()
         dl = defer.DeferredList(ds)
         if self._verify:
             dl.addCallback(lambda ignored: "")
@@ -910,12 +909,12 @@ class Retrieve:
         #needed.discard(0)
         self.log("getting blockhashes for segment %d, share %d: %s" % \
                  (segnum, reader.shnum, str(needed)))
-        d1 = reader.get_blockhashes(needed, queue=True, force_remote=True)
+        d1 = reader.get_blockhashes(needed, force_remote=True)
         if self.share_hash_tree.needed_hashes(reader.shnum):
             need = self.share_hash_tree.needed_hashes(reader.shnum)
             self.log("also need sharehashes for share %d: %s" % (reader.shnum,
                                                                  str(need)))
-            d2 = reader.get_sharehashes(need, queue=True, force_remote=True)
+            d2 = reader.get_sharehashes(need, force_remote=True)
         else:
             d2 = defer.succeed({}) # the logic in the next method
                                    # expects a dict
diff --git a/src/allmydata/mutable/servermap.py b/src/allmydata/mutable/servermap.py
index cb93fc5d..18907141 100644
--- a/src/allmydata/mutable/servermap.py
+++ b/src/allmydata/mutable/servermap.py
@@ -676,7 +676,7 @@ class ServermapUpdater:
             #     public key. We use this to validate the signature.
             if not self._node.get_pubkey():
                 # fetch and set the public key.
-                d = reader.get_verification_key(queue=True)
+                d = reader.get_verification_key()
                 d.addCallback(lambda results, shnum=shnum, peerid=peerid:
                     self._try_to_set_pubkey(results, peerid, shnum, lp))
                 # XXX: Make self._pubkey_query_failed?
@@ -702,7 +702,7 @@ class ServermapUpdater:
             #   to get the version information. In MDMF, this lives at
             #   the end of the share, so unless the file is quite small,
             #   we'll need to do a remote fetch to get it.
-            d3 = reader.get_signature(queue=True)
+            d3 = reader.get_signature()
             d3.addErrback(lambda error, shnum=shnum, peerid=peerid:
                 self._got_corrupt_share(error, shnum, peerid, data, lp))
             #  Once we have all three of these responses, we can move on
@@ -711,7 +711,7 @@ class ServermapUpdater:
             # Does the node already have a privkey? If not, we'll try to
             # fetch it here.
             if self._need_privkey:
-                d4 = reader.get_encprivkey(queue=True)
+                d4 = reader.get_encprivkey()
                 d4.addCallback(lambda results, shnum=shnum, peerid=peerid:
                     self._try_to_validate_privkey(results, peerid, shnum, lp))
                 d4.addErrback(lambda error, shnum=shnum, peerid=peerid:
@@ -730,11 +730,9 @@ class ServermapUpdater:
                 # make the two routines share the value without
                 # introducing more roundtrips?
                 ds.append(reader.get_verinfo())
-                ds.append(reader.get_blockhashes(queue=True))
-                ds.append(reader.get_block_and_salt(self.start_segment,
-                                                    queue=True))
-                ds.append(reader.get_block_and_salt(self.end_segment,
-                                                    queue=True))
+                ds.append(reader.get_blockhashes())
+                ds.append(reader.get_block_and_salt(self.start_segment))
+                ds.append(reader.get_block_and_salt(self.end_segment))
                 d5 = deferredutil.gatherResults(ds)
                 d5.addCallback(self._got_update_results_one_share, shnum)
             else:
@@ -742,7 +740,6 @@ class ServermapUpdater:
 
             dl = defer.DeferredList([d, d2, d3, d4, d5])
             dl.addBoth(self._turn_barrier)
-            reader.flush()
             dl.addCallback(lambda results, shnum=shnum, peerid=peerid:
                 self._got_signature_one_share(results, shnum, peerid, lp))
             dl.addErrback(lambda error, shnum=shnum, data=data:
diff --git a/src/allmydata/test/test_mutable.py b/src/allmydata/test/test_mutable.py
index 7bda5069..14b62ee2 100644
--- a/src/allmydata/test/test_mutable.py
+++ b/src/allmydata/test/test_mutable.py
@@ -72,7 +72,9 @@ class FakeStorage:
         d = defer.Deferred()
         if not self._pending:
             self._pending_timer = reactor.callLater(1.0, self._fire_readers)
-        self._pending[peerid] = (d, shares)
+        if peerid not in self._pending:
+            self._pending[peerid] = []
+        self._pending[peerid].append( (d, shares) )
         return d
 
     def _fire_readers(self):
@@ -81,10 +83,11 @@ class FakeStorage:
         self._pending = {}
         for peerid in self._sequence:
             if peerid in pending:
-                d, shares = pending.pop(peerid)
+                for (d, shares) in pending.pop(peerid):
+                    eventually(d.callback, shares)
+        for peerid in pending:
+            for (d, shares) in pending[peerid]:
                 eventually(d.callback, shares)
-        for (d, shares) in pending.values():
-            eventually(d.callback, shares)
 
     def write(self, peerid, storage_index, shnum, offset, data):
         if peerid not in self._peers:
diff --git a/src/allmydata/test/test_storage.py b/src/allmydata/test/test_storage.py
index 549b839f..2765a913 100644
--- a/src/allmydata/test/test_storage.py
+++ b/src/allmydata/test/test_storage.py
@@ -2624,42 +2624,6 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin):
         return d
 
 
-    def test_reader_queue(self):
-        self.write_test_share_to_server('si1')
-        mr = MDMFSlotReadProxy(self.rref, "si1", 0)
-        d1 = mr.get_block_and_salt(0, queue=True)
-        d2 = mr.get_blockhashes(queue=True)
-        d3 = mr.get_sharehashes(queue=True)
-        d4 = mr.get_signature(queue=True)
-        d5 = mr.get_verification_key(queue=True)
-        dl = defer.DeferredList([d1, d2, d3, d4, d5])
-        mr.flush()
-        def _print(results):
-            self.failUnlessEqual(len(results), 5)
-            # We have one read for version information and offsets, and
-            # one for everything else.
-            self.failUnlessEqual(self.rref.read_count, 2)
-            block, salt = results[0][1] # results[0] is a boolean that says
-                                           # whether or not the operation
-                                           # worked.
-            self.failUnlessEqual(self.block, block)
-            self.failUnlessEqual(self.salt, salt)
-
-            blockhashes = results[1][1]
-            self.failUnlessEqual(self.block_hash_tree, blockhashes)
-
-            sharehashes = results[2][1]
-            self.failUnlessEqual(self.share_hash_chain, sharehashes)
-
-            signature = results[3][1]
-            self.failUnlessEqual(self.signature, signature)
-
-            verification_key = results[4][1]
-            self.failUnlessEqual(self.verification_key, verification_key)
-        dl.addCallback(_print)
-        return dl
-
-
     def test_sdmf_writer(self):
         # Go through the motions of writing an SDMF share to the storage
         # server. Then read the storage server to see that the share got
-- 
2.45.2