From: Zooko O'Whielacronx Date: Thu, 8 Jan 2009 21:42:15 +0000 (-0700) Subject: immutable: add a monitor API to CiphertextDownloader with which to tell it to stop... X-Git-Url: https://git.rkrishnan.org/uri//%22%22?a=commitdiff_plain;h=ade6a4fa747b7656d57045c1d0a171221b0bf1b2;p=tahoe-lafs%2Ftahoe-lafs.git immutable: add a monitor API to CiphertextDownloader with which to tell it to stop its work --- diff --git a/src/allmydata/immutable/download.py b/src/allmydata/immutable/download.py index 89689da9..3ffa907a 100644 --- a/src/allmydata/immutable/download.py +++ b/src/allmydata/immutable/download.py @@ -13,6 +13,7 @@ from allmydata import codec, hashtree, uri from allmydata.interfaces import IDownloadTarget, IDownloader, IFileURI, IVerifierURI, \ IDownloadStatus, IDownloadResults, IValidatedThingProxy, NotEnoughSharesError from allmydata.immutable import layout +from allmydata.monitor import Monitor from pycryptopp.cipher.aes import AES class IntegrityCheckReject(Exception): @@ -605,11 +606,14 @@ class DownloadStatus: class CiphertextDownloader(log.PrefixingLogMixin): """ I download shares, check their integrity, then decode them, check the integrity of the - resulting ciphertext, then and write it to my target. """ + resulting ciphertext, then and write it to my target. Before I send any new request to a + server, I always ask the "monitor" object that was passed into my constructor whether this + task has been cancelled (by invoking its raise_if_cancelled() method). """ implements(IPushProducer) _status = None - def __init__(self, client, v, target): + def __init__(self, client, v, target, monitor): + precondition(IVerifierURI.providedBy(v), v) precondition(IDownloadTarget.providedBy(target), target) @@ -645,6 +649,7 @@ class CiphertextDownloader(log.PrefixingLogMixin): if IConsumer.providedBy(target): target.registerProducer(self, True) self._target = target + self._monitor = monitor self._opened = False self.active_buckets = {} # k: shnum, v: bucket @@ -908,6 +913,7 @@ class CiphertextDownloader(log.PrefixingLogMixin): return d if self._stopped: raise DownloadStopped("our Consumer called stopProducing()") + self._monitor.raise_if_cancelled() return res def _download_segment(self, res, segnum): @@ -1122,7 +1128,7 @@ class Downloader(service.MultiService): self._all_download_statuses = weakref.WeakKeyDictionary() self._recent_download_statuses = [] - def download(self, u, t, _log_msg_id=None): + def download(self, u, t, _log_msg_id=None, monitor=None): assert self.parent assert self.running u = IFileURI(u) @@ -1138,7 +1144,9 @@ class Downloader(service.MultiService): self.stats_provider.count('downloader.bytes_downloaded', u.get_size()) target = DecryptingTarget(t, u.key, _log_msg_id=_log_msg_id) - dl = CiphertextDownloader(self.parent, u.get_verify_cap(), target) + if not monitor: + monitor=Monitor() + dl = CiphertextDownloader(self.parent, u.get_verify_cap(), target, monitor=monitor) self._add_download(dl) d = dl.start() return d diff --git a/src/allmydata/test/test_encode.py b/src/allmydata/test/test_encode.py index 0ec16586..0e2e2a8f 100644 --- a/src/allmydata/test/test_encode.py +++ b/src/allmydata/test/test_encode.py @@ -9,6 +9,7 @@ from allmydata.immutable import encode, upload, download from allmydata.util import hashutil from allmydata.util.assertutil import _assert from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader, NotEnoughSharesError +from allmydata.monitor import Monitor import common_util as testutil class LostPeerError(Exception): @@ -494,7 +495,7 @@ class Roundtrip(unittest.TestCase, testutil.ShouldFailMixin): if not target: target = download.Data() target = download.DecryptingTarget(target, u.key) - fd = download.CiphertextDownloader(client, u.get_verify_cap(), target) + fd = download.CiphertextDownloader(client, u.get_verify_cap(), target, monitor=Monitor()) # we manually cycle the CiphertextDownloader through a number of steps that # would normally be sequenced by a Deferred chain in