immutable: add a monitor API to CiphertextDownloader with which to tell it to stop...
authorZooko O'Whielacronx <zooko@zooko.com>
Thu, 8 Jan 2009 21:42:15 +0000 (14:42 -0700)
committerZooko O'Whielacronx <zooko@zooko.com>
Thu, 8 Jan 2009 21:42:15 +0000 (14:42 -0700)
src/allmydata/immutable/download.py
src/allmydata/test/test_encode.py

index 89689da98827ca332a0e005dc1db2bf124c13d54..3ffa907aef9c2aa367e748b45a6882ce33ce7631 100644 (file)
@@ -13,6 +13,7 @@ from allmydata import codec, hashtree, uri
 from allmydata.interfaces import IDownloadTarget, IDownloader, IFileURI, IVerifierURI, \
      IDownloadStatus, IDownloadResults, IValidatedThingProxy, NotEnoughSharesError
 from allmydata.immutable import layout
+from allmydata.monitor import Monitor
 from pycryptopp.cipher.aes import AES
 
 class IntegrityCheckReject(Exception):
@@ -605,11 +606,14 @@ class DownloadStatus:
 
 class CiphertextDownloader(log.PrefixingLogMixin):
     """ I download shares, check their integrity, then decode them, check the integrity of the
-    resulting ciphertext, then and write it to my target. """
+    resulting ciphertext, then and write it to my target. Before I send any new request to a
+    server, I always ask the "monitor" object that was passed into my constructor whether this
+    task has been cancelled (by invoking its raise_if_cancelled() method). """
     implements(IPushProducer)
     _status = None
 
-    def __init__(self, client, v, target):
+    def __init__(self, client, v, target, monitor):
+
         precondition(IVerifierURI.providedBy(v), v)
         precondition(IDownloadTarget.providedBy(target), target)
 
@@ -645,6 +649,7 @@ class CiphertextDownloader(log.PrefixingLogMixin):
         if IConsumer.providedBy(target):
             target.registerProducer(self, True)
         self._target = target
+        self._monitor = monitor
         self._opened = False
 
         self.active_buckets = {} # k: shnum, v: bucket
@@ -908,6 +913,7 @@ class CiphertextDownloader(log.PrefixingLogMixin):
             return d
         if self._stopped:
             raise DownloadStopped("our Consumer called stopProducing()")
+        self._monitor.raise_if_cancelled()
         return res
 
     def _download_segment(self, res, segnum):
@@ -1122,7 +1128,7 @@ class Downloader(service.MultiService):
         self._all_download_statuses = weakref.WeakKeyDictionary()
         self._recent_download_statuses = []
 
-    def download(self, u, t, _log_msg_id=None):
+    def download(self, u, t, _log_msg_id=None, monitor=None):
         assert self.parent
         assert self.running
         u = IFileURI(u)
@@ -1138,7 +1144,9 @@ class Downloader(service.MultiService):
             self.stats_provider.count('downloader.bytes_downloaded', u.get_size())
 
         target = DecryptingTarget(t, u.key, _log_msg_id=_log_msg_id)
-        dl = CiphertextDownloader(self.parent, u.get_verify_cap(), target)
+        if not monitor:
+            monitor=Monitor()
+        dl = CiphertextDownloader(self.parent, u.get_verify_cap(), target, monitor=monitor)
         self._add_download(dl)
         d = dl.start()
         return d
index 0ec165866a66bd7a3c5c6c7736350fe73ec9e358..0e2e2a8f9dafd97670c20a83817160f064381340 100644 (file)
@@ -9,6 +9,7 @@ from allmydata.immutable import encode, upload, download
 from allmydata.util import hashutil
 from allmydata.util.assertutil import _assert
 from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader, NotEnoughSharesError
+from allmydata.monitor import Monitor
 import common_util as testutil
 
 class LostPeerError(Exception):
@@ -494,7 +495,7 @@ class Roundtrip(unittest.TestCase, testutil.ShouldFailMixin):
         if not target:
             target = download.Data()
         target = download.DecryptingTarget(target, u.key)
-        fd = download.CiphertextDownloader(client, u.get_verify_cap(), target)
+        fd = download.CiphertextDownloader(client, u.get_verify_cap(), target, monitor=Monitor())
 
         # we manually cycle the CiphertextDownloader through a number of steps that
         # would normally be sequenced by a Deferred chain in