From: Brian Warner Date: Sat, 7 Jan 2012 22:28:57 +0000 (-0800) Subject: mutable: simplify Retrieve._process_segment() to use a gatherDeferred X-Git-Url: https://git.rkrishnan.org/listings/simplejson/encoder.py.html?a=commitdiff_plain;h=f1752f54c0622332636a56ea0c721d5fb60dbc57;p=tahoe-lafs%2Ftahoe-lafs.git mutable: simplify Retrieve._process_segment() to use a gatherDeferred --- diff --git a/src/allmydata/mutable/retrieve.py b/src/allmydata/mutable/retrieve.py index 029818dc..202e1ea8 100644 --- a/src/allmydata/mutable/retrieve.py +++ b/src/allmydata/mutable/retrieve.py @@ -8,7 +8,7 @@ from twisted.internet.interfaces import IPushProducer, IConsumer from foolscap.api import eventually, fireEventually from allmydata.interfaces import IRetrieveStatus, NotEnoughSharesError, \ DownloadStopped, MDMF_VERSION, SDMF_VERSION -from allmydata.util import hashutil, log, mathutil +from allmydata.util import hashutil, log, mathutil, deferredutil from allmydata.util.dictutil import DictOfSets from allmydata import hashtree, codec from allmydata.storage.server import si_b2a @@ -323,10 +323,10 @@ class Retrieve: self._block_hash_trees = None self._setup_encoding_parameters() - # _decode_blocks() expects the output of a DeferredList that contains - # the outputs of _validate_block() (each of which is a dict mapping - # shnum to (block,salt) bytestrings). - d = self._decode_blocks([(True, blocks_and_salts)], segnum) + # _decode_blocks() expects the output of a gatherResults that + # contains the outputs of _validate_block() (each of which is a dict + # mapping shnum to (block,salt) bytestrings). + d = self._decode_blocks([blocks_and_salts], segnum) d.addCallback(self._decrypt_segment) return d @@ -636,7 +636,11 @@ class Retrieve: dl.addCallback(self._validate_block, segnum, reader, reader.server, started) dl.addErrback(self._validation_or_decoding_failed, [reader]) ds.append(dl) - dl = defer.DeferredList(ds) + # _validation_or_decoding_failed is supposed to eat any recoverable + # errors (like corrupt shares), returning a None when that happens. + # If it raises an exception itself, or if it can't handle the error, + # the download should fail. So we can use gatherResults() here. + dl = deferredutil.gatherResults(ds) if self._verify: dl.addCallback(lambda ignored: "") dl.addCallback(self._set_segment) @@ -645,35 +649,36 @@ class Retrieve: return dl - def _maybe_decode_and_decrypt_segment(self, blocks_and_salts, segnum): + def _maybe_decode_and_decrypt_segment(self, results, segnum): """ - I take the results of fetching and validating the blocks from a - callback chain in another method. If the results are such that - they tell me that validation and fetching succeeded without - incident, I will proceed with decoding and decryption. - Otherwise, I will do nothing. + I take the results of fetching and validating the blocks from + _process_segment. If validation and fetching succeeded without + incident, I will proceed with decoding and decryption. Otherwise, I + will do nothing. """ self.log("trying to decode and decrypt segment %d" % segnum) - failures = False - for block_and_salt in blocks_and_salts: - if not block_and_salt[0] or block_and_salt[1] == None: - self.log("some validation operations failed; not proceeding") - failures = True - break - if not failures: - self.log("everything looks ok, building segment %d" % segnum) - d = self._decode_blocks(blocks_and_salts, segnum) - d.addCallback(self._decrypt_segment) - d.addErrback(self._validation_or_decoding_failed, - self._active_readers) - # check to see whether we've been paused before writing - # anything. - d.addCallback(self._check_for_paused) - d.addCallback(self._check_for_stopped) - d.addCallback(self._set_segment) - return d - else: + + # 'results' is the output of a gatherResults set up in + # _process_segment(). Each component Deferred will either contain the + # non-Failure output of _validate_block() for a single block (i.e. + # {segnum:(block,salt)}), or None if _validate_block threw an + # exception and _validation_or_decoding_failed handled it (by + # dropping that server). + + if None in results: + self.log("some validation operations failed; not proceeding") return defer.succeed(None) + self.log("everything looks ok, building segment %d" % segnum) + d = self._decode_blocks(results, segnum) + d.addCallback(self._decrypt_segment) + d.addErrback(self._validation_or_decoding_failed, + self._active_readers) + # check to see whether we've been paused before writing + # anything. + d.addCallback(self._check_for_paused) + d.addCallback(self._check_for_stopped) + d.addCallback(self._set_segment) + return d def _set_segment(self, segment): @@ -853,19 +858,16 @@ class Retrieve: return dl - def _decode_blocks(self, blocks_and_salts, segnum): + def _decode_blocks(self, results, segnum): """ I take a list of k blocks and salts, and decode that into a single encrypted segment. """ - d = {} - # We want to merge our dictionaries to the form - # {shnum: blocks_and_salts} - # - # The dictionaries come from validate block that way, so we just - # need to merge them. - for block_and_salt in blocks_and_salts: - d.update(block_and_salt[1]) + # 'results' is one or more dicts (each {shnum:(block,salt)}), and we + # want to merge them all + blocks_and_salts = {} + for d in results: + blocks_and_salts.update(d) # All of these blocks should have the same salt; in SDMF, it is # the file-wide IV, while in MDMF it is the per-segment salt. In @@ -874,10 +876,10 @@ class Retrieve: # d.items()[0] is like (shnum, (block, salt)) # d.items()[0][1] is like (block, salt) # d.items()[0][1][1] is the salt. - salt = d.items()[0][1][1] + salt = blocks_and_salts.items()[0][1][1] # Next, extract just the blocks from the dict. We'll use the # salt in the next step. - share_and_shareids = [(k, v[0]) for k, v in d.items()] + share_and_shareids = [(k, v[0]) for k, v in blocks_and_salts.items()] d2 = dict(share_and_shareids) shareids = [] shares = []