From: Brian Warner Date: Mon, 18 May 2009 23:44:22 +0000 (-0700) Subject: immutable WriteBucketProxy: use pipeline to speed up uploads by overlapping roundtrip... X-Git-Url: https://git.rkrishnan.org/%5B/%5D%20/uri/flags/README.win32?a=commitdiff_plain;h=79437baade3c67abe25a66612b9fcd98e845081a;p=tahoe-lafs%2Ftahoe-lafs.git immutable WriteBucketProxy: use pipeline to speed up uploads by overlapping roundtrips, for #392 --- diff --git a/src/allmydata/immutable/layout.py b/src/allmydata/immutable/layout.py index 68555624..6ca53391 100644 --- a/src/allmydata/immutable/layout.py +++ b/src/allmydata/immutable/layout.py @@ -3,7 +3,7 @@ from zope.interface import implements from twisted.internet import defer from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader, \ FileTooLargeError, HASH_SIZE -from allmydata.util import mathutil, idlib, observer +from allmydata.util import mathutil, idlib, observer, pipeline from allmydata.util.assertutil import precondition from allmydata.storage.server import si_b2a @@ -93,7 +93,8 @@ class WriteBucketProxy: fieldstruct = ">L" def __init__(self, rref, data_size, block_size, num_segments, - num_share_hashes, uri_extension_size_max, nodeid): + num_share_hashes, uri_extension_size_max, nodeid, + pipeline_size=50000): self._rref = rref self._data_size = data_size self._block_size = block_size @@ -110,6 +111,12 @@ class WriteBucketProxy: self._create_offsets(block_size, data_size) + # k=3, max_segment_size=128KiB gives us a typical segment of 43691 + # bytes. Setting the default pipeline_size to 50KB lets us get two + # segments onto the wire but not a third, which would keep the pipe + # filled. + self._pipeline = pipeline.Pipeline(pipeline_size) + def get_allocated_size(self): return (self._offsets['uri_extension'] + self.fieldsize + self._uri_extension_size_max) @@ -218,11 +225,19 @@ class WriteBucketProxy: return self._write(offset, length+data) def _write(self, offset, data): - # TODO: for small shares, buffer the writes and do just a single call - return self._rref.callRemote("write", offset, data) + # use a Pipeline to pipeline several writes together. TODO: another + # speedup would be to coalesce small writes into a single call: this + # would reduce the foolscap CPU overhead per share, but wouldn't + # reduce the number of round trips, so it might not be worth the + # effort. + + return self._pipeline.add(len(data), + self._rref.callRemote, "write", offset, data) def close(self): - return self._rref.callRemote("close") + d = self._pipeline.add(0, self._rref.callRemote, "close") + d.addCallback(lambda ign: self._pipeline.flush()) + return d def abort(self): return self._rref.callRemoteOnly("abort")