This fixes a test failure found against current Twisted trunk in
test_mutable.Filenode.test_retrieve_producer_mdmf (when it uses
PausingAndStoppingConsumer). There must be some sort of race: I could
make it fail against Twisted-11.0 if I just increased the 0.5s delay in
test_download.PausingAndStoppingConsumer to about 0.6s, and could make
Twisted-trunk pass by reducing it to about 0.3s .
I fixed the test (as opposed to the bug) by replacing the delay with a
simple reliable eventually(), and adding extra asserts to fail the test
if the consumer's write() method is called while the producer is
supposed to be paused
The bug itself was that mutable.retrieve.Retrieve wasn't checking the
"stopped" flag after resuming from a pause, and thus delivered one
segment to a consumer that wasn't expecting it. I split out
stopped-flag-checking to separate function, which is now called
immediately after _check_for_paused(). I also cleaned up some Deferred
usage and whitespace.
if self._pause_deferred is not None:
return
if self._pause_deferred is not None:
return
- # fired when the download is unpaused.
+ # fired when the download is unpaused.
self._old_status = self._status.get_status()
self._set_current_status("paused")
self._old_status = self._status.get_status()
self._set_current_status("paused")
the Deferred fires immediately. Otherwise, the Deferred fires
when the downloader is unpaused.
"""
the Deferred fires immediately. Otherwise, the Deferred fires
when the downloader is unpaused.
"""
- if self._stopped:
- raise DownloadStopped("our Consumer called stopProducing()")
if self._pause_deferred is not None:
d = defer.Deferred()
self._pause_deferred.addCallback(lambda ignored: d.callback(res))
return d
if self._pause_deferred is not None:
d = defer.Deferred()
self._pause_deferred.addCallback(lambda ignored: d.callback(res))
return d
- return defer.succeed(res)
+ return res
+
+ def _check_for_stopped(self, res):
+ if self._stopped:
+ raise DownloadStopped("our Consumer called stopProducing()")
+ return res
def download(self, consumer=None, offset=0, size=None):
def download(self, consumer=None, offset=0, size=None):
# check to see whether we've been paused before writing
# anything.
d.addCallback(self._check_for_paused)
# check to see whether we've been paused before writing
# anything.
d.addCallback(self._check_for_paused)
+ d.addCallback(self._check_for_stopped)
d.addCallback(self._set_segment)
return d
else:
d.addCallback(self._set_segment)
return d
else:
from allmydata.immutable.downloader.status import DownloadStatus
from allmydata.immutable.downloader.fetcher import SegmentFetcher
from allmydata.codec import CRSDecoder
from allmydata.immutable.downloader.status import DownloadStatus
from allmydata.immutable.downloader.fetcher import SegmentFetcher
from allmydata.codec import CRSDecoder
-from foolscap.eventual import fireEventually, flushEventualQueue
+from foolscap.eventual import eventually, fireEventually, flushEventualQueue
plaintext = "This is a moderate-sized file.\n" * 10
mutable_plaintext = "This is a moderate-sized mutable file.\n" * 10
plaintext = "This is a moderate-sized file.\n" * 10
mutable_plaintext = "This is a moderate-sized mutable file.\n" * 10
self.producer.resumeProducing()
class PausingAndStoppingConsumer(PausingConsumer):
self.producer.resumeProducing()
class PausingAndStoppingConsumer(PausingConsumer):
+ if self.debug_stopped:
+ raise Exception("I'm stopped, don't write to me")
self.producer.pauseProducing()
self.producer.pauseProducing()
- reactor.callLater(0.5, self._stop)
+ self.debug_stopped = True
self.producer.stopProducing()
class StoppingConsumer(PausingConsumer):
self.producer.stopProducing()
class StoppingConsumer(PausingConsumer):