-import struct, time
+import time
from itertools import count
from zope.interface import implements
from twisted.internet import defer
from twisted.python import failure
-from foolscap.api import DeadReferenceError, eventually, fireEventually
-from allmydata.interfaces import IRetrieveStatus, NotEnoughSharesError
-from allmydata.util import hashutil, idlib, log
+from twisted.internet.interfaces import IPushProducer, IConsumer
+from foolscap.api import eventually, fireEventually
+from allmydata.interfaces import IRetrieveStatus, NotEnoughSharesError, \
+ MDMF_VERSION, SDMF_VERSION
+from allmydata.util import hashutil, log, mathutil
from allmydata.util.dictutil import DictOfSets
from allmydata import hashtree, codec
from allmydata.storage.server import si_b2a
from pycryptopp.publickey import rsa
from allmydata.mutable.common import CorruptShareError, UncoordinatedWriteError
-from allmydata.mutable.layout import SIGNED_PREFIX, unpack_share_data
+from allmydata.mutable.layout import MDMFSlotReadProxy
class RetrieveStatus:
implements(IRetrieveStatus)
# times, and each will have a separate response chain. However the
# Retrieve object will remain tied to a specific version of the file, and
# will use a single ServerMap instance.
+ implements(IPushProducer)
- def __init__(self, filenode, servermap, verinfo, fetch_privkey=False):
+ def __init__(self, filenode, servermap, verinfo, fetch_privkey=False,
+ verify=False):
self._node = filenode
assert self._node.get_pubkey()
self._storage_index = filenode.get_storage_index()
self.verinfo = verinfo
# during repair, we may be called upon to grab the private key, since
# it wasn't picked up during a verify=False checker run, and we'll
- # need it for repair to generate the a new version.
- self._need_privkey = fetch_privkey
- if self._node.get_privkey():
+ # need it for repair to generate a new version.
+ self._need_privkey = fetch_privkey or verify
+ if self._node.get_privkey() and not verify:
self._need_privkey = False
+ if self._need_privkey:
+ # TODO: Evaluate the need for this. We'll use it if we want
+ # to limit how many queries are on the wire for the privkey
+ # at once.
+ self._privkey_query_markers = [] # one Marker for each time we've
+ # tried to get the privkey.
+
+ # verify means that we are using the downloader logic to verify all
+ # of our shares. This tells the downloader a few things.
+ #
+ # 1. We need to download all of the shares.
+ # 2. We don't need to decode or decrypt the shares, since our
+ # caller doesn't care about the plaintext, only the
+ # information about which shares are or are not valid.
+ # 3. When we are validating readers, we need to validate the
+ # signature on the prefix. Do we? We already do this in the
+ # servermap update?
+ self._verify = False
+ if verify:
+ self._verify = True
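+ # (Note: when verify=True, download() may be called without a
+ # consumer, and _done/_failed fire self._done_deferred with the
+ # list of bad shares rather than with plaintext; see _done and
+ # _failed below.)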
+
self._status = RetrieveStatus()
self._status.set_storage_index(self._storage_index)
self._status.set_helper(False)
offsets_tuple) = self.verinfo
self._status.set_size(datalength)
self._status.set_encoding(k, N)
+ self.readers = {}
+ self._paused = False
+ self._pause_deferred = None
+ self._offset = None
+ self._read_length = None
+ self.log("got seqnum %d" % self.verinfo[0])
+
def get_status(self):
return self._status
kwargs["facility"] = "tahoe.mutable.retrieve"
return log.msg(*args, **kwargs)
- def download(self):
+
+ ###################
+ # IPushProducer
+
+ def pauseProducing(self):
+ """
+ I am called by my download target if we have produced too much
+ data for it to handle. I make the downloader stop producing new
+ data until my resumeProducing method is called.
+ """
+ if self._paused:
+ return
+
+ self._old_status = self._status.get_status()
+ self._status.set_status("Paused")
+
+ # fired when the download is unpaused.
+ self._pause_deferred = defer.Deferred()
+ self._paused = True
+
+
+ def resumeProducing(self):
+ """
+ I am called by my download target once it is ready to begin
+ receiving data again.
+ """
+ if not self._paused:
+ return
+
+ self._paused = False
+ p = self._pause_deferred
+ self._pause_deferred = None
+ self._status.set_status(self._old_status)
+
+ eventually(p.callback, None)
+
+
+ def _check_for_paused(self, res):
+ """
+ I am called just before a write to the consumer. I return a
+ Deferred that eventually fires with the data that is to be
+ written to the consumer. If the download has not been paused,
+ the Deferred fires immediately. Otherwise, the Deferred fires
+ when the downloader is unpaused.
+ """
+ if self._paused:
+ d = defer.Deferred()
+ self._pause_deferred.addCallback(lambda ignored: d.callback(res))
+ return d
+ return defer.succeed(res)
+
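+ # How pausing threads through the segment pipeline (the calls
+ # below already exist in this patch; this is just a summary):
+ # _maybe_decode_and_decrypt_segment does
+ # d.addCallback(self._check_for_paused)
+ # d.addCallback(self._set_segment)
+ # so while the consumer is paused, each decrypted segment waits
+ # on self._pause_deferred, and nothing is written until
+ # resumeProducing fires it.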
+
+ def download(self, consumer=None, offset=0, size=None):
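+ # Rough usage sketch (the caller lives outside this patch, so
+ # the names here are only illustrative):
+ # r = Retrieve(filenode, servermap, verinfo)
+ # d = r.download(consumer, offset=0, size=None)
+ # The returned Deferred fires with the consumer once every
+ # requested segment has been written to it, or with the list of
+ # bad shares when verify=True.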
+ assert IConsumer.providedBy(consumer) or self._verify
+
+ if consumer:
+ self._consumer = consumer
+ # we provide IPushProducer, so streaming=True, per
+ # IConsumer.
+ self._consumer.registerProducer(self, streaming=True)
+
self._done_deferred = defer.Deferred()
self._started = time.time()
self._status.set_status("Retrieving Shares")
+ self._offset = offset
+ self._read_length = size
+
# first, which servers can we use?
versionmap = self.servermap.make_versionmap()
shares = versionmap[self.verinfo]
self.remaining_sharemap = DictOfSets()
for (shnum, peerid, timestamp) in shares:
self.remaining_sharemap.add(shnum, peerid)
+ # If the servermap update fetched anything, it fetched at least 1
+ # KiB, so we ask for that much.
+ # TODO: Change the cache methods to allow us to fetch all of the
+ # data that they have, then change this method to do that.
+ any_cache = self._node._read_from_cache(self.verinfo, shnum,
+ 0, 1000)
+ ss = self.servermap.connections[peerid]
+ reader = MDMFSlotReadProxy(ss,
+ self._storage_index,
+ shnum,
+ any_cache)
+ reader.peerid = peerid
+ self.readers[shnum] = reader
+
self.shares = {} # maps shnum to validated blocks
+ self._active_readers = [] # list of active readers for this dl.
+ self._validated_readers = set() # set of readers that we have
+ # validated the prefix of
+ self._block_hash_trees = {} # shnum => hashtree
# how many shares do we need?
- (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
+ (seqnum,
+ root_hash,
+ IV,
+ segsize,
+ datalength,
+ k,
+ N,
+ prefix,
offsets_tuple) = self.verinfo
+
+
+ # We need one share hash tree for the entire file; its leaves
+ # are the roots of the block hash trees for the shares that
+ # comprise it, and its root is in the verinfo.
+ self.share_hash_tree = hashtree.IncompleteHashTree(N)
+ self.share_hash_tree.set_hashes({0: root_hash})
+
+ # This will set up both the segment decoder and the tail segment
+ # decoder, as well as a variety of other instance variables that
+ # the download process will use.
+ self._setup_encoding_parameters()
assert len(self.remaining_sharemap) >= k
- # we start with the lowest shnums we have available, since FEC is
- # faster if we're using "primary shares"
- self.active_shnums = set(sorted(self.remaining_sharemap.keys())[:k])
- for shnum in self.active_shnums:
- # we use an arbitrary peer who has the share. If shares are
- # doubled up (more than one share per peer), we could make this
- # run faster by spreading the load among multiple peers. But the
- # algorithm to do that is more complicated than I want to write
- # right now, and a well-provisioned grid shouldn't have multiple
- # shares per peer.
- peerid = list(self.remaining_sharemap[shnum])[0]
- self.get_data(shnum, peerid)
-
- # control flow beyond this point: state machine. Receiving responses
- # from queries is the input. We might send out more queries, or we
- # might produce a result.
+ self.log("starting download")
+ self._paused = False
+ self._started_fetching = time.time()
+
+ self._add_active_peers()
+ # The download process beyond this is a state machine.
+ # _add_active_peers will select the peers that we want to use
+ # for the download, and then attempt to start downloading. After
+ # each segment, it will check for doneness, reacting to broken
+ # peers and corrupt shares as necessary. If it runs out of good
+ # peers before downloading all of the segments, _done_deferred
+ # will errback. Otherwise, it will eventually callback with the
+ # contents of the mutable file.
return self._done_deferred
- def get_data(self, shnum, peerid):
- self.log(format="sending sh#%(shnum)d request to [%(peerid)s]",
- shnum=shnum,
- peerid=idlib.shortnodeid_b2a(peerid),
- level=log.NOISY)
- ss = self.servermap.connections[peerid]
- started = time.time()
- (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
+
+ def decode(self, blocks_and_salts, segnum):
+ """
+ I am a helper method that the mutable file update process uses
+ as a shortcut to decode and decrypt the segments that it needs
+ to fetch in order to perform a file update. I take in a
+ collection of blocks and salts, and pick some of those to make a
+ segment with. I return the plaintext associated with that
+ segment.
+ """
+ # shnum => block hash tree. Unused, but _setup_encoding_parameters
+ # will want to set this.
+ # XXX: Make it so that it won't set this if we're just decoding.
+ self._block_hash_trees = {}
+ self._setup_encoding_parameters()
+ # This is the form expected by decode.
+ blocks_and_salts = blocks_and_salts.items()
+ blocks_and_salts = [(True, [d]) for d in blocks_and_salts]
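+ # e.g. {0: (b0, s0), 1: (b1, s1)} becomes
+ # [(True, [(0, (b0, s0))]), (True, [(1, (b1, s1))])], which
+ # mirrors the (success, {shnum: (block, salt)}) pairs that
+ # _process_segment normally hands to _decode_blocks.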
+
+ d = self._decode_blocks(blocks_and_salts, segnum)
+ d.addCallback(self._decrypt_segment)
+ return d
+
+
+ def _setup_encoding_parameters(self):
+ """
+ I set up the encoding parameters, including k, n, the number
+ of segments associated with this file, and the segment decoder.
+ """
+ (seqnum,
+ root_hash,
+ IV,
+ segsize,
+ datalength,
+ k,
+ n,
+ known_prefix,
offsets_tuple) = self.verinfo
- offsets = dict(offsets_tuple)
+ self._required_shares = k
+ self._total_shares = n
+ self._segment_size = segsize
+ self._data_length = datalength
- # we read the checkstring, to make sure that the data we grab is from
- # the right version.
- readv = [ (0, struct.calcsize(SIGNED_PREFIX)) ]
+ if not IV:
+ self._version = MDMF_VERSION
+ else:
+ self._version = SDMF_VERSION
- # We also read the data, and the hashes necessary to validate them
- # (share_hash_chain, block_hash_tree, share_data). We don't read the
- # signature or the pubkey, since that was handled during the
- # servermap phase, and we'll be comparing the share hash chain
- # against the roothash that was validated back then.
+ if datalength and segsize:
+ self._num_segments = mathutil.div_ceil(datalength, segsize)
+ self._tail_data_size = datalength % segsize
+ else:
+ self._num_segments = 0
+ self._tail_data_size = 0
- readv.append( (offsets['share_hash_chain'],
- offsets['enc_privkey'] - offsets['share_hash_chain'] ) )
+ self._segment_decoder = codec.CRSDecoder()
+ self._segment_decoder.set_params(segsize, k, n)
- # if we need the private key (for repair), we also fetch that
- if self._need_privkey:
- readv.append( (offsets['enc_privkey'],
- offsets['EOF'] - offsets['enc_privkey']) )
-
- m = Marker()
- self._outstanding_queries[m] = (peerid, shnum, started)
-
- # ask the cache first
- got_from_cache = False
- datavs = []
- for (offset, length) in readv:
- data = self._node._read_from_cache(self.verinfo, shnum, offset, length)
- if data is not None:
- datavs.append(data)
- if len(datavs) == len(readv):
- self.log("got data from cache")
- got_from_cache = True
- d = fireEventually({shnum: datavs})
- # datavs is a dict mapping shnum to a pair of strings
+ if not self._tail_data_size:
+ self._tail_data_size = segsize
+
+ self._tail_segment_size = mathutil.next_multiple(self._tail_data_size,
+ self._required_shares)
+ if self._tail_segment_size == self._segment_size:
+ self._tail_decoder = self._segment_decoder
else:
- d = self._do_read(ss, peerid, self._storage_index, [shnum], readv)
- self.remaining_sharemap.discard(shnum, peerid)
-
- d.addCallback(self._got_results, m, peerid, started, got_from_cache)
- d.addErrback(self._query_failed, m, peerid)
- # errors that aren't handled by _query_failed (and errors caused by
- # _query_failed) get logged, but we still want to check for doneness.
- def _oops(f):
- self.log(format="problem in _query_failed for sh#%(shnum)d to %(peerid)s",
- shnum=shnum,
- peerid=idlib.shortnodeid_b2a(peerid),
- failure=f,
- level=log.WEIRD, umid="W0xnQA")
- d.addErrback(_oops)
- d.addBoth(self._check_for_done)
- # any error during _check_for_done means the download fails. If the
- # download is successful, _check_for_done will fire _done by itself.
- d.addErrback(self._done)
- d.addErrback(log.err)
- return d # purely for testing convenience
-
- def _do_read(self, ss, peerid, storage_index, shnums, readv):
- # isolate the callRemote to a separate method, so tests can subclass
- # Publish and override it
- d = ss.callRemote("slot_readv", storage_index, shnums, readv)
- return d
+ self._tail_decoder = codec.CRSDecoder()
+ self._tail_decoder.set_params(self._tail_segment_size,
+ self._required_shares,
+ self._total_shares)
- def remove_peer(self, peerid):
- for shnum in list(self.remaining_sharemap.keys()):
- self.remaining_sharemap.discard(shnum, peerid)
+ self.log("got encoding parameters: "
+ "k: %d "
+ "n: %d "
+ "%d segments of %d bytes each (%d byte tail segment)" % \
+ (k, n, self._num_segments, self._segment_size,
+ self._tail_segment_size))
- def _got_results(self, datavs, marker, peerid, started, got_from_cache):
- now = time.time()
- elapsed = now - started
- if not got_from_cache:
- self._status.add_fetch_timing(peerid, elapsed)
- self.log(format="got results (%(shares)d shares) from [%(peerid)s]",
- shares=len(datavs),
- peerid=idlib.shortnodeid_b2a(peerid),
- level=log.NOISY)
- self._outstanding_queries.pop(marker, None)
- if not self._running:
- return
+ for i in xrange(self._total_shares):
+ # So we don't have to do this later.
+ self._block_hash_trees[i] = hashtree.IncompleteHashTree(self._num_segments)
- # note that we only ask for a single share per query, so we only
- # expect a single share back. On the other hand, we use the extra
- # shares if we get them.. seems better than an assert().
+ # Our last task is to tell the downloader where to start and
+ # where to stop. We use three parameters for that:
+ # - self._start_segment: the segment that we need to start
+ # downloading from.
+ # - self._current_segment: the next segment that we need to
+ # download.
+ # - self._last_segment: The last segment that we were asked to
+ # download.
+ #
+ # We say that the download is complete when
+ # self._current_segment > self._last_segment. We use
+ # self._start_segment and self._last_segment to know when to
+ # strip things off of segments, and how much to strip.
+ if self._offset:
+ self.log("got offset: %d" % self._offset)
+ # our start segment is the first segment containing the
+ # offset we were given. Plain floor division gives us that
+ # directly (div_ceil(offset, segsize) - 1 would be off by one
+ # whenever the offset falls exactly on a segment boundary).
+ start = self._offset // self._segment_size
- for shnum,datav in datavs.items():
- (prefix, hash_and_data) = datav[:2]
- try:
- self._got_results_one_share(shnum, peerid,
- prefix, hash_and_data)
- except CorruptShareError, e:
- # log it and give the other shares a chance to be processed
- f = failure.Failure()
- self.log(format="bad share: %(f_value)s",
- f_value=str(f.value), failure=f,
- level=log.WEIRD, umid="7fzWZw")
- self.notify_server_corruption(peerid, shnum, str(e))
- self.remove_peer(peerid)
- self.servermap.mark_bad_share(peerid, shnum, prefix)
- self._bad_shares.add( (peerid, shnum) )
- self._status.problems[peerid] = f
- self._last_failure = f
- pass
- if self._need_privkey and len(datav) > 2:
- lp = None
- self._try_to_validate_privkey(datav[2], peerid, shnum, lp)
- # all done!
+ assert start < self._num_segments
+ self._start_segment = start
+ self.log("got start segment: %d" % self._start_segment)
+ else:
+ self._start_segment = 0
- def notify_server_corruption(self, peerid, shnum, reason):
- ss = self.servermap.connections[peerid]
- ss.callRemoteOnly("advise_corrupt_share",
- "mutable", self._storage_index, shnum, reason)
- def _got_results_one_share(self, shnum, peerid,
- got_prefix, got_hash_and_data):
- self.log("_got_results: got shnum #%d from peerid %s"
- % (shnum, idlib.shortnodeid_b2a(peerid)))
- (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
+ if self._read_length:
+ # our end segment is the last segment containing part of the
+ # segment that we were asked to read.
+ self.log("got read length %d" % self._read_length)
+ end_data = self._offset + self._read_length
+ end = mathutil.div_ceil(end_data,
+ self._segment_size)
+ end -= 1
+ assert end < self._num_segments
+ self._last_segment = end
+ self.log("got end segment: %d" % self._last_segment)
+ else:
+ self._last_segment = self._num_segments - 1
+
+ self._current_segment = self._start_segment
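+ # Worked example (illustrative numbers only): with k=3,
+ # segsize=131073, datalength=300000, offset=150000 and
+ # read_length=100000, we get num_segments=3,
+ # start_segment = 150000 // 131073 = 1 and
+ # last_segment = div_ceil(250000, 131073) - 1 = 1, so only
+ # segment 1 is fetched; _set_segment then strips 18927 bytes
+ # off its front and keeps the next 100000 bytes.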
+
+ def _add_active_peers(self):
+ """
+ I populate self._active_readers with enough active readers to
+ retrieve the contents of this mutable file. I am called before
+ downloading starts, and (eventually) after each validation
+ error, connection error, or other problem in the download.
+ """
+ # TODO: It would be cool to investigate other heuristics for
+ # reader selection. For instance, the cost (in time the user
+ # spends waiting for their file) of selecting a really slow peer
+ # that happens to have a primary share is probably more than
+ # selecting a really fast peer that doesn't have a primary
+ # share. Maybe the servermap could be extended to provide this
+ # information; it could keep track of latency information while
+ # it gathers more important data, and then this routine could
+ # use that to select active readers.
+ #
+ # (these and other questions would be easier to answer with a
+ # robust, configurable tahoe-lafs simulator, which modeled node
+ # failures, differences in node speed, and other characteristics
+ # that we expect storage servers to have. You could have
+ # presets for really stable grids (like allmydata.com),
+ # friendnets, make it easy to configure your own settings, and
+ # then simulate the effect of big changes on these use cases
+ # instead of just reasoning about what the effect might be. Out
+ # of scope for MDMF, though.)
+
+ # We need at least self._required_shares readers to download a
+ # segment.
+ if self._verify:
+ needed = self._total_shares
+ else:
+ needed = self._required_shares - len(self._active_readers)
+ # XXX: Why don't format= log messages work here?
+ self.log("adding %d peers to the active peers list" % needed)
+
+ # We favor lower numbered shares, since FEC is faster with
+ # primary shares than with other shares, and lower-numbered
+ # shares are more likely to be primary than higher numbered
+ # shares.
+ active_shnums = set(self.remaining_sharemap.keys())
+ # We shouldn't consider adding shares that we already have; this
+ # will cause problems later.
+ active_shnums -= set([reader.shnum for reader in self._active_readers])
+ # sort, so that we really do take the lowest-numbered shares first
+ active_shnums = sorted(active_shnums)[:needed]
+ if len(active_shnums) < needed and not self._verify:
+ # We don't have enough readers to retrieve the file; fail.
+ return self._failed()
+
+ for shnum in active_shnums:
+ self._active_readers.append(self.readers[shnum])
+ self.log("added reader for share %d" % shnum)
+ assert len(self._active_readers) >= self._required_shares
+ # Conceptually, this is part of the _add_active_peers step. It
+ # validates the prefixes of newly added readers to make sure
+ # that they match what we are expecting for self.verinfo. If
+ # validation is successful, _validate_active_prefixes will call
+ # _download_current_segment for us. If validation is
+ # unsuccessful, _validate_active_prefixes will remove the peer
+ # and call _add_active_peers again, where we will attempt to
+ # rectify the problem by choosing another peer.
+ return self._validate_active_prefixes()
+
+
+ def _validate_active_prefixes(self):
+ """
+ I check to make sure that the prefixes on the peers that I am
+ currently reading from match the prefix that we want to see, as
+ said in self.verinfo.
+
+ If I find that all of the active peers have acceptable prefixes,
+ I pass control to _download_current_segment, which will use
+ those peers to do cool things. If I find that some of the active
+ peers have unacceptable prefixes, I will remove them from active
+ peers (and from further consideration) and call
+ _add_active_peers to attempt to rectify the situation. I keep
+ track of which peers I have already validated so that I don't
+ need to do so again.
+ """
+ assert self._active_readers, "No more active readers"
+
+ ds = []
+ # keep this as an ordered list -- _check_results uses the index
+ # of each result to find the reader it belongs to.
+ new_readers = list(set(self._active_readers) - self._validated_readers)
+ self.log('validating %d newly-added active readers' % len(new_readers))
+
+ for reader in new_readers:
+ # We force a remote read here -- otherwise, we are relying
+ # on cached data that we already verified as valid, and we
+ # won't detect an uncoordinated write that has occurred
+ # since the last servermap update.
+ d = reader.get_prefix(force_remote=True)
+ d.addCallback(self._try_to_validate_prefix, reader)
+ ds.append(d)
+ dl = defer.DeferredList(ds, consumeErrors=True)
+ def _check_results(results):
+ # Each result in results will be of the form (success, msg).
+ # We don't care about msg, but success will tell us whether
+ # or not the checkstring validated. If it didn't, we need to
+ # remove the offending (peer,share) from our active readers,
+ # and ensure that active readers is again populated.
+ bad_readers = []
+ for i, result in enumerate(results):
+ if not result[0]:
+ reader = new_readers[i]
+ f = result[1]
+ assert isinstance(f, failure.Failure)
+
+ self.log("The reader %s failed to "
+ "properly validate: %s" % \
+ (reader, str(f.value)))
+ bad_readers.append((reader, f))
+ else:
+ reader = new_readers[i]
+ self.log("the reader %s checks out, so we'll use it" % \
+ reader)
+ self._validated_readers.add(reader)
+ # Each time we validate a reader, we check to see if
+ # we need the private key. If we do, we politely ask
+ # for it and then continue computing. If we find
+ # that we haven't gotten it at the end of
+ # segment decoding, then we'll take more drastic
+ # measures.
+ if self._need_privkey and not self._node.is_readonly():
+ d = reader.get_encprivkey()
+ d.addCallback(self._try_to_validate_privkey, reader)
+ if bad_readers:
+ # We do them all at once, or else we screw up list indexing.
+ for (reader, f) in bad_readers:
+ self._mark_bad_share(reader, f)
+ if self._verify:
+ if len(self._active_readers) >= self._required_shares:
+ return self._download_current_segment()
+ else:
+ return self._failed()
+ else:
+ return self._add_active_peers()
+ else:
+ return self._download_current_segment()
+ # _add_active_peers (the next step) will assert that it has
+ # enough active readers to fetch shares; all we need to do here
+ # is remove the bad ones.
+ dl.addCallback(_check_results)
+ return dl
+
+
+ def _try_to_validate_prefix(self, prefix, reader):
+ """
+ I check that the prefix returned by a candidate server for
+ retrieval matches the prefix that the servermap knows about
+ (and, hence, the prefix that was validated earlier). If it does,
+ I return without incident, which means that the candidate server
+ can be used for segment retrieval. If it doesn't, I raise
+ UncoordinatedWriteError, so that my caller knows another server
+ must be chosen.
+ """
+ (seqnum,
+ root_hash,
+ IV,
+ segsize,
+ datalength,
+ k,
+ N,
+ known_prefix,
offsets_tuple) = self.verinfo
- assert len(got_prefix) == len(prefix), (len(got_prefix), len(prefix))
- if got_prefix != prefix:
- msg = "someone wrote to the data since we read the servermap: prefix changed"
- raise UncoordinatedWriteError(msg)
- (share_hash_chain, block_hash_tree,
- share_data) = unpack_share_data(self.verinfo, got_hash_and_data)
-
- assert isinstance(share_data, str)
- # build the block hash tree. SDMF has only one leaf.
- leaves = [hashutil.block_hash(share_data)]
- t = hashtree.HashTree(leaves)
- if list(t) != block_hash_tree:
- raise CorruptShareError(peerid, shnum, "block hash tree failure")
- share_hash_leaf = t[0]
- t2 = hashtree.IncompleteHashTree(N)
- # root_hash was checked by the signature
- t2.set_hashes({0: root_hash})
- try:
- t2.set_hashes(hashes=share_hash_chain,
- leaves={shnum: share_hash_leaf})
- except (hashtree.BadHashError, hashtree.NotEnoughHashesError,
- IndexError), e:
- msg = "corrupt hashes: %s" % (e,)
- raise CorruptShareError(peerid, shnum, msg)
- self.log(" data valid! len=%d" % len(share_data))
- # each query comes down to this: placing validated share data into
- # self.shares
- self.shares[shnum] = share_data
+ if known_prefix != prefix:
+ self.log("prefix from share %d doesn't match" % reader.shnum)
+ raise UncoordinatedWriteError("Mismatched prefix -- this could "
+ "indicate an uncoordinated write")
+ # Otherwise, we're okay -- no issues.
- def _try_to_validate_privkey(self, enc_privkey, peerid, shnum, lp):
- alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
- alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
- if alleged_writekey != self._node.get_writekey():
- self.log("invalid privkey from %s shnum %d" %
- (idlib.nodeid_b2a(peerid)[:8], shnum),
- parent=lp, level=log.WEIRD, umid="YIw4tA")
- return
+ def _remove_reader(self, reader):
+ """
+ At various points, we will wish to remove a peer from
+ consideration and/or use. These include, but are not necessarily
+ limited to:
- # it's good
- self.log("got valid privkey from shnum %d on peerid %s" %
- (shnum, idlib.shortnodeid_b2a(peerid)),
- parent=lp)
- privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
- self._node._populate_encprivkey(enc_privkey)
- self._node._populate_privkey(privkey)
- self._need_privkey = False
+ - A connection error.
+ - A mismatched prefix (that is, a prefix that does not match
+ our conception of the version information string).
+ - A failing block hash, salt hash, or share hash, which can
+ indicate disk failure/bit flips, or network trouble.
- def _query_failed(self, f, marker, peerid):
- self.log(format="query to [%(peerid)s] failed",
- peerid=idlib.shortnodeid_b2a(peerid),
- level=log.NOISY)
- self._status.problems[peerid] = f
- self._outstanding_queries.pop(marker, None)
- if not self._running:
- return
+ This method will do that. I will make sure that the
+ (shnum,reader) combination represented by my reader argument is
+ not used for anything else during this download. I will not
+ advise the reader of any corruption, something that my callers
+ may wish to do on their own.
+ """
+ # TODO: When you're done writing this, see if this is ever
+ # actually used for something that _mark_bad_share isn't. I have
+ # a feeling that they will be used for very similar things, and
+ # that having them both here is just going to be an epic amount
+ # of code duplication.
+ #
+ # (well, okay, not epic, but meaningful)
+ self.log("removing reader %s" % reader)
+ # Remove the reader from _active_readers
+ self._active_readers.remove(reader)
+ # TODO: self.readers.remove(reader)?
+ for shnum in list(self.remaining_sharemap.keys()):
+ self.remaining_sharemap.discard(shnum, reader.peerid)
+
+
+ def _mark_bad_share(self, reader, f):
+ """
+ I mark the (peerid, shnum) encapsulated by my reader argument as
+ a bad share, which means that it will not be used anywhere else.
+
+ There are several reasons to want to mark something as a bad
+ share. These include:
+
+ - A connection error to the peer.
+ - A mismatched prefix (that is, a prefix that does not match
+ our local conception of the version information string).
+ - A failing block hash, salt hash, share hash, or other
+ integrity check.
+
+ This method will ensure that readers that we wish to mark bad
+ (for these reasons or other reasons) are not used for the rest
+ of the download. Additionally, it will attempt to tell the
+ remote peer (with no guarantee of success) that its share is
+ corrupt.
+ """
+ self.log("marking share %d on server %s as bad" % \
+ (reader.shnum, reader))
+ prefix = self.verinfo[-2]
+ self.servermap.mark_bad_share(reader.peerid,
+ reader.shnum,
+ prefix)
+ self._remove_reader(reader)
+ self._bad_shares.add((reader.peerid, reader.shnum, f))
+ self._status.problems[reader.peerid] = f
self._last_failure = f
- self.remove_peer(peerid)
- level = log.WEIRD
- if f.check(DeadReferenceError):
- level = log.UNUSUAL
- self.log(format="error during query: %(f_value)s",
- f_value=str(f.value), failure=f, level=level, umid="gOJB5g")
+ self.notify_server_corruption(reader.peerid, reader.shnum,
+ str(f.value))
- def _check_for_done(self, res):
- # exit paths:
- # return : keep waiting, no new queries
- # return self._send_more_queries(outstanding) : send some more queries
- # fire self._done(plaintext) : download successful
- # raise exception : download fails
-
- self.log(format="_check_for_done: running=%(running)s, decoding=%(decoding)s",
- running=self._running, decoding=self._decoding,
- level=log.NOISY)
- if not self._running:
- return
- if self._decoding:
- return
- (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
- offsets_tuple) = self.verinfo
- if len(self.shares) < k:
- # we don't have enough shares yet
- return self._maybe_send_more_queries(k)
- if self._need_privkey:
- # we got k shares, but none of them had a valid privkey. TODO:
- # look further. Adding code to do this is a bit complicated, and
- # I want to avoid that complication, and this should be pretty
- # rare (k shares with bitflips in the enc_privkey but not in the
- # data blocks). If we actually do get here, the subsequent repair
- # will fail for lack of a privkey.
- self.log("got k shares but still need_privkey, bummer",
- level=log.WEIRD, umid="MdRHPA")
-
- # we have enough to finish. All the shares have had their hashes
- # checked, so if something fails at this point, we don't know how
- # to fix it, so the download will fail.
-
- self._decoding = True # avoid reentrancy
- self._status.set_status("decoding")
- now = time.time()
- elapsed = now - self._started
- self._status.timings["fetch"] = elapsed
-
- d = defer.maybeDeferred(self._decode)
- d.addCallback(self._decrypt, IV, self._node.get_readkey())
- d.addBoth(self._done)
- return d # purely for test convenience
-
- def _maybe_send_more_queries(self, k):
- # we don't have enough shares yet. Should we send out more queries?
- # There are some number of queries outstanding, each for a single
- # share. If we can generate 'needed_shares' additional queries, we do
- # so. If we can't, then we know this file is a goner, and we raise
- # NotEnoughSharesError.
- self.log(format=("_maybe_send_more_queries, have=%(have)d, k=%(k)d, "
- "outstanding=%(outstanding)d"),
- have=len(self.shares), k=k,
- outstanding=len(self._outstanding_queries),
- level=log.NOISY)
-
- remaining_shares = k - len(self.shares)
- needed = remaining_shares - len(self._outstanding_queries)
- if not needed:
- # we have enough queries in flight already
-
- # TODO: but if they've been in flight for a long time, and we
- # have reason to believe that new queries might respond faster
- # (i.e. we've seen other queries come back faster, then consider
- # sending out new queries. This could help with peers which have
- # silently gone away since the servermap was updated, for which
- # we're still waiting for the 15-minute TCP disconnect to happen.
- self.log("enough queries are in flight, no more are needed",
- level=log.NOISY)
- return
+ def _download_current_segment(self):
+ """
+ I download, validate, decode, decrypt, and assemble the segment
+ that this Retrieve is currently responsible for downloading.
+ """
+ assert len(self._active_readers) >= self._required_shares
+ if self._current_segment <= self._last_segment:
+ d = self._process_segment(self._current_segment)
+ else:
+ d = defer.succeed(None)
+ d.addBoth(self._turn_barrier)
+ d.addCallback(self._check_for_done)
+ return d
+
+
+ def _turn_barrier(self, result):
+ """
+ I help the download process avoid the recursion limit issues
+ discussed in #237.
+ """
+ return fireEventually(result)
- outstanding_shnums = set([shnum
- for (peerid, shnum, started)
- in self._outstanding_queries.values()])
- # prefer low-numbered shares, they are more likely to be primary
- available_shnums = sorted(self.remaining_sharemap.keys())
- for shnum in available_shnums:
- if shnum in outstanding_shnums:
- # skip ones that are already in transit
- continue
- if shnum not in self.remaining_sharemap:
- # no servers for that shnum. note that DictOfSets removes
- # empty sets from the dict for us.
- continue
- peerid = list(self.remaining_sharemap[shnum])[0]
- # get_data will remove that peerid from the sharemap, and add the
- # query to self._outstanding_queries
- self._status.set_status("Retrieving More Shares")
- self.get_data(shnum, peerid)
- needed -= 1
- if not needed:
+
+ def _process_segment(self, segnum):
+ """
+ I download, validate, decode, and decrypt one segment of the
+ file that this Retrieve is retrieving. This means coordinating
+ the process of getting k blocks of that file, validating them,
+ assembling them into one segment with the decoder, and then
+ decrypting that segment.
+ """
+ self.log("processing segment %d" % segnum)
+
+ # TODO: The old code uses a marker. Should this code do that
+ # too? What did the Marker do?
+ assert len(self._active_readers) >= self._required_shares
+
+ # We need to ask each of our active readers for its block and
+ # salt. We will then validate those. If validation is
+ # successful, we will assemble the results into plaintext.
+ ds = []
+ for reader in self._active_readers:
+ started = time.time()
+ d = reader.get_block_and_salt(segnum, queue=True)
+ d2 = self._get_needed_hashes(reader, segnum)
+ dl = defer.DeferredList([d, d2], consumeErrors=True)
+ dl.addCallback(self._validate_block, segnum, reader, started)
+ dl.addErrback(self._validation_or_decoding_failed, [reader])
+ ds.append(dl)
+ reader.flush()
+ dl = defer.DeferredList(ds)
+ if self._verify:
+ dl.addCallback(lambda ignored: "")
+ dl.addCallback(self._set_segment)
+ else:
+ dl.addCallback(self._maybe_decode_and_decrypt_segment, segnum)
+ return dl
+
+
+ def _maybe_decode_and_decrypt_segment(self, blocks_and_salts, segnum):
+ """
+ I take the results of fetching and validating the blocks from a
+ callback chain in another method. If the results are such that
+ they tell me that validation and fetching succeeded without
+ incident, I will proceed with decoding and decryption.
+ Otherwise, I will do nothing.
+ """
+ self.log("trying to decode and decrypt segment %d" % segnum)
+ failures = False
+ for block_and_salt in blocks_and_salts:
+ if not block_and_salt[0] or block_and_salt[1] is None:
+ self.log("some validation operations failed; not proceeding")
+ failures = True
break
+ if not failures:
+ self.log("everything looks ok, building segment %d" % segnum)
+ d = self._decode_blocks(blocks_and_salts, segnum)
+ d.addCallback(self._decrypt_segment)
+ d.addErrback(self._validation_or_decoding_failed,
+ self._active_readers)
+ # check to see whether we've been paused before writing
+ # anything.
+ d.addCallback(self._check_for_paused)
+ d.addCallback(self._set_segment)
+ return d
+ else:
+ return defer.succeed(None)
- # at this point, we have as many outstanding queries as we can. If
- # needed!=0 then we might not have enough to recover the file.
- if needed:
- format = ("ran out of peers: "
- "have %(have)d shares (k=%(k)d), "
- "%(outstanding)d queries in flight, "
- "need %(need)d more, "
- "found %(bad)d bad shares")
- args = {"have": len(self.shares),
- "k": k,
- "outstanding": len(self._outstanding_queries),
- "need": needed,
- "bad": len(self._bad_shares),
- }
- self.log(format=format,
- level=log.WEIRD, umid="ezTfjw", **args)
- err = NotEnoughSharesError("%s, last failure: %s" %
- (format % args, self._last_failure))
- if self._bad_shares:
- self.log("We found some bad shares this pass. You should "
- "update the servermap and try again to check "
- "more peers",
- level=log.WEIRD, umid="EFkOlA")
- err.servermap = self.servermap
- raise err
+ def _set_segment(self, segment):
+ """
+ Given a plaintext segment, I register that segment with the
+ target that is handling the file download.
+ """
+ self.log("got plaintext for segment %d" % self._current_segment)
+ if self._current_segment == self._start_segment:
+ # We're on the first segment. It's possible that we want
+ # only some part of the end of this segment, and that we
+ # just downloaded the whole thing to get that part. If so,
+ # we need to account for that and give the reader just the
+ # data that they want.
+ n = self._offset % self._segment_size
+ self.log("stripping %d bytes off of the first segment" % n)
+ self.log("original segment length: %d" % len(segment))
+ segment = segment[n:]
+ self.log("new segment length: %d" % len(segment))
+
+ if self._current_segment == self._last_segment and self._read_length is not None:
+ # We're on the last segment. It's possible that we only want
+ # part of the beginning of this segment, and that we
+ # downloaded the whole thing anyway. Make sure to give the
+ # caller only the portion of the segment that they want to
+ # receive.
+ # The requested range ends at byte (self._offset +
+ # self._read_length); keep only the bytes of this segment
+ # that fall before that point.
+ extra = self._offset + self._read_length - \
+ self._last_segment * self._segment_size
+ if self._start_segment == self._last_segment:
+ # the offset was already stripped off of this segment above
+ extra -= self._offset % self._segment_size
+ self.log("original segment length: %d" % len(segment))
+ segment = segment[:extra]
+ self.log("new segment length: %d" % len(segment))
+ self.log("only taking %d bytes of the last segment" % extra)
+
+ if not self._verify:
+ self._consumer.write(segment)
+ else:
+ # we don't care about the plaintext if we are doing a verify.
+ segment = None
+ self._current_segment += 1
+
+
+ def _validation_or_decoding_failed(self, f, readers):
+ """
+ I am called when a block or a salt fails to correctly validate, or when
+ the decryption or decoding operation fails for some reason. I react to
+ this failure by notifying the remote server of corruption, and then
+ removing the remote peer from further activity.
+ """
+ assert isinstance(readers, list)
+ bad_shnums = [reader.shnum for reader in readers]
+
+ self.log("validation or decoding failed on share(s) %s, peer(s) %s "
+ ", segment %d: %s" % \
+ (bad_shnums, readers, self._current_segment, str(f)))
+ for reader in readers:
+ self._mark_bad_share(reader, f)
return
- def _decode(self):
- started = time.time()
- (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
- offsets_tuple) = self.verinfo
- # shares_dict is a dict mapping shnum to share data, but the codec
- # wants two lists.
- shareids = []; shares = []
- for shareid, share in self.shares.items():
+ def _validate_block(self, results, segnum, reader, started):
+ """
+ I validate a block from one share on a remote server.
+ """
+ # Grab the part of the block hash tree that is necessary to
+ # validate this block, then generate the block hash root.
+ self.log("validating share %d for segment %d" % (reader.shnum,
+ segnum))
+ self._status.add_fetch_timing(reader.peerid, time.time() - started)
+ self._status.set_status("Valdiating blocks for segment %d" % segnum)
+ # Did we fail to fetch either of the things that we were
+ # supposed to? Fail if so.
+ if not results[0][0] or not results[1][0]:
+ # The CorruptShareError we raise here is handled by the
+ # errback that _process_segment added to this chain.
+
+ # These all get batched into one query, so the resulting
+ # failure should be the same for all of them; use whichever
+ # one we find first.
+ if not results[0][0]:
+ f = results[0][1]
+ else:
+ f = results[1][1]
+ assert isinstance(f, failure.Failure)
+
+ raise CorruptShareError(reader.peerid,
+ reader.shnum,
+ "Connection error: %s" % str(f))
+
+ block_and_salt, block_and_sharehashes = results
+ block, salt = block_and_salt[1]
+ blockhashes, sharehashes = block_and_sharehashes[1]
+
+ blockhashes = dict(enumerate(blockhashes[1]))
+ self.log("the reader gave me the following blockhashes: %s" % \
+ blockhashes.keys())
+ self.log("the reader gave me the following sharehashes: %s" % \
+ sharehashes[1].keys())
+ bht = self._block_hash_trees[reader.shnum]
+
+ if bht.needed_hashes(segnum, include_leaf=True):
+ try:
+ bht.set_hashes(blockhashes)
+ except (hashtree.BadHashError, hashtree.NotEnoughHashesError, \
+ IndexError), e:
+ raise CorruptShareError(reader.peerid,
+ reader.shnum,
+ "block hash tree failure: %s" % e)
+
+ if self._version == MDMF_VERSION:
+ blockhash = hashutil.block_hash(salt + block)
+ else:
+ blockhash = hashutil.block_hash(block)
+ # If this works without an error, then validation is
+ # successful.
+ try:
+ bht.set_hashes(leaves={segnum: blockhash})
+ except (hashtree.BadHashError, hashtree.NotEnoughHashesError, \
+ IndexError), e:
+ raise CorruptShareError(reader.peerid,
+ reader.shnum,
+ "block hash tree failure: %s" % e)
+
+ # Reaching this point means that we know that this segment
+ # is correct. Now we need to check to see whether the share
+ # hash chain is also correct.
+ # SDMF wrote share hash chains that didn't contain the
+ # leaves, which would be produced from the block hash tree.
+ # So we need to validate the block hash tree first. If
+ # successful, then bht[0] will contain the root for the
+ # shnum, which will be a leaf in the share hash tree, which
+ # will allow us to validate the rest of the tree.
+ if self.share_hash_tree.needed_hashes(reader.shnum,
+ include_leaf=True) or \
+ self._verify:
+ try:
+ self.share_hash_tree.set_hashes(hashes=sharehashes[1],
+ leaves={reader.shnum: bht[0]})
+ except (hashtree.BadHashError, hashtree.NotEnoughHashesError, \
+ IndexError), e:
+ raise CorruptShareError(reader.peerid,
+ reader.shnum,
+ "corrupt hashes: %s" % e)
+
+ self.log('share %d is valid for segment %d' % (reader.shnum,
+ segnum))
+ return {reader.shnum: (block, salt)}
+
+
+ def _get_needed_hashes(self, reader, segnum):
+ """
+ I get the hashes needed to validate segnum from the reader, then return
+ to my caller when this is done.
+ """
+ bht = self._block_hash_trees[reader.shnum]
+ needed = bht.needed_hashes(segnum, include_leaf=True)
+ # The root of the block hash tree is also a leaf in the share
+ # hash tree. So we don't need to fetch it from the remote
+ # server. In the case of files with one segment, this means that
+ # we won't fetch any block hash tree from the remote server,
+ # since the hash of each share of the file is the entire block
+ # hash tree, and is a leaf in the share hash tree. This is fine,
+ # since any share corruption will be detected in the share hash
+ # tree.
+ #needed.discard(0)
+ self.log("getting blockhashes for segment %d, share %d: %s" % \
+ (segnum, reader.shnum, str(needed)))
+ d1 = reader.get_blockhashes(needed, queue=True, force_remote=True)
+ if self.share_hash_tree.needed_hashes(reader.shnum):
+ need = self.share_hash_tree.needed_hashes(reader.shnum)
+ self.log("also need sharehashes for share %d: %s" % (reader.shnum,
+ str(need)))
+ d2 = reader.get_sharehashes(need, queue=True, force_remote=True)
+ else:
+ d2 = defer.succeed({}) # the logic in the next method
+ # expects a dict
+ dl = defer.DeferredList([d1, d2], consumeErrors=True)
+ return dl
+
+
+ def _decode_blocks(self, blocks_and_salts, segnum):
+ """
+ I take a list of k blocks and salts, and decode that into a
+ single encrypted segment.
+ """
+ d = {}
+ # We want to merge our dictionaries to the form
+ # {shnum: blocks_and_salts}
+ #
+ # The dictionaries come from validate block that way, so we just
+ # need to merge them.
+ for block_and_salt in blocks_and_salts:
+ d.update(block_and_salt[1])
+
+ # All of these blocks should have the same salt; in SDMF, it is
+ # the file-wide IV, while in MDMF it is the per-segment salt. In
+ # either case, we just need to get one of them and use it.
+ #
+ # d.items()[0] is like (shnum, (block, salt))
+ # d.items()[0][1] is like (block, salt)
+ # d.items()[0][1][1] is the salt.
+ salt = d.items()[0][1][1]
+ # Next, extract just the blocks from the dict. We'll use the
+ # salt in the next step.
+ share_and_shareids = [(k, v[0]) for k, v in d.items()]
+ d2 = dict(share_and_shareids)
+ shareids = []
+ shares = []
+ for shareid, share in d2.items():
shareids.append(shareid)
shares.append(share)
- assert len(shareids) >= k, len(shareids)
+ self._status.set_status("Decoding")
+ started = time.time()
+ assert len(shareids) >= self._required_shares, len(shareids)
# zfec really doesn't want extra shares
- shareids = shareids[:k]
- shares = shares[:k]
-
- fec = codec.CRSDecoder()
- fec.set_params(segsize, k, N)
-
- self.log("params %s, we have %d shares" % ((segsize, k, N), len(shares)))
- self.log("about to decode, shareids=%s" % (shareids,))
- d = defer.maybeDeferred(fec.decode, shares, shareids)
- def _done(buffers):
- self._status.timings["decode"] = time.time() - started
- self.log(" decode done, %d buffers" % len(buffers))
+ shareids = shareids[:self._required_shares]
+ shares = shares[:self._required_shares]
+ self.log("decoding segment %d" % segnum)
+ if segnum == self._num_segments - 1:
+ d = defer.maybeDeferred(self._tail_decoder.decode, shares, shareids)
+ else:
+ d = defer.maybeDeferred(self._segment_decoder.decode, shares, shareids)
+ def _process(buffers):
segment = "".join(buffers)
+ self.log(format="now decoding segment %(segnum)s of %(numsegs)s",
+ segnum=segnum,
+ numsegs=self._num_segments,
+ level=log.NOISY)
self.log(" joined length %d, datalength %d" %
- (len(segment), datalength))
- segment = segment[:datalength]
+ (len(segment), self._data_length))
+ if segnum == self._num_segments - 1:
+ size_to_use = self._tail_data_size
+ else:
+ size_to_use = self._segment_size
+ segment = segment[:size_to_use]
self.log(" segment len=%d" % len(segment))
- return segment
- def _err(f):
- self.log(" decode failed: %s" % f)
- return f
- d.addCallback(_done)
- d.addErrback(_err)
+ self._status.timings.setdefault("decode", 0)
+ self._status.timings['decode'] = time.time() - started
+ return segment, salt
+ d.addCallback(_process)
return d
- def _decrypt(self, crypttext, IV, readkey):
+
+ def _decrypt_segment(self, segment_and_salt):
+ """
+ I take a single segment and its salt, and decrypt it. I return
+ the plaintext of the segment that is in my argument.
+ """
+ segment, salt = segment_and_salt
self._status.set_status("decrypting")
+ self.log("decrypting segment %d" % self._current_segment)
started = time.time()
- key = hashutil.ssk_readkey_data_hash(IV, readkey)
+ key = hashutil.ssk_readkey_data_hash(salt, self._node.get_readkey())
decryptor = AES(key)
- plaintext = decryptor.process(crypttext)
- self._status.timings["decrypt"] = time.time() - started
+ plaintext = decryptor.process(segment)
+ self._status.timings.setdefault("decrypt", 0)
+ self._status.timings['decrypt'] = time.time() - started
return plaintext
- def _done(self, res):
- if not self._running:
+
+ def notify_server_corruption(self, peerid, shnum, reason):
+ ss = self.servermap.connections[peerid]
+ ss.callRemoteOnly("advise_corrupt_share",
+ "mutable", self._storage_index, shnum, reason)
+
+
+ def _try_to_validate_privkey(self, enc_privkey, reader):
+ alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
+ alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
+ if alleged_writekey != self._node.get_writekey():
+ self.log("invalid privkey from %s shnum %d" %
+ (reader, reader.shnum),
+ level=log.WEIRD, umid="YIw4tA")
+ if self._verify:
+ self.servermap.mark_bad_share(reader.peerid, reader.shnum,
+ self.verinfo[-2])
+ e = CorruptShareError(reader.peerid,
+ reader.shnum,
+ "invalid privkey")
+ f = failure.Failure(e)
+ self._bad_shares.add((reader.peerid, reader.shnum, f))
return
+
+ # it's good
+ self.log("got valid privkey from shnum %d on reader %s" %
+ (reader.shnum, reader))
+ privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
+ self._node._populate_encprivkey(enc_privkey)
+ self._node._populate_privkey(privkey)
+ self._need_privkey = False
+
+
+ def _check_for_done(self, res):
+ """
+ I check to see if this Retrieve object has successfully finished
+ its work.
+
+ I can exit in the following ways:
+ - If there are no more segments to download, then I exit by
+ causing self._done_deferred to fire with the plaintext
+ content requested by the caller.
+ - If there are still segments to be downloaded, and there
+ are enough active readers (readers which have not broken
+ and have not given us corrupt data) to continue
+ downloading, I send control back to
+ _download_current_segment.
+ - If there are still segments to be downloaded but there are
+ not enough active peers to download them, I ask
+ _add_active_peers to add more peers. If it is successful,
+ it will call _download_current_segment. If there are not
+ enough peers to retrieve the file, then that will cause
+ _done_deferred to errback.
+ """
+ self.log("checking for doneness")
+ if self._current_segment > self._last_segment:
+ # No more segments to download, we're done.
+ self.log("got plaintext, done")
+ return self._done()
+
+ if len(self._active_readers) >= self._required_shares:
+ # More segments to download, but we have enough good peers
+ # in self._active_readers that we can do that without issue,
+ # so go nab the next segment.
+ self.log("not done yet: on segment %d of %d" % \
+ (self._current_segment + 1, self._num_segments))
+ return self._download_current_segment()
+
+ self.log("not done yet: on segment %d of %d, need to add peers" % \
+ (self._current_segment + 1, self._num_segments))
+ return self._add_active_peers()
+
+
+ def _done(self):
+ """
+ I am called by _check_for_done when the download process has
+ finished successfully. After making some useful logging
+ statements, I return the decrypted contents to the owner of this
+ Retrieve object through self._done_deferred.
+ """
self._running = False
self._status.set_active(False)
- self._status.timings["total"] = time.time() - self._started
- # res is either the new contents, or a Failure
- if isinstance(res, failure.Failure):
- self.log("Retrieve done, with failure", failure=res,
- level=log.UNUSUAL)
- self._status.set_status("Failed")
+ now = time.time()
+ self._status.timings['total'] = now - self._started
+ self._status.timings['fetch'] = now - self._started_fetching
+
+ if self._verify:
+ ret = list(self._bad_shares)
+ self.log("done verifying, found %d bad shares" % len(ret))
else:
- self.log("Retrieve done, success!")
- self._status.set_status("Finished")
- self._status.set_progress(1.0)
- # remember the encoding parameters, use them again next time
- (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
- offsets_tuple) = self.verinfo
- self._node._populate_required_shares(k)
- self._node._populate_total_shares(N)
- eventually(self._done_deferred.callback, res)
+ # TODO: download status here?
+ ret = self._consumer
+ self._consumer.unregisterProducer()
+ eventually(self._done_deferred.callback, ret)
+
+
+ def _failed(self):
+ """
+ I am called by _add_active_peers when there are not enough
+ active peers left to complete the download. After making some
+ useful logging statements, I return an exception to that effect
+ to the caller of this Retrieve object through
+ self._done_deferred.
+ """
+ self._running = False
+ self._status.set_active(False)
+ now = time.time()
+ self._status.timings['total'] = now - self._started
+ self._status.timings['fetch'] = now - self._started_fetching
+ if self._verify:
+ ret = list(self._bad_shares)
+ else:
+ format = ("ran out of peers: "
+ "have %(have)d of %(total)d segments "
+ "found %(bad)d bad shares "
+ "encoding %(k)d-of-%(n)d")
+ args = {"have": self._current_segment,
+ "total": self._num_segments,
+ "need": self._last_segment,
+ "k": self._required_shares,
+ "n": self._total_shares,
+ "bad": len(self._bad_shares)}
+ e = NotEnoughSharesError("%s, last failure: %s" % \
+ (format % args, str(self._last_failure)))
+ f = failure.Failure(e)
+ ret = f
+ eventually(self._done_deferred.callback, ret)