from twisted.internet import defer
from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader, \
FileTooLargeError, HASH_SIZE
-from allmydata.util import mathutil, idlib, observer
+from allmydata.util import mathutil, idlib, observer, pipeline
from allmydata.util.assertutil import precondition
from allmydata.storage.server import si_b2a
fieldstruct = ">L"
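+ # ">L" packs lengths as big-endian unsigned 32-bit integers, so each
+ # length field in the share layout occupies 4 bytes (self.fieldsize).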
def __init__(self, rref, data_size, block_size, num_segments,
- num_share_hashes, uri_extension_size_max, nodeid):
+ num_share_hashes, uri_extension_size_max, nodeid,
+ pipeline_size=50000):
self._rref = rref
self._data_size = data_size
self._block_size = block_size
self._create_offsets(block_size, data_size)
+ # k=3, max_segment_size=128KiB gives us a typical segment of 43691
+ # bytes (ceil(131072/3)). Setting the default pipeline_size to 50KB
+ # lets us get two segments onto the wire but not a third; two in
+ # flight is enough to keep the pipe filled.
+ self._pipeline = pipeline.Pipeline(pipeline_size)
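+ # Note: Pipeline.add(size, callable, *args) is assumed to fire the
+ # call right away and return a Deferred that only stalls the caller
+ # once more than pipeline_size bytes are in flight. With 43691-byte
+ # segments, the second write is sent eagerly (43691 < 50000) but a
+ # third must wait (2*43691 = 87382 > 50000).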
+
def get_allocated_size(self):
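+ # The allocation covers everything up to and including the
+ # uri-extension field: its offset, its 4-byte length prefix, and the
+ # worst-case extension body.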
return (self._offsets['uri_extension'] + self.fieldsize +
self._uri_extension_size_max)
return self._write(offset, length+data)
def _write(self, offset, data):
- # TODO: for small shares, buffer the writes and do just a single call
- return self._rref.callRemote("write", offset, data)
+ # Use a Pipeline to keep several writes in flight at once. TODO:
+ # another speedup would be to coalesce small writes into a single
+ # call: this would reduce the foolscap CPU overhead per share, but
+ # wouldn't reduce the number of round trips, so it might not be
+ # worth the effort.
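+ #
+ # One consequence of pipelining (assuming the Pipeline class buffers
+ # failures): an error from an earlier write may only surface through
+ # the Deferred of a later add() or of the final flush(), not through
+ # the write that actually failed.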
+
+ return self._pipeline.add(len(data),
+ self._rref.callRemote, "write", offset, data)
def close(self):
- return self._rref.callRemote("close")
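+ # The "close" is queued behind any writes still in the pipeline, and
+ # the flush() (assumed to wait for every outstanding call) keeps the
+ # Deferred from firing until all of them, including the close itself,
+ # have completed.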
+ d = self._pipeline.add(0, self._rref.callRemote, "close")
+ d.addCallback(lambda ign: self._pipeline.flush())
+ return d
def abort(self):
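+ # callRemoteOnly is foolscap's fire-and-forget variant: no answer is
+ # requested, and the abort bypasses the pipeline, so any writes still
+ # in flight are simply abandoned.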
return self._rref.callRemoteOnly("abort")