from foolscap.api import eventually, fireEventually
from allmydata.interfaces import IRetrieveStatus, NotEnoughSharesError, \
DownloadStopped, MDMF_VERSION, SDMF_VERSION
-from allmydata.util import hashutil, log, mathutil
+from allmydata.util import hashutil, log, mathutil, deferredutil
from allmydata.util.dictutil import DictOfSets
from allmydata import hashtree, codec
from allmydata.storage.server import si_b2a
self._block_hash_trees = None
self._setup_encoding_parameters()
- # _decode_blocks() expects the output of a DeferredList that contains
- # the outputs of _validate_block() (each of which is a dict mapping
- # shnum to (block,salt) bytestrings).
- d = self._decode_blocks([(True, blocks_and_salts)], segnum)
+ # _decode_blocks() expects the output of a gatherResults that
+ # contains the outputs of _validate_block() (each of which is a dict
+ # mapping shnum to (block,salt) bytestrings).
+ d = self._decode_blocks([blocks_and_salts], segnum)
d.addCallback(self._decrypt_segment)
return d
dl.addCallback(self._validate_block, segnum, reader, reader.server, started)
dl.addErrback(self._validation_or_decoding_failed, [reader])
ds.append(dl)
- dl = defer.DeferredList(ds)
+ # _validation_or_decoding_failed is supposed to eat any recoverable
+ # errors (like corrupt shares), returning a None when that happens.
+ # If it raises an exception itself, or if it can't handle the error,
+ # the download should fail. So we can use gatherResults() here.
+ dl = deferredutil.gatherResults(ds)
if self._verify:
dl.addCallback(lambda ignored: "")
dl.addCallback(self._set_segment)
return dl
- def _maybe_decode_and_decrypt_segment(self, blocks_and_salts, segnum):
+ def _maybe_decode_and_decrypt_segment(self, results, segnum):
"""
- I take the results of fetching and validating the blocks from a
- callback chain in another method. If the results are such that
- they tell me that validation and fetching succeeded without
- incident, I will proceed with decoding and decryption.
- Otherwise, I will do nothing.
+ I take the results of fetching and validating the blocks from
+ _process_segment. If validation and fetching succeeded without
+ incident, I will proceed with decoding and decryption. Otherwise, I
+ will do nothing.
"""
self.log("trying to decode and decrypt segment %d" % segnum)
- failures = False
- for block_and_salt in blocks_and_salts:
- if not block_and_salt[0] or block_and_salt[1] == None:
- self.log("some validation operations failed; not proceeding")
- failures = True
- break
- if not failures:
- self.log("everything looks ok, building segment %d" % segnum)
- d = self._decode_blocks(blocks_and_salts, segnum)
- d.addCallback(self._decrypt_segment)
- d.addErrback(self._validation_or_decoding_failed,
- self._active_readers)
- # check to see whether we've been paused before writing
- # anything.
- d.addCallback(self._check_for_paused)
- d.addCallback(self._check_for_stopped)
- d.addCallback(self._set_segment)
- return d
- else:
+
+ # 'results' is the output of a gatherResults set up in
+ # _process_segment(). Each component Deferred will either contain the
+ # non-Failure output of _validate_block() for a single block (i.e.
+ # {segnum:(block,salt)}), or None if _validate_block threw an
+ # exception and _validation_or_decoding_failed handled it (by
+ # dropping that server).
+
+ if None in results:
+ self.log("some validation operations failed; not proceeding")
return defer.succeed(None)
+ self.log("everything looks ok, building segment %d" % segnum)
+ d = self._decode_blocks(results, segnum)
+ d.addCallback(self._decrypt_segment)
+ d.addErrback(self._validation_or_decoding_failed,
+ self._active_readers)
+ # check to see whether we've been paused before writing
+ # anything.
+ d.addCallback(self._check_for_paused)
+ d.addCallback(self._check_for_stopped)
+ d.addCallback(self._set_segment)
+ return d
def _set_segment(self, segment):
return dl
- def _decode_blocks(self, blocks_and_salts, segnum):
+ def _decode_blocks(self, results, segnum):
"""
I take a list of k blocks and salts, and decode that into a
single encrypted segment.
"""
- d = {}
- # We want to merge our dictionaries to the form
- # {shnum: blocks_and_salts}
- #
- # The dictionaries come from validate block that way, so we just
- # need to merge them.
- for block_and_salt in blocks_and_salts:
- d.update(block_and_salt[1])
+ # 'results' is one or more dicts (each {shnum:(block,salt)}), and we
+ # want to merge them all
+ blocks_and_salts = {}
+ for d in results:
+ blocks_and_salts.update(d)
# All of these blocks should have the same salt; in SDMF, it is
# the file-wide IV, while in MDMF it is the per-segment salt. In
# d.items()[0] is like (shnum, (block, salt))
# d.items()[0][1] is like (block, salt)
# d.items()[0][1][1] is the salt.
- salt = d.items()[0][1][1]
+ salt = blocks_and_salts.items()[0][1][1]
# Next, extract just the blocks from the dict. We'll use the
# salt in the next step.
- share_and_shareids = [(k, v[0]) for k, v in d.items()]
+ share_and_shareids = [(k, v[0]) for k, v in blocks_and_salts.items()]
d2 = dict(share_and_shareids)
shareids = []
shares = []