From: Brian Warner Date: Mon, 17 Oct 2011 00:03:47 +0000 (-0700) Subject: mutable/retrieve: don't write() after we've been pauseProducer'ed X-Git-Url: https://git.rkrishnan.org/pf/content/simplejson/reliability?a=commitdiff_plain;h=633641174a7bf18e49bdef581d31fdfcc603d39e;p=tahoe-lafs%2Ftahoe-lafs.git mutable/retrieve: don't write() after we've been pauseProducer'ed This fixes a test failure found against current Twisted trunk in test_mutable.Filenode.test_retrieve_producer_mdmf (when it uses PausingAndStoppingConsumer). There must be some sort of race: I could make it fail against Twisted-11.0 if I just increased the 0.5s delay in test_download.PausingAndStoppingConsumer to about 0.6s, and could make Twisted-trunk pass by reducing it to about 0.3s . I fixed the test (as opposed to the bug) by replacing the delay with a simple reliable eventually(), and adding extra asserts to fail the test if the consumer's write() method is called while the producer is supposed to be paused The bug itself was that mutable.retrieve.Retrieve wasn't checking the "stopped" flag after resuming from a pause, and thus delivered one segment to a consumer that wasn't expecting it. I split out stopped-flag-checking to separate function, which is now called immediately after _check_for_paused(). I also cleaned up some Deferred usage and whitespace. --- diff --git a/src/allmydata/mutable/retrieve.py b/src/allmydata/mutable/retrieve.py index fc113695..0e507704 100644 --- a/src/allmydata/mutable/retrieve.py +++ b/src/allmydata/mutable/retrieve.py @@ -176,7 +176,7 @@ class Retrieve: if self._pause_deferred is not None: return - # fired when the download is unpaused. + # fired when the download is unpaused. self._old_status = self._status.get_status() self._set_current_status("paused") @@ -210,13 +210,16 @@ class Retrieve: the Deferred fires immediately. Otherwise, the Deferred fires when the downloader is unpaused. """ - if self._stopped: - raise DownloadStopped("our Consumer called stopProducing()") if self._pause_deferred is not None: d = defer.Deferred() self._pause_deferred.addCallback(lambda ignored: d.callback(res)) return d - return defer.succeed(res) + return res + + def _check_for_stopped(self, res): + if self._stopped: + raise DownloadStopped("our Consumer called stopProducing()") + return res def download(self, consumer=None, offset=0, size=None): @@ -665,6 +668,7 @@ class Retrieve: # check to see whether we've been paused before writing # anything. d.addCallback(self._check_for_paused) + d.addCallback(self._check_for_stopped) d.addCallback(self._set_segment) return d else: diff --git a/src/allmydata/test/test_download.py b/src/allmydata/test/test_download.py index 0e9f6943..a1551822 100644 --- a/src/allmydata/test/test_download.py +++ b/src/allmydata/test/test_download.py @@ -20,7 +20,7 @@ from allmydata.immutable.downloader.common import BadSegmentNumberError, \ from allmydata.immutable.downloader.status import DownloadStatus from allmydata.immutable.downloader.fetcher import SegmentFetcher from allmydata.codec import CRSDecoder -from foolscap.eventual import fireEventually, flushEventualQueue +from foolscap.eventual import eventually, fireEventually, flushEventualQueue plaintext = "This is a moderate-sized file.\n" * 10 mutable_plaintext = "This is a moderate-sized mutable file.\n" * 10 @@ -898,10 +898,14 @@ class PausingConsumer(MemoryConsumer): self.producer.resumeProducing() class PausingAndStoppingConsumer(PausingConsumer): + debug_stopped = False def write(self, data): + if self.debug_stopped: + raise Exception("I'm stopped, don't write to me") self.producer.pauseProducing() - reactor.callLater(0.5, self._stop) + eventually(self._stop) def _stop(self): + self.debug_stopped = True self.producer.stopProducing() class StoppingConsumer(PausingConsumer):