from allmydata.interfaces import IDownloadTarget, IDownloader, IFileURI, IVerifierURI, \
IDownloadStatus, IDownloadResults, IValidatedThingProxy, NotEnoughSharesError
from allmydata.immutable import layout
+from allmydata.monitor import Monitor
from pycryptopp.cipher.aes import AES
class IntegrityCheckReject(Exception):
class CiphertextDownloader(log.PrefixingLogMixin):
""" I download shares, check their integrity, then decode them, check the integrity of the
- resulting ciphertext, then and write it to my target. """
+ resulting ciphertext, then and write it to my target. Before I send any new request to a
+ server, I always ask the "monitor" object that was passed into my constructor whether this
+ task has been cancelled (by invoking its raise_if_cancelled() method). """
implements(IPushProducer)
_status = None
- def __init__(self, client, v, target):
+ def __init__(self, client, v, target, monitor):
+
precondition(IVerifierURI.providedBy(v), v)
precondition(IDownloadTarget.providedBy(target), target)
if IConsumer.providedBy(target):
target.registerProducer(self, True)
self._target = target
+ self._monitor = monitor
self._opened = False
self.active_buckets = {} # k: shnum, v: bucket
return d
if self._stopped:
raise DownloadStopped("our Consumer called stopProducing()")
+ self._monitor.raise_if_cancelled()
return res
def _download_segment(self, res, segnum):
self._all_download_statuses = weakref.WeakKeyDictionary()
self._recent_download_statuses = []
- def download(self, u, t, _log_msg_id=None):
+ def download(self, u, t, _log_msg_id=None, monitor=None):
assert self.parent
assert self.running
u = IFileURI(u)
self.stats_provider.count('downloader.bytes_downloaded', u.get_size())
target = DecryptingTarget(t, u.key, _log_msg_id=_log_msg_id)
- dl = CiphertextDownloader(self.parent, u.get_verify_cap(), target)
+ if not monitor:
+ monitor=Monitor()
+ dl = CiphertextDownloader(self.parent, u.get_verify_cap(), target, monitor=monitor)
self._add_download(dl)
d = dl.start()
return d
from allmydata.util import hashutil
from allmydata.util.assertutil import _assert
from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader, NotEnoughSharesError
+from allmydata.monitor import Monitor
import common_util as testutil
class LostPeerError(Exception):
if not target:
target = download.Data()
target = download.DecryptingTarget(target, u.key)
- fd = download.CiphertextDownloader(client, u.get_verify_cap(), target)
+ fd = download.CiphertextDownloader(client, u.get_verify_cap(), target, monitor=Monitor())
# we manually cycle the CiphertextDownloader through a number of steps that
# would normally be sequenced by a Deferred chain in