]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
mutable: simplify Retrieve._process_segment() to use a gatherDeferred
authorBrian Warner <warner@lothar.com>
Sat, 7 Jan 2012 22:28:57 +0000 (14:28 -0800)
committerBrian Warner <warner@lothar.com>
Sun, 8 Jan 2012 22:12:44 +0000 (14:12 -0800)
src/allmydata/mutable/retrieve.py

index 029818dcd39172b3862a78f9bd63ee58643923bc..202e1ea868f69eb0f929520ec4dd30ddf20e8c6c 100644 (file)
@@ -8,7 +8,7 @@ from twisted.internet.interfaces import IPushProducer, IConsumer
 from foolscap.api import eventually, fireEventually
 from allmydata.interfaces import IRetrieveStatus, NotEnoughSharesError, \
      DownloadStopped, MDMF_VERSION, SDMF_VERSION
-from allmydata.util import hashutil, log, mathutil
+from allmydata.util import hashutil, log, mathutil, deferredutil
 from allmydata.util.dictutil import DictOfSets
 from allmydata import hashtree, codec
 from allmydata.storage.server import si_b2a
@@ -323,10 +323,10 @@ class Retrieve:
         self._block_hash_trees = None
         self._setup_encoding_parameters()
 
-        # _decode_blocks() expects the output of a DeferredList that contains
-        # the outputs of _validate_block() (each of which is a dict mapping
-        # shnum to (block,salt) bytestrings).
-        d = self._decode_blocks([(True, blocks_and_salts)], segnum)
+        # _decode_blocks() expects the output of a gatherResults that
+        # contains the outputs of _validate_block() (each of which is a dict
+        # mapping shnum to (block,salt) bytestrings).
+        d = self._decode_blocks([blocks_and_salts], segnum)
         d.addCallback(self._decrypt_segment)
         return d
 
@@ -636,7 +636,11 @@ class Retrieve:
             dl.addCallback(self._validate_block, segnum, reader, reader.server, started)
             dl.addErrback(self._validation_or_decoding_failed, [reader])
             ds.append(dl)
-        dl = defer.DeferredList(ds)
+        # _validation_or_decoding_failed is supposed to eat any recoverable
+        # errors (like corrupt shares), returning a None when that happens.
+        # If it raises an exception itself, or if it can't handle the error,
+        # the download should fail. So we can use gatherResults() here.
+        dl = deferredutil.gatherResults(ds)
         if self._verify:
             dl.addCallback(lambda ignored: "")
             dl.addCallback(self._set_segment)
@@ -645,35 +649,36 @@ class Retrieve:
         return dl
 
 
-    def _maybe_decode_and_decrypt_segment(self, blocks_and_salts, segnum):
+    def _maybe_decode_and_decrypt_segment(self, results, segnum):
         """
-        I take the results of fetching and validating the blocks from a
-        callback chain in another method. If the results are such that
-        they tell me that validation and fetching succeeded without
-        incident, I will proceed with decoding and decryption.
-        Otherwise, I will do nothing.
+        I take the results of fetching and validating the blocks from
+        _process_segment. If validation and fetching succeeded without
+        incident, I will proceed with decoding and decryption. Otherwise, I
+        will do nothing.
         """
         self.log("trying to decode and decrypt segment %d" % segnum)
-        failures = False
-        for block_and_salt in blocks_and_salts:
-            if not block_and_salt[0] or block_and_salt[1] == None:
-                self.log("some validation operations failed; not proceeding")
-                failures = True
-                break
-        if not failures:
-            self.log("everything looks ok, building segment %d" % segnum)
-            d = self._decode_blocks(blocks_and_salts, segnum)
-            d.addCallback(self._decrypt_segment)
-            d.addErrback(self._validation_or_decoding_failed,
-                         self._active_readers)
-            # check to see whether we've been paused before writing
-            # anything.
-            d.addCallback(self._check_for_paused)
-            d.addCallback(self._check_for_stopped)
-            d.addCallback(self._set_segment)
-            return d
-        else:
+
+        # 'results' is the output of a gatherResults set up in
+        # _process_segment(). Each component Deferred will either contain the
+        # non-Failure output of _validate_block() for a single block (i.e.
+        # {segnum:(block,salt)}), or None if _validate_block threw an
+        # exception and _validation_or_decoding_failed handled it (by
+        # dropping that server).
+
+        if None in results:
+            self.log("some validation operations failed; not proceeding")
             return defer.succeed(None)
+        self.log("everything looks ok, building segment %d" % segnum)
+        d = self._decode_blocks(results, segnum)
+        d.addCallback(self._decrypt_segment)
+        d.addErrback(self._validation_or_decoding_failed,
+                     self._active_readers)
+        # check to see whether we've been paused before writing
+        # anything.
+        d.addCallback(self._check_for_paused)
+        d.addCallback(self._check_for_stopped)
+        d.addCallback(self._set_segment)
+        return d
 
 
     def _set_segment(self, segment):
@@ -853,19 +858,16 @@ class Retrieve:
         return dl
 
 
-    def _decode_blocks(self, blocks_and_salts, segnum):
+    def _decode_blocks(self, results, segnum):
         """
         I take a list of k blocks and salts, and decode that into a
         single encrypted segment.
         """
-        d = {}
-        # We want to merge our dictionaries to the form 
-        # {shnum: blocks_and_salts}
-        #
-        # The dictionaries come from validate block that way, so we just
-        # need to merge them.
-        for block_and_salt in blocks_and_salts:
-            d.update(block_and_salt[1])
+        # 'results' is one or more dicts (each {shnum:(block,salt)}), and we
+        # want to merge them all
+        blocks_and_salts = {}
+        for d in results:
+            blocks_and_salts.update(d)
 
         # All of these blocks should have the same salt; in SDMF, it is
         # the file-wide IV, while in MDMF it is the per-segment salt. In
@@ -874,10 +876,10 @@ class Retrieve:
         # d.items()[0] is like (shnum, (block, salt))
         # d.items()[0][1] is like (block, salt)
         # d.items()[0][1][1] is the salt.
-        salt = d.items()[0][1][1]
+        salt = blocks_and_salts.items()[0][1][1]
         # Next, extract just the blocks from the dict. We'll use the
         # salt in the next step.
-        share_and_shareids = [(k, v[0]) for k, v in d.items()]
+        share_and_shareids = [(k, v[0]) for k, v in blocks_and_salts.items()]
         d2 = dict(share_and_shareids)
         shareids = []
         shares = []