From ade6a4fa747b7656d57045c1d0a171221b0bf1b2 Mon Sep 17 00:00:00 2001
From: Zooko O'Whielacronx <zooko@zooko.com>
Date: Thu, 8 Jan 2009 14:42:15 -0700
Subject: [PATCH] immutable: add a monitor API to CiphertextDownloader with
 which to tell it to stop its work

---
 src/allmydata/immutable/download.py | 16 ++++++++++++----
 src/allmydata/test/test_encode.py   |  3 ++-
 2 files changed, 14 insertions(+), 5 deletions(-)

diff --git a/src/allmydata/immutable/download.py b/src/allmydata/immutable/download.py
index 89689da9..3ffa907a 100644
--- a/src/allmydata/immutable/download.py
+++ b/src/allmydata/immutable/download.py
@@ -13,6 +13,7 @@ from allmydata import codec, hashtree, uri
 from allmydata.interfaces import IDownloadTarget, IDownloader, IFileURI, IVerifierURI, \
      IDownloadStatus, IDownloadResults, IValidatedThingProxy, NotEnoughSharesError
 from allmydata.immutable import layout
+from allmydata.monitor import Monitor
 from pycryptopp.cipher.aes import AES
 
 class IntegrityCheckReject(Exception):
@@ -605,11 +606,14 @@ class DownloadStatus:
 
 class CiphertextDownloader(log.PrefixingLogMixin):
     """ I download shares, check their integrity, then decode them, check the integrity of the
-    resulting ciphertext, then and write it to my target. """
+    resulting ciphertext, then and write it to my target. Before I send any new request to a
+    server, I always ask the "monitor" object that was passed into my constructor whether this
+    task has been cancelled (by invoking its raise_if_cancelled() method). """
     implements(IPushProducer)
     _status = None
 
-    def __init__(self, client, v, target):
+    def __init__(self, client, v, target, monitor):
+
         precondition(IVerifierURI.providedBy(v), v)
         precondition(IDownloadTarget.providedBy(target), target)
 
@@ -645,6 +649,7 @@ class CiphertextDownloader(log.PrefixingLogMixin):
         if IConsumer.providedBy(target):
             target.registerProducer(self, True)
         self._target = target
+        self._monitor = monitor
         self._opened = False
 
         self.active_buckets = {} # k: shnum, v: bucket
@@ -908,6 +913,7 @@ class CiphertextDownloader(log.PrefixingLogMixin):
             return d
         if self._stopped:
             raise DownloadStopped("our Consumer called stopProducing()")
+        self._monitor.raise_if_cancelled()
         return res
 
     def _download_segment(self, res, segnum):
@@ -1122,7 +1128,7 @@ class Downloader(service.MultiService):
         self._all_download_statuses = weakref.WeakKeyDictionary()
         self._recent_download_statuses = []
 
-    def download(self, u, t, _log_msg_id=None):
+    def download(self, u, t, _log_msg_id=None, monitor=None):
         assert self.parent
         assert self.running
         u = IFileURI(u)
@@ -1138,7 +1144,9 @@ class Downloader(service.MultiService):
             self.stats_provider.count('downloader.bytes_downloaded', u.get_size())
 
         target = DecryptingTarget(t, u.key, _log_msg_id=_log_msg_id)
-        dl = CiphertextDownloader(self.parent, u.get_verify_cap(), target)
+        if not monitor:
+            monitor=Monitor()
+        dl = CiphertextDownloader(self.parent, u.get_verify_cap(), target, monitor=monitor)
         self._add_download(dl)
         d = dl.start()
         return d
diff --git a/src/allmydata/test/test_encode.py b/src/allmydata/test/test_encode.py
index 0ec16586..0e2e2a8f 100644
--- a/src/allmydata/test/test_encode.py
+++ b/src/allmydata/test/test_encode.py
@@ -9,6 +9,7 @@ from allmydata.immutable import encode, upload, download
 from allmydata.util import hashutil
 from allmydata.util.assertutil import _assert
 from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader, NotEnoughSharesError
+from allmydata.monitor import Monitor
 import common_util as testutil
 
 class LostPeerError(Exception):
@@ -494,7 +495,7 @@ class Roundtrip(unittest.TestCase, testutil.ShouldFailMixin):
         if not target:
             target = download.Data()
         target = download.DecryptingTarget(target, u.key)
-        fd = download.CiphertextDownloader(client, u.get_verify_cap(), target)
+        fd = download.CiphertextDownloader(client, u.get_verify_cap(), target, monitor=Monitor())
 
         # we manually cycle the CiphertextDownloader through a number of steps that
         # would normally be sequenced by a Deferred chain in
-- 
2.45.2