mutable/retrieve: don't write() after we've been pauseProducer'ed
authorBrian Warner <warner@lothar.com>
Mon, 17 Oct 2011 00:03:47 +0000 (17:03 -0700)
committerBrian Warner <warner@lothar.com>
Mon, 17 Oct 2011 00:24:00 +0000 (17:24 -0700)
This fixes a test failure found against current Twisted trunk in
test_mutable.Filenode.test_retrieve_producer_mdmf (when it uses
PausingAndStoppingConsumer). There must be some sort of race: I could
make it fail against Twisted-11.0 if I just increased the 0.5s delay in
test_download.PausingAndStoppingConsumer to about 0.6s, and could make
Twisted-trunk pass by reducing it to about 0.3s .

I fixed the test (as opposed to the bug) by replacing the delay with a
simple reliable eventually(), and adding extra asserts to fail the test
if the consumer's write() method is called while the producer is
supposed to be paused

The bug itself was that mutable.retrieve.Retrieve wasn't checking the
"stopped" flag after resuming from a pause, and thus delivered one
segment to a consumer that wasn't expecting it. I split out
stopped-flag-checking to separate function, which is now called
immediately after _check_for_paused(). I also cleaned up some Deferred
usage and whitespace.

src/allmydata/mutable/retrieve.py
src/allmydata/test/test_download.py

index fc113695231bcefd5179a70f043cfaa81deba565..0e507704669a3367c9e259754b6249b3e5fedd39 100644 (file)
@@ -176,7 +176,7 @@ class Retrieve:
         if self._pause_deferred is not None:
             return
 
-        # fired when the download is unpaused. 
+        # fired when the download is unpaused.
         self._old_status = self._status.get_status()
         self._set_current_status("paused")
 
@@ -210,13 +210,16 @@ class Retrieve:
         the Deferred fires immediately. Otherwise, the Deferred fires
         when the downloader is unpaused.
         """
-        if self._stopped:
-            raise DownloadStopped("our Consumer called stopProducing()")
         if self._pause_deferred is not None:
             d = defer.Deferred()
             self._pause_deferred.addCallback(lambda ignored: d.callback(res))
             return d
-        return defer.succeed(res)
+        return res
+
+    def _check_for_stopped(self, res):
+        if self._stopped:
+            raise DownloadStopped("our Consumer called stopProducing()")
+        return res
 
 
     def download(self, consumer=None, offset=0, size=None):
@@ -665,6 +668,7 @@ class Retrieve:
             # check to see whether we've been paused before writing
             # anything.
             d.addCallback(self._check_for_paused)
+            d.addCallback(self._check_for_stopped)
             d.addCallback(self._set_segment)
             return d
         else:
index 0e9f69439fa580d2f2c549f090a6763b2561ea45..a155182279ccccebac05b304faa3826241c504cb 100644 (file)
@@ -20,7 +20,7 @@ from allmydata.immutable.downloader.common import BadSegmentNumberError, \
 from allmydata.immutable.downloader.status import DownloadStatus
 from allmydata.immutable.downloader.fetcher import SegmentFetcher
 from allmydata.codec import CRSDecoder
-from foolscap.eventual import fireEventually, flushEventualQueue
+from foolscap.eventual import eventually, fireEventually, flushEventualQueue
 
 plaintext = "This is a moderate-sized file.\n" * 10
 mutable_plaintext = "This is a moderate-sized mutable file.\n" * 10
@@ -898,10 +898,14 @@ class PausingConsumer(MemoryConsumer):
         self.producer.resumeProducing()
 
 class PausingAndStoppingConsumer(PausingConsumer):
+    debug_stopped = False
     def write(self, data):
+        if self.debug_stopped:
+            raise Exception("I'm stopped, don't write to me")
         self.producer.pauseProducing()
-        reactor.callLater(0.5, self._stop)
+        eventually(self._stop)
     def _stop(self):
+        self.debug_stopped = True
         self.producer.stopProducing()
 
 class StoppingConsumer(PausingConsumer):