]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blobdiff - src/allmydata/mutable/retrieve.py
Retrieve: implement/test stopProducing
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / mutable / retrieve.py
index 865b0e10518fdadad9c3ba5a29f889d801275779..9c09abfecf2aaf06f76eaba6309ab723b2a8ab64 100644 (file)
@@ -7,7 +7,7 @@ from twisted.python import failure
 from twisted.internet.interfaces import IPushProducer, IConsumer
 from foolscap.api import eventually, fireEventually
 from allmydata.interfaces import IRetrieveStatus, NotEnoughSharesError, \
-                                 MDMF_VERSION, SDMF_VERSION
+     DownloadStopped, MDMF_VERSION, SDMF_VERSION
 from allmydata.util import hashutil, log, mathutil
 from allmydata.util.dictutil import DictOfSets
 from allmydata import hashtree, codec
@@ -143,6 +143,7 @@ class Retrieve:
         self._status.set_size(datalength)
         self._status.set_encoding(k, N)
         self.readers = {}
+        self._stopped = False
         self._pause_deferred = None
         self._offset = None
         self._read_length = None
@@ -196,6 +197,10 @@ class Retrieve:
 
         eventually(p.callback, None)
 
+    def stopProducing(self):
+        self._stopped = True
+        self.resumeProducing()
+
 
     def _check_for_paused(self, res):
         """
@@ -205,6 +210,8 @@ class Retrieve:
         the Deferred fires immediately. Otherwise, the Deferred fires
         when the downloader is unpaused.
         """
+        if self._stopped:
+            raise DownloadStopped("our Consumer called stopProducing()")
         if self._pause_deferred is not None:
             d = defer.Deferred()
             self._pause_deferred.addCallback(lambda ignored: d.callback(res))
@@ -286,8 +293,6 @@ class Retrieve:
 
         self.shares = {} # maps shnum to validated blocks
         self._active_readers = [] # list of active readers for this dl.
-        self._validated_readers = set() # set of readers that we have
-                                        # validated the prefix of
         self._block_hash_trees = {} # shnum => hashtree
 
         # We need one share hash tree for the entire file; its leaves
@@ -484,9 +489,8 @@ class Retrieve:
         for shnum in new_shnums:
             reader = self.readers[shnum]
             self._active_readers.append(reader)
-            self._validated_readers.add(reader)
             self.log("added reader for share %d" % shnum)
-            # Each time we validate a reader, we check to see if we need the
+            # Each time we add a reader, we check to see if we need the
             # private key. If we do, we politely ask for it and then continue
             # computing. If we find that we haven't gotten it at the end of
             # segment decoding, then we'll take more drastic measures.