From 79437baade3c67abe25a66612b9fcd98e845081a Mon Sep 17 00:00:00 2001
From: Brian Warner <warner@lothar.com>
Date: Mon, 18 May 2009 16:44:22 -0700
Subject: [PATCH] immutable WriteBucketProxy: use pipeline to speed up uploads
 by overlapping roundtrips, for #392

---
 src/allmydata/immutable/layout.py | 25 ++++++++++++++++++++-----
 1 file changed, 20 insertions(+), 5 deletions(-)
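
Note for reviewers: the Pipeline class used below lives in
allmydata.util.pipeline. As a rough sketch of the contract this patch
relies on (an illustration under assumed internals, not the real
implementation: attribute names like "gauge" and "waiting" are invented
here, and error propagation is elided), it behaves roughly like this:

    from twisted.internet import defer

    class Pipeline:
        """Size-bounded pipeline of Deferred-returning calls (sketch)."""
        def __init__(self, capacity):
            self.capacity = capacity  # max bytes allowed in flight
            self.gauge = 0            # bytes currently in flight
            self.waiting = []         # add() callers blocked on capacity
            self.unflushed = []       # flush() callers awaiting drain

        def add(self, size, func, *args, **kwargs):
            # Dispatch the call right away, so the data goes onto the
            # wire even if the caller is about to be throttled.
            self.gauge += size
            d = defer.maybeDeferred(func, *args, **kwargs)
            d.addBoth(self._call_finished, size)
            if self.gauge < self.capacity:
                return defer.succeed(None)  # room left: caller proceeds
            w = defer.Deferred()
            self.waiting.append(w)  # pipe full: caller waits
            return w

        def _call_finished(self, res, size):
            self.gauge -= size
            if self.gauge < self.capacity:
                while self.waiting:
                    self.waiting.pop(0).callback(None)
            if self.gauge == 0:
                while self.unflushed:
                    self.unflushed.pop(0).callback(None)
            # the real class also records Failures and errbacks later
            # calls; that bookkeeping is elided here

        def flush(self):
            if self.gauge == 0:
                return defer.succeed(None)
            d = defer.Deferred()
            self.unflushed.append(d)
            return d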

diff --git a/src/allmydata/immutable/layout.py b/src/allmydata/immutable/layout.py
index 68555624..6ca53391 100644
--- a/src/allmydata/immutable/layout.py
+++ b/src/allmydata/immutable/layout.py
@@ -3,7 +3,7 @@ from zope.interface import implements
 from twisted.internet import defer
 from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader, \
      FileTooLargeError, HASH_SIZE
-from allmydata.util import mathutil, idlib, observer
+from allmydata.util import mathutil, idlib, observer, pipeline
 from allmydata.util.assertutil import precondition
 from allmydata.storage.server import si_b2a
 
@@ -93,7 +93,8 @@ class WriteBucketProxy:
     fieldstruct = ">L"
 
     def __init__(self, rref, data_size, block_size, num_segments,
-                 num_share_hashes, uri_extension_size_max, nodeid):
+                 num_share_hashes, uri_extension_size_max, nodeid,
+                 pipeline_size=50000):
         self._rref = rref
         self._data_size = data_size
         self._block_size = block_size
@@ -110,6 +111,12 @@ class WriteBucketProxy:
 
         self._create_offsets(block_size, data_size)
 
+        # k=3, max_segment_size=128KiB gives us a typical block of 43691
+        # bytes per share. The default pipeline_size of 50KB lets us put
+        # two blocks on the wire but not a third, which should be enough
+        # to keep the pipe filled.
+        self._pipeline = pipeline.Pipeline(pipeline_size)
+
     def get_allocated_size(self):
         return (self._offsets['uri_extension'] + self.fieldsize +
                 self._uri_extension_size_max)
@@ -218,11 +225,19 @@ class WriteBucketProxy:
         return self._write(offset, length+data)
 
     def _write(self, offset, data):
-        # TODO: for small shares, buffer the writes and do just a single call
-        return self._rref.callRemote("write", offset, data)
+        # Use a Pipeline to overlap several writes, keeping multiple
+        # requests in flight. TODO: a further speedup would be to coalesce
+        # small writes into a single call: that would reduce the foolscap
+        # CPU overhead per share, but wouldn't reduce the number of round
+        # trips, so it might not be worth the effort.
+
+        return self._pipeline.add(len(data),
+                                  self._rref.callRemote, "write", offset, data)
 
     def close(self):
-        return self._rref.callRemote("close")
+        d = self._pipeline.add(0, self._rref.callRemote, "close")
+        d.addCallback(lambda ign: self._pipeline.flush())
+        return d
 
     def abort(self):
         return self._rref.callRemoteOnly("abort")
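
Working through the sizing comment above (assuming the default k=3 and
128-KiB segments; the numbers are illustrative):

    131072 / 3 = 43690.67, so ceil() gives 43691-byte blocks per share
    after the 1st write: 43691 bytes in flight  (< 50000, add() fires now)
    after the 2nd write: 87382 bytes in flight  (>= 50000, producer waits)

So two blocks ride the wire concurrently while the third is held back
until one of them is acknowledged. Note also that close() chains a
flush() after the queued "close" message, so the Deferred it returns
does not fire until every still-outstanding pipelined write has drained.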
-- 
2.45.2