+ def _test_retrieve_producer(self, version, kind, data):
+ # Now we'll retrieve it into a pausing consumer.
+ c = PausingConsumer()
+ d = version.read(c)
+ d.addCallback(lambda ign: self.failUnlessEqual(c.size, len(data)))
+
+ c2 = PausingAndStoppingConsumer()
+ d.addCallback(lambda ign:
+ self.shouldFail(DownloadStopped, kind+"_pause_stop",
+ "our Consumer called stopProducing()",
+ version.read, c2))
+
+ c3 = StoppingConsumer()
+ d.addCallback(lambda ign:
+ self.shouldFail(DownloadStopped, kind+"_stop",
+ "our Consumer called stopProducing()",
+ version.read, c3))
+
+ c4 = ImmediatelyStoppingConsumer()
+ d.addCallback(lambda ign:
+ self.shouldFail(DownloadStopped, kind+"_stop_imm",
+ "our Consumer called stopProducing()",
+ version.read, c4))
+
+ def _then(ign):
+ c5 = MemoryConsumer()
+ d1 = version.read(c5)
+ c5.producer.stopProducing()
+ return self.shouldFail(DownloadStopped, kind+"_stop_imm2",
+ "our Consumer called stopProducing()",
+ lambda: d1)
+ d.addCallback(_then)
+ return d