from twisted.internet.interfaces import IPushProducer, IConsumer
from foolscap.api import eventually, fireEventually
from allmydata.interfaces import IRetrieveStatus, NotEnoughSharesError, \
- MDMF_VERSION, SDMF_VERSION
+ DownloadStopped, MDMF_VERSION, SDMF_VERSION
from allmydata.util import hashutil, log, mathutil
from allmydata.util.dictutil import DictOfSets
from allmydata import hashtree, codec
self._status.set_size(datalength)
self._status.set_encoding(k, N)
self.readers = {}
+ self._stopped = False
self._pause_deferred = None
self._offset = None
self._read_length = None
if self._pause_deferred is not None:
return
- # fired when the download is unpaused.
+ # fired when the download is unpaused.
self._old_status = self._status.get_status()
self._set_current_status("paused")
eventually(p.callback, None)
+ def stopProducing(self):
+ self._stopped = True
+ self.resumeProducing()
+
def _check_for_paused(self, res):
"""
d = defer.Deferred()
self._pause_deferred.addCallback(lambda ignored: d.callback(res))
return d
- return defer.succeed(res)
+ return res
+
+ def _check_for_stopped(self, res):
+ if self._stopped:
+ raise DownloadStopped("our Consumer called stopProducing()")
+ return res
def download(self, consumer=None, offset=0, size=None):
self._setup_encoding_parameters()
self.log("starting download")
self._started_fetching = time.time()
- d = self._add_active_peers()
- # ...
# The download process beyond this is a state machine.
# _add_active_peers will select the peers that we want to use
# for the download, and then attempt to start downloading. After
# peers before downloading all of the segments, _done_deferred
# will errback. Otherwise, it will eventually callback with the
# contents of the mutable file.
+ self.loop()
return self._done_deferred
+ def loop(self):
+ d = fireEventually(None) # avoid #237 recursion limit problem
+ d.addCallback(lambda ign: self._activate_enough_peers())
+ d.addCallback(lambda ign: self._download_current_segment())
+ # when we're done, _download_current_segment will call _done. If we
+ # aren't, it will call loop() again.
+ d.addErrback(self._error)
+
def _setup_download(self):
self._started = time.time()
self._status.set_status("Retrieving Shares")
self.shares = {} # maps shnum to validated blocks
self._active_readers = [] # list of active readers for this dl.
- self._validated_readers = set() # set of readers that we have
- # validated the prefix of
self._block_hash_trees = {} # shnum => hashtree
# We need one share hash tree for the entire file; its leaves
self._current_segment = self._start_segment
- def _add_active_peers(self):
+ def _activate_enough_peers(self):
"""
I populate self._active_readers with enough active readers to
retrieve the contents of this mutable file. I am called before
# instead of just reasoning about what the effect might be. Out
# of scope for MDMF, though.)
- # We need at least self._required_shares readers to download a
- # segment.
- if self._verify:
- needed = self._total_shares
- else:
- needed = self._required_shares - len(self._active_readers)
# XXX: Why don't format= log messages work here?
- self.log("adding %d peers to the active peers list" % needed)
-
- # We favor lower numbered shares, since FEC is faster with
- # primary shares than with other shares, and lower-numbered
- # shares are more likely to be primary than higher numbered
- # shares.
- active_shnums = set(sorted(self.remaining_sharemap.keys()))
- # We shouldn't consider adding shares that we already have; this
- # will cause problems later.
- active_shnums -= set([reader.shnum for reader in self._active_readers])
- active_shnums = list(active_shnums)[:needed]
- if len(active_shnums) < needed and not self._verify:
- # We don't have enough readers to retrieve the file; fail.
- return self._failed()
-
- for shnum in active_shnums:
- self._active_readers.append(self.readers[shnum])
- self.log("added reader for share %d" % shnum)
- assert len(self._active_readers) >= self._required_shares
- # Conceptually, this is part of the _add_active_peers step. It
- # validates the prefixes of newly added readers to make sure
- # that they match what we are expecting for self.verinfo. If
- # validation is successful, _validate_active_prefixes will call
- # _download_current_segment for us. If validation is
- # unsuccessful, then _validate_prefixes will remove the peer and
- # call _add_active_peers again, where we will attempt to rectify
- # the problem by choosing another peer.
- return self._validate_active_prefixes()
-
-
- def _validate_active_prefixes(self):
- """
- I check to make sure that the prefixes on the peers that I am
- currently reading from match the prefix that we want to see, as
- said in self.verinfo.
-
- If I find that all of the active peers have acceptable prefixes,
- I pass control to _download_current_segment, which will use
- those peers to do cool things. If I find that some of the active
- peers have unacceptable prefixes, I will remove them from active
- peers (and from further consideration) and call
- _add_active_peers to attempt to rectify the situation. I keep
- track of which peers I have already validated so that I don't
- need to do so again.
- """
- assert self._active_readers, "No more active readers"
- new_readers = set(self._active_readers) - self._validated_readers
- self.log('validating %d newly-added active readers' % len(new_readers))
+ known_shnums = set(self.remaining_sharemap.keys())
+ used_shnums = set([r.shnum for r in self._active_readers])
+ unused_shnums = known_shnums - used_shnums
- for reader in new_readers:
- self._validated_readers.add(reader)
- # Each time we validate a reader, we check to see if we need the
+ if self._verify:
+ new_shnums = unused_shnums # use them all
+ elif len(self._active_readers) < self._required_shares:
+ # need more shares
+ more = self._required_shares - len(self._active_readers)
+ # We favor lower numbered shares, since FEC is faster with
+ # primary shares than with other shares, and lower-numbered
+ # shares are more likely to be primary than higher numbered
+ # shares.
+ new_shnums = sorted(unused_shnums)[:more]
+ if len(new_shnums) < more:
+ # We don't have enough readers to retrieve the file; fail.
+ self._raise_notenoughshareserror()
+ else:
+ new_shnums = []
+
+ self.log("adding %d new peers to the active list" % len(new_shnums))
+ for shnum in new_shnums:
+ reader = self.readers[shnum]
+ self._active_readers.append(reader)
+ self.log("added reader for share %d" % shnum)
+ # Each time we add a reader, we check to see if we need the
# private key. If we do, we politely ask for it and then continue
# computing. If we find that we haven't gotten it at the end of
# segment decoding, then we'll take more drastic measures.
d.addCallback(self._try_to_validate_privkey, reader)
# XXX: don't just drop the Deferred. We need error-reporting
# but not flow-control here.
- return self._download_current_segment()
-
+ assert len(self._active_readers) >= self._required_shares
def _try_to_validate_prefix(self, prefix, reader):
"""
that this Retrieve is currently responsible for downloading.
"""
assert len(self._active_readers) >= self._required_shares
- if self._current_segment <= self._last_segment:
- d = self._process_segment(self._current_segment)
- else:
- d = defer.succeed(None)
- d.addBoth(self._turn_barrier)
- d.addCallback(self._check_for_done)
+ if self._current_segment > self._last_segment:
+ # No more segments to download, we're done.
+ self.log("got plaintext, done")
+ return self._done()
+ self.log("on segment %d of %d" %
+ (self._current_segment + 1, self._num_segments))
+ d = self._process_segment(self._current_segment)
+ d.addCallback(lambda ign: self.loop())
return d
-
- def _turn_barrier(self, result):
- """
- I help the download process avoid the recursion limit issues
- discussed in #237.
- """
- return fireEventually(result)
-
-
def _process_segment(self, segnum):
"""
I download, validate, decode, and decrypt one segment of the
# check to see whether we've been paused before writing
# anything.
d.addCallback(self._check_for_paused)
+ d.addCallback(self._check_for_stopped)
d.addCallback(self._set_segment)
return d
else:
self._need_privkey = False
- def _check_for_done(self, res):
- """
- I check to see if this Retrieve object has successfully finished
- its work.
-
- I can exit in the following ways:
- - If there are no more segments to download, then I exit by
- causing self._done_deferred to fire with the plaintext
- content requested by the caller.
- - If there are still segments to be downloaded, and there
- are enough active readers (readers which have not broken
- and have not given us corrupt data) to continue
- downloading, I send control back to
- _download_current_segment.
- - If there are still segments to be downloaded but there are
- not enough active peers to download them, I ask
- _add_active_peers to add more peers. If it is successful,
- it will call _download_current_segment. If there are not
- enough peers to retrieve the file, then that will cause
- _done_deferred to errback.
- """
- self.log("checking for doneness")
- if self._current_segment > self._last_segment:
- # No more segments to download, we're done.
- self.log("got plaintext, done")
- return self._done()
-
- if len(self._active_readers) >= self._required_shares:
- # More segments to download, but we have enough good peers
- # in self._active_readers that we can do that without issue,
- # so go nab the next segment.
- self.log("not done yet: on segment %d of %d" % \
- (self._current_segment + 1, self._num_segments))
- return self._download_current_segment()
-
- self.log("not done yet: on segment %d of %d, need to add peers" % \
- (self._current_segment + 1, self._num_segments))
- return self._add_active_peers()
-
def _done(self):
"""
- I am called by _check_for_done when the download process has
- finished successfully. After making some useful logging
+ I am called by _download_current_segment when the download process
+ has finished successfully. After making some useful logging
statements, I return the decrypted contents to the owner of this
Retrieve object through self._done_deferred.
"""
eventually(self._done_deferred.callback, ret)
- def _failed(self):
+ def _raise_notenoughshareserror(self):
"""
- I am called by _add_active_peers when there are not enough
+ I am called by _activate_enough_peers when there are not enough
active peers left to complete the download. After making some
- useful logging statements, I return an exception to that effect
+ useful logging statements, I throw an exception to that effect
to the caller of this Retrieve object through
self._done_deferred.
"""
+
+ format = ("ran out of peers: "
+ "have %(have)d of %(total)d segments "
+ "found %(bad)d bad shares "
+ "encoding %(k)d-of-%(n)d")
+ args = {"have": self._current_segment,
+ "total": self._num_segments,
+ "need": self._last_segment,
+ "k": self._required_shares,
+ "n": self._total_shares,
+ "bad": len(self._bad_shares)}
+ raise NotEnoughSharesError("%s, last failure: %s" %
+ (format % args, str(self._last_failure)))
+
+ def _error(self, f):
+ # all errors, including NotEnoughSharesError, land here
self._running = False
self._status.set_active(False)
now = time.time()
self._status.timings['total'] = now - self._started
self._status.timings['fetch'] = now - self._started_fetching
self._status.set_status("Failed")
-
- if self._verify:
- ret = list(self._bad_shares)
- else:
- format = ("ran out of peers: "
- "have %(have)d of %(total)d segments "
- "found %(bad)d bad shares "
- "encoding %(k)d-of-%(n)d")
- args = {"have": self._current_segment,
- "total": self._num_segments,
- "need": self._last_segment,
- "k": self._required_shares,
- "n": self._total_shares,
- "bad": len(self._bad_shares)}
- e = NotEnoughSharesError("%s, last failure: %s" % \
- (format % args, str(self._last_failure)))
- f = failure.Failure(e)
- ret = f
- eventually(self._done_deferred.callback, ret)
+ eventually(self._done_deferred.errback, f)