* remove Downloader.download_to_data/download_to_filename/download_to_filehandle
* remove download.Data/FileName/FileHandle targets
* remove filenode.download/download_to_data/download_to_filename methods
* leave Downloader.download (the whole Downloader will go away eventually)
* add util.consumer.MemoryConsumer/download_to_data, for convenience
(this is mostly used by unit tests, but it gets used by enough non-test
code to warrant putting it in allmydata.util)
* update tests
* removes about 180 lines of code. Yay negative code days!
Overall plan is to rewrite immutable/download.py and leave filenode.read() as
the sole read-side API.
from zope.interface import implements
from twisted.application import service
from twisted.internet import defer
+from twisted.internet.interfaces import IConsumer
from foolscap.api import Referenceable
from allmydata.interfaces import RIControlClient
from allmydata.util import fileutil, mathutil
-from allmydata.immutable import upload, download
+from allmydata.immutable import upload
from twisted.python import log
def get_memory_usage():
stats["VmPeak"],
where))
+class FileWritingConsumer:
+ implements(IConsumer)
+ def __init__(self, filename):
+ self.done = False
+ self.f = open(filename, "wb")
+ def registerProducer(self, p, streaming):
+ if streaming:
+ p.resumeProducing()
+ else:
+ while not self.done:
+ p.resumeProducing()
+ def write(self, data):
+ self.f.write(data)
+ def unregisterProducer(self):
+ self.done = True
+ self.f.close()
class ControlServer(Referenceable, service.Service):
implements(RIControlClient)
def remote_download_from_uri_to_file(self, uri, filename):
filenode = self.parent.create_node_from_uri(uri)
- d = filenode.download_to_filename(filename)
+ c = FileWritingConsumer(filename)
+ d = filenode.read(c)
d.addCallback(lambda res: filename)
return d
if i >= self.count:
return
n = self.parent.create_node_from_uri(self.uris[i])
- d1 = n.download(download.FileHandle(Discard()))
+ d1 = n.read(DiscardingConsumer())
d1.addCallback(_download_one_file, i+1)
return d1
d.addCallback(_download_one_file, 0)
os.unlink(fn)
return res
-class Discard:
+class DiscardingConsumer:
+ implements(IConsumer)
+ def __init__(self):
+ self.done = False
+ def registerProducer(self, p, streaming):
+ if streaming:
+ p.resumeProducing()
+ else:
+ while not self.done:
+ p.resumeProducing()
def write(self, data):
pass
- # download_to_filehandle explicitly does not close the filehandle it was
- # given: that is reserved for the provider of the filehandle. Therefore
- # the lack of a close() method on this otherwise filehandle-like object
- # is a part of the test.
+ def unregisterProducer(self):
+ self.done = True
from allmydata.util import hashutil, mathutil, base32, log
from allmydata.util.assertutil import precondition
from allmydata.util.netstring import netstring, split_netstring
+from allmydata.util.consumer import download_to_data
from allmydata.uri import LiteralFileURI, from_string, wrap_dirnode_cap
from pycryptopp.cipher.aes import AES
from allmydata.util.dictutil import AuxValueDict
# use the IMutableFileNode API.
d = self._node.download_best_version()
else:
- d = self._node.download_to_data()
+ d = download_to_data(self._node)
d.addCallback(self._unpack_contents)
return d
from allmydata.interfaces import IDirectoryNode, ExistingChildError, \
NoSuchChildError
-from allmydata.immutable.download import ConsumerAdapter
from allmydata.immutable.upload import FileHandle
class ReadFile:
def __init__(self, node):
self.node = node
def send(self, consumer):
- ad = ConsumerAdapter(consumer)
- d = self.node.download(ad)
+ d = self.node.read(consumer)
return d # when consumed
class FileWriter:
from twisted.python import components
from twisted.application import service, strports
from twisted.internet import defer
-from twisted.internet.interfaces import IConsumer
from twisted.conch.ssh import factory, keys, session
from twisted.conch.interfaces import ISFTPServer, ISFTPFile, IConchUser
from twisted.conch.avatar import ConchUser
from allmydata.interfaces import IDirectoryNode, ExistingChildError, \
NoSuchChildError
from allmydata.immutable.upload import FileHandle
-
-class MemoryConsumer:
- implements(IConsumer)
- def __init__(self):
- self.chunks = []
- self.done = False
- def registerProducer(self, p, streaming):
- if streaming:
- # call resumeProducing once to start things off
- p.resumeProducing()
- else:
- while not self.done:
- p.resumeProducing()
- def write(self, data):
- self.chunks.append(data)
- def unregisterProducer(self):
- self.done = True
-
-def download_to_data(n, offset=0, size=None):
- d = n.read(MemoryConsumer(), offset, size)
- d.addCallback(lambda mc: "".join(mc.chunks))
- return d
+from allmydata.util.consumer import download_to_data
class ReadFile:
implements(ISFTPFile)
-import os, random, weakref, itertools, time
+import random, weakref, itertools, time
from zope.interface import implements
from twisted.internet import defer
from twisted.internet.interfaces import IPushProducer, IConsumer
return self._status
-class FileName:
- implements(IDownloadTarget)
- def __init__(self, filename):
- self._filename = filename
- self.f = None
- def open(self, size):
- self.f = open(self._filename, "wb")
- return self.f
- def write(self, data):
- self.f.write(data)
- def close(self):
- if self.f:
- self.f.close()
- def fail(self, why):
- if self.f:
- self.f.close()
- os.unlink(self._filename)
- def register_canceller(self, cb):
- pass # we won't use it
- def finish(self):
- pass
- # The following methods are just because the target might be a
- # repairer.DownUpConnector, and just because the current CHKUpload object
- # expects to find the storage index and encoding parameters in its
- # Uploadable.
- def set_storageindex(self, storageindex):
- pass
- def set_encodingparams(self, encodingparams):
- pass
-
-class Data:
- implements(IDownloadTarget)
- def __init__(self):
- self._data = []
- def open(self, size):
- pass
- def write(self, data):
- self._data.append(data)
- def close(self):
- self.data = "".join(self._data)
- del self._data
- def fail(self, why):
- del self._data
- def register_canceller(self, cb):
- pass # we won't use it
- def finish(self):
- return self.data
- # The following methods are just because the target might be a
- # repairer.DownUpConnector, and just because the current CHKUpload object
- # expects to find the storage index and encoding parameters in its
- # Uploadable.
- def set_storageindex(self, storageindex):
- pass
- def set_encodingparams(self, encodingparams):
- pass
-
-class FileHandle:
- """Use me to download data to a pre-defined filehandle-like object. I
- will use the target's write() method. I will *not* close the filehandle:
- I leave that up to the originator of the filehandle. The download process
- will return the filehandle when it completes.
- """
- implements(IDownloadTarget)
- def __init__(self, filehandle):
- self._filehandle = filehandle
- def open(self, size):
- pass
- def write(self, data):
- self._filehandle.write(data)
- def close(self):
- # the originator of the filehandle reserves the right to close it
- pass
- def fail(self, why):
- pass
- def register_canceller(self, cb):
- pass
- def finish(self):
- return self._filehandle
- # The following methods are just because the target might be a
- # repairer.DownUpConnector, and just because the current CHKUpload object
- # expects to find the storage index and encoding parameters in its
- # Uploadable.
- def set_storageindex(self, storageindex):
- pass
- def set_encodingparams(self, encodingparams):
- pass
-
class ConsumerAdapter:
implements(IDownloadTarget, IConsumer)
def __init__(self, consumer):
history.add_download(dl.get_download_status())
d = dl.start()
return d
-
- # utility functions
- def download_to_data(self, uri, _log_msg_id=None, history=None):
- return self.download(uri, Data(), _log_msg_id=_log_msg_id, history=history)
- def download_to_filename(self, uri, filename, _log_msg_id=None):
- return self.download(uri, FileName(filename), _log_msg_id=_log_msg_id)
- def download_to_filehandle(self, uri, filehandle, _log_msg_id=None):
- return self.download(uri, FileHandle(filehandle), _log_msg_id=_log_msg_id)
from cStringIO import StringIO
from zope.interface import implements
from twisted.internet import defer
-from twisted.internet.interfaces import IPushProducer, IConsumer
+from twisted.internet.interfaces import IPushProducer
from twisted.protocols import basic
from foolscap.api import eventually
from allmydata.interfaces import IImmutableFileNode, ICheckable, \
return v.start()
def read(self, consumer, offset=0, size=None):
+ self.log("read", offset=offset, size=size,
+ umid="UPP8FA", level=log.OPERATIONAL)
if size is None:
size = self.get_size() - offset
size = min(size, self.get_size() - offset)
if offset == 0 and size == self.get_size():
# don't use the cache, just do a normal streaming download
self.log("doing normal full download", umid="VRSBwg", level=log.OPERATIONAL)
- return self.download(download.ConsumerAdapter(consumer))
+ target = download.ConsumerAdapter(consumer)
+ return self._downloader.download(self.get_cap(), target,
+ self._parentmsgid,
+ history=self._history)
d = self.download_cache.when_range_available(offset, size)
d.addCallback(lambda res:
self.download_cache.read(consumer, offset, size))
return d
- def download(self, target):
- return self._downloader.download(self.get_cap(), target,
- self._parentmsgid,
- history=self._history)
-
- def download_to_data(self):
- return self._downloader.download_to_data(self.get_cap(),
- history=self._history)
- def download_to_filename(self, filename):
- return self._downloader.download_to_filename(self.get_cap(), filename)
-
class LiteralProducer:
implements(IPushProducer)
def resumeProducing(self):
d = basic.FileSender().beginFileTransfer(StringIO(data), consumer)
d.addCallback(lambda lastSent: consumer)
return d
-
- def download(self, target):
- # note that this does not update the stats_provider
- data = self.u.data
- if IConsumer.providedBy(target):
- target.registerProducer(LiteralProducer(), True)
- target.open(len(data))
- target.write(data)
- if IConsumer.providedBy(target):
- target.unregisterProducer()
- target.close()
- return defer.maybeDeferred(target.finish)
-
- def download_to_data(self):
- data = self.u.data
- return defer.succeed(data)
container, like IDirectoryNode."""
class IImmutableFileNode(IFileNode):
- def download(target):
- """Download the file's contents to a given IDownloadTarget"""
-
- def download_to_data():
- """Download the file's contents. Return a Deferred that fires
- with those contents."""
-
def read(consumer, offset=0, size=None):
"""Download a portion (possibly all) of the file's contents, making
them available to the given IConsumer. Return a Deferred that fires
p.stopProducing(), which will result in an exception being delivered
via deferred.errback().
- A simple download-to-memory consumer example would look like this::
-
- class MemoryConsumer:
- implements(IConsumer)
- def __init__(self):
- self.chunks = []
- self.done = False
- def registerProducer(self, p, streaming):
- assert streaming == False
- while not self.done:
- p.resumeProducing()
- def write(self, data):
- self.chunks.append(data)
- def unregisterProducer(self):
- self.done = True
- d = filenode.read(MemoryConsumer())
- d.addCallback(lambda mc: "".join(mc.chunks))
- return d
-
+ See src/allmydata/util/consumer.py for an example of a simple
+ download-to-memory consumer.
"""
class IMutableFileNode(IFileNode):
import os, random, struct
from zope.interface import implements
from twisted.internet import defer
-from twisted.internet.interfaces import IConsumer
+from twisted.internet.interfaces import IPullProducer
from twisted.python import failure
from twisted.application import service
from twisted.web.error import Error as WebError
from allmydata.storage.mutable import MutableShareFile
from allmydata.util import hashutil, log, fileutil, pollmixin
from allmydata.util.assertutil import precondition
+from allmydata.util.consumer import download_to_data
from allmydata.stats import StatsGathererService
from allmydata.key_generator import KeyGeneratorService
import common_util as testutil
d.addCallback(_done)
return d
+class DummyProducer:
+ implements(IPullProducer)
+ def resumeProducing(self):
+ pass
+
class FakeCHKFileNode:
"""I provide IImmutableFileNode, but all of my data is stored in a
class-level dictionary."""
def is_readonly(self):
return True
- def download(self, target):
- if self.my_uri.to_string() not in self.all_contents:
- f = failure.Failure(NotEnoughSharesError(None, 0, 3))
- target.fail(f)
- return defer.fail(f)
- data = self.all_contents[self.my_uri.to_string()]
- target.open(len(data))
- target.write(data)
- target.close()
- return defer.maybeDeferred(target.finish)
- def download_to_data(self):
- if self.my_uri.to_string() not in self.all_contents:
- return defer.fail(NotEnoughSharesError(None, 0, 3))
- data = self.all_contents[self.my_uri.to_string()]
- return defer.succeed(data)
def get_size(self):
try:
data = self.all_contents[self.my_uri.to_string()]
except KeyError, le:
raise NotEnoughSharesError(le, 0, 3)
return len(data)
+
def read(self, consumer, offset=0, size=None):
- d = self.download_to_data()
- def _got(data):
- start = offset
- if size is not None:
- end = offset + size
- else:
- end = len(data)
- consumer.write(data[start:end])
- return consumer
- d.addCallback(_got)
+ # we don't bother to call registerProducer/unregisterProducer,
+ # because it's a hassle to write a dummy Producer that does the right
+ # thing (we have to make sure that DummyProducer.resumeProducing
+ # writes the data into the consumer immediately, otherwise it will
+ # loop forever).
+
+ d = defer.succeed(None)
+ d.addCallback(self._read, consumer, offset, size)
return d
+ def _read(self, ignored, consumer, offset, size):
+ if self.my_uri.to_string() not in self.all_contents:
+ raise NotEnoughSharesError(None, 0, 3)
+ data = self.all_contents[self.my_uri.to_string()]
+ start = offset
+ if size is not None:
+ end = offset + size
+ else:
+ end = len(data)
+ consumer.write(data[start:end])
+ return consumer
+
def make_chk_file_cap(size):
return uri.CHKFileURI(key=os.urandom(16),
uri_extension_hash=os.urandom(32),
d2 = cl0.upload(immutable.upload.Data(TEST_DATA, convergence=""))
def _after_upload(u):
filecap = u.uri
+ self.n = self.clients[1].create_node_from_uri(filecap)
self.uri = uri.CHKFileURI.init_from_string(filecap)
return cl0.create_node_from_uri(filecap)
d2.addCallback(_after_upload)
return sum_of_write_counts
def _download_and_check_plaintext(self, unused=None):
- d = self.clients[1].downloader.download_to_data(self.uri)
-
+ d = download_to_data(self.n)
def _after_download(result):
self.failUnlessEqual(result, TEST_DATA)
d.addCallback(_after_download)
print "First Error:", f.value.subFailure
return f
-class MemoryConsumer:
- implements(IConsumer)
- def __init__(self):
- self.chunks = []
- self.done = False
- def registerProducer(self, p, streaming):
- if streaming:
- # call resumeProducing once to start things off
- p.resumeProducing()
- else:
- while not self.done:
- p.resumeProducing()
- def write(self, data):
- self.chunks.append(data)
- def unregisterProducer(self):
- self.done = True
-
-def download_to_data(n, offset=0, size=None):
- d = n.read(MemoryConsumer(), offset, size)
- d.addCallback(lambda mc: "".join(mc.chunks))
- return d
-
def corrupt_field(data, offset, size, debug=False):
if random.random() < 0.5:
newdata = testutil.flip_one_bit(data, offset, size)
from allmydata import uri
from allmydata.storage.server import storage_index_to_dir
from allmydata.util import base32, fileutil
+from allmydata.util.consumer import download_to_data
from allmydata.immutable import upload
from allmydata.test.no_network import GridTestMixin
def download_immutable(self, ignored=None):
n = self.c0.create_node_from_uri(immutable_uri)
- d = n.download_to_data()
+ d = download_to_data(n)
def _got_data(data):
self.failUnlessEqual(data, plaintext)
d.addCallback(_got_data)
from zope.interface import implements
from twisted.trial import unittest
from twisted.internet import defer, reactor
-from twisted.internet.interfaces import IConsumer
from twisted.python.failure import Failure
from foolscap.api import fireEventually
from allmydata import hashtree, uri
from allmydata.immutable import encode, upload, download
from allmydata.util import hashutil
from allmydata.util.assertutil import _assert
+from allmydata.util.consumer import MemoryConsumer
from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader, \
NotEnoughSharesError, IStorageBroker
from allmydata.monitor import Monitor
# 5 segments: 25, 25, 25, 25, 1
return self.do_encode(25, 101, 100, 5, 15, 8)
-class PausingTarget(download.Data):
- implements(IConsumer)
+class PausingConsumer(MemoryConsumer):
def __init__(self):
- download.Data.__init__(self)
+ MemoryConsumer.__init__(self)
self.size = 0
self.writes = 0
def write(self, data):
# last one (since then the _unpause timer will still be running)
self.producer.pauseProducing()
reactor.callLater(0.1, self._unpause)
- return download.Data.write(self, data)
+ return MemoryConsumer.write(self, data)
def _unpause(self):
self.producer.resumeProducing()
- def registerProducer(self, producer, streaming):
- self.producer = producer
- def unregisterProducer(self):
- self.producer = None
-class PausingAndStoppingTarget(PausingTarget):
+class PausingAndStoppingConsumer(PausingConsumer):
def write(self, data):
self.producer.pauseProducing()
reactor.callLater(0.5, self._stop)
def _stop(self):
self.producer.stopProducing()
-class StoppingTarget(PausingTarget):
+class StoppingConsumer(PausingConsumer):
def write(self, data):
self.producer.stopProducing()
max_segment_size=25,
bucket_modes={},
recover_mode="recover",
- target=None,
+ consumer=None,
):
if AVAILABLE_SHARES is None:
AVAILABLE_SHARES = k_and_happy_and_n[2]
max_segment_size, bucket_modes, data)
# that fires with (uri_extension_hash, e, shareholders)
d.addCallback(self.recover, AVAILABLE_SHARES, recover_mode,
- target=target)
+ consumer=consumer)
# that fires with newdata
def _downloaded((newdata, fd)):
self.failUnless(newdata == data, str((len(newdata), len(data))))
return d
def recover(self, (res, key, shareholders), AVAILABLE_SHARES,
- recover_mode, target=None):
+ recover_mode, consumer=None):
verifycap = res
if "corrupt_key" in recover_mode:
size=verifycap.size)
sb = FakeStorageBroker()
- if not target:
- target = download.Data()
- target = download.DecryptingTarget(target, u.key)
+ if not consumer:
+ consumer = MemoryConsumer()
+ innertarget = download.ConsumerAdapter(consumer)
+ target = download.DecryptingTarget(innertarget, u.key)
fd = download.CiphertextDownloader(sb, u.get_verify_cap(), target, monitor=Monitor())
# we manually cycle the CiphertextDownloader through a number of steps that
d.addCallback(fd._download_all_segments)
d.addCallback(fd._done)
- def _done(newdata):
+ def _done(t):
+ newdata = "".join(consumer.chunks)
return (newdata, fd)
d.addCallback(_done)
return d
return self.send_and_recover(datalen=101)
def test_pause(self):
- # use a DownloadTarget that does pauseProducing/resumeProducing a few
- # times, then finishes
- t = PausingTarget()
- d = self.send_and_recover(target=t)
+ # use a download target that does pauseProducing/resumeProducing a
+ # few times, then finishes
+ c = PausingConsumer()
+ d = self.send_and_recover(consumer=c)
return d
def test_pause_then_stop(self):
- # use a DownloadTarget that pauses, then stops.
- t = PausingAndStoppingTarget()
+ # use a download target that pauses, then stops.
+ c = PausingAndStoppingConsumer()
d = self.shouldFail(download.DownloadStopped, "test_pause_then_stop",
"our Consumer called stopProducing()",
- self.send_and_recover, target=t)
+ self.send_and_recover, consumer=c)
return d
def test_stop(self):
- # use a DownloadTarget that does an immediate stop (ticket #473)
- t = StoppingTarget()
+    # use a download target that does an immediate stop (ticket #473)
+ c = StoppingConsumer()
d = self.shouldFail(download.DownloadStopped, "test_stop",
"our Consumer called stopProducing()",
- self.send_and_recover, target=t)
+ self.send_and_recover, consumer=c)
return d
# the following tests all use 4-out-of-10 encoding
from twisted.trial import unittest
from allmydata import uri, client
from allmydata.monitor import Monitor
-from allmydata.immutable import download
from allmydata.immutable.filenode import ImmutableFileNode, LiteralFileNode
from allmydata.mutable.filenode import MutableFileNode
from allmydata.util import hashutil, cachedir
-from allmydata.test.common import download_to_data
+from allmydata.util.consumer import download_to_data
class NotANode:
pass
self.failUnlessEqual(v, None)
self.failUnlessEqual(fn1.get_repair_cap(), None)
- d = fn1.download(download.Data())
+ d = download_to_data(fn1)
def _check(res):
self.failUnlessEqual(res, DATA)
d.addCallback(_check)
- d.addCallback(lambda res: fn1.download_to_data())
- d.addCallback(_check)
-
- d.addCallback(lambda res: download_to_data(fn1))
- d.addCallback(_check)
-
d.addCallback(lambda res: download_to_data(fn1, 1, 5))
def _check_segment(res):
self.failUnlessEqual(res, DATA[1:1+5])
from allmydata.test import common
from allmydata.interfaces import NotEnoughSharesError
+from allmydata.util.consumer import download_to_data
from twisted.internet import defer
from twisted.trial import unittest
import random
d.addCallback(_then_delete_8)
def _then_download(unused=None):
- d2 = self.clients[1].downloader.download_to_data(self.uri)
+ d2 = download_to_data(self.n)
def _after_download_callb(result):
self.fail() # should have gotten an errback instead
before_download_reads = self._count_reads()
def _attempt_to_download(unused=None):
- d2 = self.clients[1].downloader.download_to_data(self.uri)
+ d2 = download_to_data(self.n)
def _callb(res):
self.fail("Should have gotten an error from attempt to download, not %r" % (res,))
before_download_reads = self._count_reads()
def _attempt_to_download(unused=None):
- d2 = self.clients[1].downloader.download_to_data(self.uri)
+ d2 = download_to_data(self.n)
def _callb(res):
self.fail("Should have gotten an error from attempt to download, not %r" % (res,))
from twisted.internet import defer, reactor
from allmydata import uri, client
from allmydata.nodemaker import NodeMaker
-from allmydata.immutable import download
from allmydata.util import base32
from allmydata.util.idlib import shortnodeid_b2a
from allmydata.util.hashutil import tagged_hash, ssk_writekey_hash, \
d.addCallback(lambda res: n.overwrite("contents 2"))
d.addCallback(lambda res: n.download_best_version())
d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2"))
- d.addCallback(lambda res: n.download(download.Data()))
- d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2"))
d.addCallback(lambda res: n.get_servermap(MODE_WRITE))
d.addCallback(lambda smap: n.upload("contents 3", smap))
d.addCallback(lambda res: n.download_best_version())
d.addCallback(lambda res: n.overwrite("contents 2"))
d.addCallback(lambda res: n.download_best_version())
d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2"))
- d.addCallback(lambda res: n.download(download.Data()))
- d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2"))
d.addCallback(lambda res: n.get_servermap(MODE_WRITE))
d.addCallback(lambda smap: n.upload("contents 3", smap))
d.addCallback(lambda res: n.download_best_version())
from twisted.application import service
from allmydata.test.no_network import NoNetworkGrid
from allmydata.immutable.upload import Data
-
+from allmydata.util.consumer import download_to_data
class Harness(unittest.TestCase):
def setUp(self):
d = c0.upload(data)
def _uploaded(res):
n = c0.create_node_from_uri(res.uri)
- return n.download_to_data()
+ return download_to_data(n)
d.addCallback(_uploaded)
def _check(res):
self.failUnlessEqual(res, DATA)
from allmydata import check_results
from allmydata.interfaces import NotEnoughSharesError
from allmydata.immutable import repairer, upload
+from allmydata.util.consumer import download_to_data
from twisted.internet import defer
from twisted.trial import unittest
import random
d.addCallback(lambda ignored:
self.shouldFail(NotEnoughSharesError, "then_download",
None,
- self.c1_filenode.download_to_data))
+ download_to_data, self.c1_filenode))
d.addCallback(lambda ignored:
self.shouldFail(NotEnoughSharesError, "then_repair",
d.addCallback(lambda ignored:
self.delete_shares_numbered(self.uri, range(3, 10+1)))
- d.addCallback(lambda ignored: self.c1_filenode.download_to_data())
+ d.addCallback(lambda ignored: download_to_data(self.c1_filenode))
d.addCallback(lambda newdata:
self.failUnlessEqual(newdata, common.TEST_DATA))
return d
d.addCallback(lambda ignored:
self.delete_shares_numbered(self.uri, range(3, 10+1)))
- d.addCallback(lambda ignored: self.c1_filenode.download_to_data())
+ d.addCallback(lambda ignored: download_to_data(self.c1_filenode))
d.addCallback(lambda newdata:
self.failUnlessEqual(newdata, common.TEST_DATA))
return d
from base64 import b32encode
import os, sys, time, simplejson
from cStringIO import StringIO
-from zope.interface import implements
from twisted.trial import unittest
from twisted.internet import defer
from twisted.internet import threads # CLI tests use deferToThread
from twisted.internet.error import ConnectionDone, ConnectionLost
-from twisted.internet.interfaces import IConsumer, IPushProducer
import allmydata
from allmydata import uri
from allmydata.storage.mutable import MutableShareFile
from allmydata.storage.server import si_a2b
-from allmydata.immutable import download, offloaded, upload
+from allmydata.immutable import offloaded, upload
from allmydata.immutable.filenode import ImmutableFileNode, LiteralFileNode
from allmydata.util import idlib, mathutil
from allmydata.util import log, base32
+from allmydata.util.consumer import MemoryConsumer, download_to_data
from allmydata.scripts import runner
from allmydata.interfaces import IDirectoryNode, IFileNode, \
NoSuchChildError, NoSharesError
from twisted.web.client import getPage
from twisted.web.error import Error
-from allmydata.test.common import SystemTestMixin, MemoryConsumer, \
- download_to_data
+from allmydata.test.common import SystemTestMixin
LARGE_DATA = """
This is some data to publish to the virtual drive, which needs to be large
self.interrupt_after_d.callback(self)
return upload.Data.read(self, length)
-class GrabEverythingConsumer:
- implements(IConsumer)
-
- def __init__(self):
- self.contents = ""
-
- def registerProducer(self, producer, streaming):
- assert streaming
- assert IPushProducer.providedBy(producer)
-
- def write(self, data):
- self.contents += data
-
- def unregisterProducer(self):
- pass
-
class SystemTest(SystemTestMixin, unittest.TestCase):
timeout = 3600 # It takes longer than 960 seconds on Zandr's ARM box.
self.uri = theuri
assert isinstance(self.uri, str), self.uri
self.cap = uri.from_string(self.uri)
- self.downloader = self.clients[1].downloader
+ self.n = self.clients[1].create_node_from_uri(self.uri)
d.addCallback(_upload_done)
def _upload_again(res):
def _download_to_data(res):
log.msg("DOWNLOADING")
- return self.downloader.download_to_data(self.cap)
+ return download_to_data(self.n)
d.addCallback(_download_to_data)
def _download_to_data_done(data):
log.msg("download finished")
self.failUnlessEqual(data, DATA)
d.addCallback(_download_to_data_done)
- target_filename = os.path.join(self.basedir, "download.target")
- def _download_to_filename(res):
- return self.downloader.download_to_filename(self.cap,
- target_filename)
- d.addCallback(_download_to_filename)
- def _download_to_filename_done(res):
- newdata = open(target_filename, "rb").read()
- self.failUnlessEqual(newdata, DATA)
- d.addCallback(_download_to_filename_done)
-
- target_filename2 = os.path.join(self.basedir, "download.target2")
- def _download_to_filehandle(res):
- fh = open(target_filename2, "wb")
- return self.downloader.download_to_filehandle(self.cap, fh)
- d.addCallback(_download_to_filehandle)
- def _download_to_filehandle_done(fh):
- fh.close()
- newdata = open(target_filename2, "rb").read()
- self.failUnlessEqual(newdata, DATA)
- d.addCallback(_download_to_filehandle_done)
-
- consumer = GrabEverythingConsumer()
- ct = download.ConsumerAdapter(consumer)
- d.addCallback(lambda res:
- self.downloader.download(self.cap, ct))
- def _download_to_consumer_done(ign):
- self.failUnlessEqual(consumer.contents, DATA)
- d.addCallback(_download_to_consumer_done)
-
def _test_read(res):
n = self.clients[1].create_node_from_uri(self.uri)
d = download_to_data(n)
def _download_nonexistent_uri(res):
baduri = self.mangle_uri(self.uri)
+ badnode = self.clients[1].create_node_from_uri(baduri)
log.msg("about to download non-existent URI", level=log.UNUSUAL,
facility="tahoe.tests")
- d1 = self.downloader.download_to_data(uri.from_string(baduri))
+ d1 = download_to_data(badnode)
def _baduri_should_fail(res):
log.msg("finished downloading non-existend URI",
level=log.UNUSUAL, facility="tahoe.tests")
u = upload.Data(HELPER_DATA, convergence=convergence)
d = self.extra_node.upload(u)
def _uploaded(results):
- cap = uri.from_string(results.uri)
- return self.downloader.download_to_data(cap)
+ n = self.clients[1].create_node_from_uri(results.uri)
+ return download_to_data(n)
d.addCallback(_uploaded)
def _check(newdata):
self.failUnlessEqual(newdata, HELPER_DATA)
u.debug_stash_RemoteEncryptedUploadable = True
d = self.extra_node.upload(u)
def _uploaded(results):
- cap = uri.from_string(results.uri)
- return self.downloader.download_to_data(cap)
+ n = self.clients[1].create_node_from_uri(results.uri)
+ return download_to_data(n)
d.addCallback(_uploaded)
def _check(newdata):
self.failUnlessEqual(newdata, HELPER_DATA)
d.addCallback(lambda res: self.extra_node.upload(u2))
def _uploaded(results):
- cap = uri.from_string(results.uri)
+ cap = results.uri
log.msg("Second upload complete", level=log.NOISY,
facility="tahoe.test.test_system")
"resumption saved us some work even though we were using random keys:"
" read %d bytes out of %d total" %
(bytes_sent, len(DATA)))
- return self.downloader.download_to_data(cap)
+ n = self.clients[1].create_node_from_uri(cap)
+ return download_to_data(n)
d.addCallback(_uploaded)
def _check(newdata):
d.addCallback(self.log, "check_publish1 got /")
d.addCallback(lambda root: root.get(u"subdir1"))
d.addCallback(lambda subdir1: subdir1.get(u"mydata567"))
- d.addCallback(lambda filenode: filenode.download_to_data())
+ d.addCallback(lambda filenode: download_to_data(filenode))
d.addCallback(self.log, "get finished")
def _get_done(data):
self.failUnlessEqual(data, self.data)
d.addCallback(lambda dirnode:
self.failUnless(IDirectoryNode.providedBy(dirnode)))
d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
- d.addCallback(lambda filenode: filenode.download_to_data())
+ d.addCallback(lambda filenode: download_to_data(filenode))
d.addCallback(lambda data: self.failUnlessEqual(data, self.data))
d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
return self._private_node.get_child_at_path(path)
d.addCallback(lambda res: get_path(u"personal/sekrit data"))
- d.addCallback(lambda filenode: filenode.download_to_data())
+ d.addCallback(lambda filenode: download_to_data(filenode))
d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
d.addCallback(lambda res: get_path(u"s2-rw"))
d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
from allmydata.web import status, common
from allmydata.scripts.debug import CorruptShareOptions, corrupt_share
from allmydata.util import fileutil, base32
+from allmydata.util.consumer import download_to_data
from allmydata.test.common import FakeCHKFileNode, FakeMutableFileNode, \
create_chk_filenode, WebErrorMixin, ShouldFailMixin, make_mutable_file_uri
from allmydata.interfaces import IMutableFileNode
def failUnlessChildContentsAre(self, node, name, expected_contents):
assert isinstance(name, unicode)
d = node.get_child_at_path(name)
- d.addCallback(lambda node: node.download_to_data())
+ d.addCallback(lambda node: download_to_data(node))
def _check(contents):
self.failUnlessEqual(contents, expected_contents)
d.addCallback(_check)
--- /dev/null
+
+"""This file defines a basic download-to-memory consumer, suitable for use in
+a filenode's read() method. See download_to_data() for an example of its use.
+"""
+
+from zope.interface import implements
+from twisted.internet.interfaces import IConsumer
+
+class MemoryConsumer:
+ implements(IConsumer)
+ def __init__(self):
+ self.chunks = []
+ self.done = False
+ def registerProducer(self, p, streaming):
+ self.producer = p
+ if streaming:
+ # call resumeProducing once to start things off
+ p.resumeProducing()
+ else:
+ while not self.done:
+ p.resumeProducing()
+ def write(self, data):
+ self.chunks.append(data)
+ def unregisterProducer(self):
+ self.done = True
+
+def download_to_data(n, offset=0, size=None):
+ d = n.read(MemoryConsumer(), offset, size)
+ d.addCallback(lambda mc: "".join(mc.chunks))
+ return d