]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blobdiff - src/allmydata/mutable/layout.py
MDMFSlotReadProxy: remove the queue
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / mutable / layout.py
index e2fa6d68d585c9e50980a58c2b989c6954a03d65..55f3968521e3fb1be5191f35fc0dcfa8f9277d6a 100644 (file)
@@ -3,7 +3,7 @@ import struct
 from allmydata.mutable.common import NeedMoreDataError, UnknownVersionError
 from allmydata.interfaces import HASH_SIZE, SALT_SIZE, SDMF_VERSION, \
                                  MDMF_VERSION, IMutableSlotWriter
-from allmydata.util import mathutil, observer
+from allmydata.util import mathutil
 from twisted.python import failure
 from twisted.internet import defer
 from zope.interface import implements
@@ -1212,10 +1212,6 @@ class MDMFSlotReadProxy:
         if self._data == None:
             self._data = ""
 
-        self._queue_observers = observer.ObserverList()
-        self._queue_errbacks = observer.ObserverList()
-        self._readvs = []
-
 
     def _maybe_fetch_offsets_and_header(self, force_remote=False):
         """
@@ -1353,7 +1349,7 @@ class MDMFSlotReadProxy:
             self._offsets['share_data'] = sharedata
 
 
-    def get_block_and_salt(self, segnum, queue=False):
+    def get_block_and_salt(self, segnum):
         """
         I return (block, salt), where block is the block data and
         salt is the salt used to encrypt that segment.
@@ -1381,8 +1377,7 @@ class MDMFSlotReadProxy:
             readvs = [(share_offset, data)]
             return readvs
         d.addCallback(_then)
-        d.addCallback(lambda readvs:
-            self._read(readvs, queue=queue))
+        d.addCallback(lambda readvs: self._read(readvs))
         def _process_results(results):
             assert self.shnum in results
             if self._version_number == 0:
@@ -1408,7 +1403,7 @@ class MDMFSlotReadProxy:
         return d
 
 
-    def get_blockhashes(self, needed=None, queue=False, force_remote=False):
+    def get_blockhashes(self, needed=None, force_remote=False):
         """
         I return the block hash tree
 
@@ -1440,7 +1435,7 @@ class MDMFSlotReadProxy:
             return readvs
         d.addCallback(_then)
         d.addCallback(lambda readvs:
-            self._read(readvs, queue=queue, force_remote=force_remote))
+            self._read(readvs, force_remote=force_remote))
         def _build_block_hash_tree(results):
             assert self.shnum in results
 
@@ -1452,7 +1447,7 @@ class MDMFSlotReadProxy:
         return d
 
 
-    def get_sharehashes(self, needed=None, queue=False, force_remote=False):
+    def get_sharehashes(self, needed=None, force_remote=False):
         """
         I return the part of the share hash chain placed to validate
         this share.
@@ -1479,7 +1474,7 @@ class MDMFSlotReadProxy:
             return readvs
         d.addCallback(_make_readvs)
         d.addCallback(lambda readvs:
-            self._read(readvs, queue=queue, force_remote=force_remote))
+            self._read(readvs, force_remote=force_remote))
         def _build_share_hash_chain(results):
             assert self.shnum in results
 
@@ -1493,7 +1488,7 @@ class MDMFSlotReadProxy:
         return d
 
 
-    def get_encprivkey(self, queue=False):
+    def get_encprivkey(self):
         """
         I return the encrypted private key.
         """
@@ -1508,8 +1503,7 @@ class MDMFSlotReadProxy:
             readvs = [(privkey_offset, privkey_length)]
             return readvs
         d.addCallback(_make_readvs)
-        d.addCallback(lambda readvs:
-            self._read(readvs, queue=queue))
+        d.addCallback(lambda readvs: self._read(readvs))
         def _process_results(results):
             assert self.shnum in results
             privkey = results[self.shnum][0]
@@ -1518,7 +1512,7 @@ class MDMFSlotReadProxy:
         return d
 
 
-    def get_signature(self, queue=False):
+    def get_signature(self):
         """
         I return the signature of my share.
         """
@@ -1533,8 +1527,7 @@ class MDMFSlotReadProxy:
             readvs = [(signature_offset, signature_length)]
             return readvs
         d.addCallback(_make_readvs)
-        d.addCallback(lambda readvs:
-            self._read(readvs, queue=queue))
+        d.addCallback(lambda readvs: self._read(readvs))
         def _process_results(results):
             assert self.shnum in results
             signature = results[self.shnum][0]
@@ -1543,7 +1536,7 @@ class MDMFSlotReadProxy:
         return d
 
 
-    def get_verification_key(self, queue=False):
+    def get_verification_key(self):
         """
         I return the verification key.
         """
@@ -1559,8 +1552,7 @@ class MDMFSlotReadProxy:
             readvs = [(vk_offset, vk_length)]
             return readvs
         d.addCallback(_make_readvs)
-        d.addCallback(lambda readvs:
-            self._read(readvs, queue=queue))
+        d.addCallback(lambda readvs: self._read(readvs))
         def _process_results(results):
             assert self.shnum in results
             verification_key = results[self.shnum][0]
@@ -1712,23 +1704,7 @@ class MDMFSlotReadProxy:
         return d
 
 
-    def flush(self):
-        """
-        I flush my queue of read vectors.
-        """
-        d = self._read(self._readvs)
-        def _then(results):
-            self._readvs = []
-            if isinstance(results, failure.Failure):
-                self._queue_errbacks.notify(results)
-            else:
-                self._queue_observers.notify(results)
-            self._queue_observers = observer.ObserverList()
-            self._queue_errbacks = observer.ObserverList()
-        d.addBoth(_then)
-
-
-    def _read(self, readvs, force_remote=False, queue=False):
+    def _read(self, readvs, force_remote=False):
         unsatisfiable = filter(lambda x: x[0] + x[1] > len(self._data), readvs)
         # TODO: It's entirely possible to tweak this so that it just
         # fulfills the requests that it can, and not demand that all
@@ -1739,19 +1715,6 @@ class MDMFSlotReadProxy:
             results = {self.shnum: results}
             return defer.succeed(results)
         else:
-            if queue:
-                start = len(self._readvs)
-                self._readvs += readvs
-                end = len(self._readvs)
-                def _get_results(results, start, end):
-                    if not self.shnum in results:
-                        return {self._shnum: [""]}
-                    return {self.shnum: results[self.shnum][start:end]}
-                d = defer.Deferred()
-                d.addCallback(_get_results, start, end)
-                self._queue_observers.subscribe(d.callback)
-                self._queue_errbacks.subscribe(d.errback)
-                return d
             return self._rref.callRemote("slot_readv",
                                          self._storage_index,
                                          [self.shnum],