self._when_finished = observer.OneShotObserverList()
def when_finished(self):
+ # I think this is unused, along with self._when_finished . But I need
+ # to trace the error paths to be sure.
return self._when_finished.when_fired()
def registerProducer(self, producer, streaming):
- print "REG"
self._consumer.registerProducer(producer, streaming)
def unregisterProducer(self):
- print "UNREG"
self._consumer.unregisterProducer()
def open(self, size):
from base64 import b32encode
import os, sys, time, re, simplejson, urllib
from cStringIO import StringIO
+from zope.interface import implements
from twisted.trial import unittest
from twisted.internet import defer
from twisted.internet import threads # CLI tests use deferToThread
from twisted.internet.error import ConnectionDone, ConnectionLost
+from twisted.internet.interfaces import IConsumer, IPushProducer
import allmydata
from allmydata import uri, storage, offloaded
from allmydata.immutable import download, upload, filenode
self.interrupt_after_d.callback(self)
return upload.Data.read(self, length)
+class GrabEverythingConsumer:
+ implements(IConsumer)
+
+ def __init__(self):
+ self.contents = ""
+
+ def registerProducer(self, producer, streaming):
+ assert streaming
+ assert IPushProducer.providedBy(producer)
+
+ def write(self, data):
+ self.contents += data
+
+ def unregisterProducer(self):
+ pass
class SystemTest(SystemTestMixin, unittest.TestCase):
self.failUnlessEqual(newdata, DATA)
d.addCallback(_download_to_filehandle_done)
+ consumer = GrabEverythingConsumer()
+ ct = download.ConsumerAdapter(consumer)
+ d.addCallback(lambda res:
+ self.downloader.download(self.uri, ct))
+ def _download_to_consumer_done(ign):
+ self.failUnlessEqual(consumer.contents, DATA)
+ d.addCallback(_download_to_consumer_done)
+
def _download_nonexistent_uri(res):
baduri = self.mangle_uri(self.uri)
log.msg("about to download non-existent URI", level=log.UNUSUAL,