from twisted.internet.interfaces import IPushProducer, IConsumer
from foolscap.api import eventually, fireEventually
from allmydata.interfaces import IRetrieveStatus, NotEnoughSharesError, \
- MDMF_VERSION, SDMF_VERSION
+ DownloadStopped, MDMF_VERSION, SDMF_VERSION
from allmydata.util import hashutil, log, mathutil
from allmydata.util.dictutil import DictOfSets
from allmydata import hashtree, codec
self._status.set_size(datalength)
self._status.set_encoding(k, N)
self.readers = {}
+ self._stopped = False
self._pause_deferred = None
self._offset = None
self._read_length = None
if self._pause_deferred is not None:
return
- # fired when the download is unpaused.
+ # fired when the download is unpaused.
self._old_status = self._status.get_status()
self._set_current_status("paused")
eventually(p.callback, None)
+ def stopProducing(self):
+ self._stopped = True
+ self.resumeProducing()
+
def _check_for_paused(self, res):
"""
d = defer.Deferred()
self._pause_deferred.addCallback(lambda ignored: d.callback(res))
return d
- return defer.succeed(res)
+ return res
+
+ def _check_for_stopped(self, res):
+ if self._stopped:
+ raise DownloadStopped("our Consumer called stopProducing()")
+ return res
def download(self, consumer=None, offset=0, size=None):
self.shares = {} # maps shnum to validated blocks
self._active_readers = [] # list of active readers for this dl.
- self._validated_readers = set() # set of readers that we have
- # validated the prefix of
self._block_hash_trees = {} # shnum => hashtree
# We need one share hash tree for the entire file; its leaves
# instead of just reasoning about what the effect might be. Out
# of scope for MDMF, though.)
- # We need at least self._required_shares readers to download a
- # segment. If we're verifying, we need all shares.
- if self._verify:
- needed = self._total_shares
- else:
- needed = self._required_shares
# XXX: Why don't format= log messages work here?
- self.log("adding %d peers to the active peers list" % needed)
- if len(self._active_readers) >= needed:
- # enough shares are active
- return
-
- more = needed - len(self._active_readers)
known_shnums = set(self.remaining_sharemap.keys())
used_shnums = set([r.shnum for r in self._active_readers])
unused_shnums = known_shnums - used_shnums
- # We favor lower numbered shares, since FEC is faster with
- # primary shares than with other shares, and lower-numbered
- # shares are more likely to be primary than higher numbered
- # shares.
- new_shnums = sorted(unused_shnums)[:more]
- if len(new_shnums) < more and not self._verify:
- # We don't have enough readers to retrieve the file; fail.
- self._raise_notenoughshareserror()
+ if self._verify:
+ new_shnums = unused_shnums # use them all
+ elif len(self._active_readers) < self._required_shares:
+ # need more shares
+ more = self._required_shares - len(self._active_readers)
+ # We favor lower numbered shares, since FEC is faster with
+ # primary shares than with other shares, and lower-numbered
+ # shares are more likely to be primary than higher numbered
+ # shares.
+ new_shnums = sorted(unused_shnums)[:more]
+ if len(new_shnums) < more:
+ # We don't have enough readers to retrieve the file; fail.
+ self._raise_notenoughshareserror()
+ else:
+ new_shnums = []
+
+ self.log("adding %d new peers to the active list" % len(new_shnums))
for shnum in new_shnums:
reader = self.readers[shnum]
self._active_readers.append(reader)
- self._validated_readers.add(reader)
self.log("added reader for share %d" % shnum)
- # Each time we validate a reader, we check to see if we need the
+ # Each time we add a reader, we check to see if we need the
# private key. If we do, we politely ask for it and then continue
# computing. If we find that we haven't gotten it at the end of
# segment decoding, then we'll take more drastic measures.
# check to see whether we've been paused before writing
# anything.
d.addCallback(self._check_for_paused)
+ d.addCallback(self._check_for_stopped)
d.addCallback(self._set_segment)
return d
else: