From e76c6b606f4c9fb89fc2a161a1955fdc347cca7b Mon Sep 17 00:00:00 2001
From: Brian Warner <warner@lothar.com>
Date: Mon, 18 May 2009 16:43:26 -0700
Subject: [PATCH] util/pipeline.py: new utility class to manage size-limited
 work pipelines, for #392
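
Pipeline(capacity) manages a size-limited pipeline of Deferred
operations (usually callRemote() messages): it tracks how many bytes of
pipelined work are outstanding and makes callers wait once that limit
is reached. Intended usage looks roughly like this (the buffer size and
the remote method shown here are illustrative, not part of this patch):

    p = pipeline.Pipeline(50000)
    d = p.add(len(data), rref.callRemote, "write", offset, data)
    # ... more add() calls ...
    d = p.flush()  # fires once every pipelined call has retired

Each add() starts its call immediately; the Deferred it returns only
waits when the pipeline is at or above capacity. Errors are wrapped in
PipelineError and fail both flush() and any subsequent add().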

---
 src/allmydata/test/test_util.py | 174 +++++++++++++++++++++++++++++++-
 src/allmydata/util/pipeline.py  | 132 ++++++++++++++++++++++++
 2 files changed, 305 insertions(+), 1 deletion(-)
 create mode 100644 src/allmydata/util/pipeline.py

diff --git a/src/allmydata/test/test_util.py b/src/allmydata/test/test_util.py
index 4a97001f..16a63f04 100644
--- a/src/allmydata/test/test_util.py
+++ b/src/allmydata/test/test_util.py
@@ -6,11 +6,12 @@ from StringIO import StringIO
 from twisted.trial import unittest
 from twisted.internet import defer, reactor
 from twisted.python.failure import Failure
+from twisted.python import log
 
 from allmydata.util import base32, idlib, humanreadable, mathutil, hashutil
 from allmydata.util import assertutil, fileutil, deferredutil, abbreviate
 from allmydata.util import limiter, time_format, pollmixin, cachedir
-from allmydata.util import statistics, dictutil, rrefutil
+from allmydata.util import statistics, dictutil, rrefutil, pipeline
 from allmydata.util.rrefutil import ServerFailure
 
 class Base32(unittest.TestCase):
@@ -1300,3 +1301,174 @@ class RemoteFailures(unittest.TestCase):
                                   rrefutil.trap_local, f, IndexError)
         d.addErrback(_check)
         return d
+
+class Pipeline(unittest.TestCase):
+    def pause(self, *args, **kwargs):
+        d = defer.Deferred()
+        self.calls.append( (d, args, kwargs) )
+        return d
+
+    def failUnlessCallsAre(self, expected):
+        #print self.calls
+        #print expected
+        self.failUnlessEqual(len(self.calls), len(expected), self.calls)
+        for i,c in enumerate(self.calls):
+            self.failUnlessEqual(c[1:], expected[i], str(i))
+
+    def test_basic(self):
+        self.calls = []
+        finished = []
+        p = pipeline.Pipeline(100)
+
+        d = p.flush() # fires immediately
+        d.addCallbacks(finished.append, log.err)
+        self.failUnlessEqual(len(finished), 1)
+        finished = []
+
+        d = p.add(10, self.pause, "one")
+        # the call should start right away, and our return Deferred should
+        # fire right away
+        d.addCallbacks(finished.append, log.err)
+        self.failUnlessEqual(len(finished), 1)
+        self.failUnlessEqual(finished[0], None)
+        self.failUnlessCallsAre([ ( ("one",) , {} ) ])
+        self.failUnlessEqual(p.gauge, 10)
+
+        # pipeline: [one]
+
+        finished = []
+        d = p.add(20, self.pause, "two", kw=2)
+        # pipeline: [one, two]
+
+        # the call and the Deferred should fire right away
+        d.addCallbacks(finished.append, log.err)
+        self.failUnlessEqual(len(finished), 1)
+        self.failUnlessEqual(finished[0], None)
+        self.failUnlessCallsAre([ ( ("one",) , {} ),
+                                  ( ("two",) , {"kw": 2} ),
+                                  ])
+        self.failUnlessEqual(p.gauge, 30)
+
+        self.calls[0][0].callback("one-result")
+        # pipeline: [two]
+        self.failUnlessEqual(p.gauge, 20)
+
+        finished = []
+        d = p.add(90, self.pause, "three", "posarg1")
+        # pipeline: [two, three]
+        flushed = []
+        fd = p.flush()
+        fd.addCallbacks(flushed.append, log.err)
+        self.failUnlessEqual(flushed, [])
+
+        # the call will be made right away, but the return Deferred will not,
+        # because the pipeline is now full.
+        d.addCallbacks(finished.append, log.err)
+        self.failUnlessEqual(len(finished), 0)
+        self.failUnlessCallsAre([ ( ("one",) , {} ),
+                                  ( ("two",) , {"kw": 2} ),
+                                  ( ("three", "posarg1"), {} ),
+                                  ])
+        self.failUnlessEqual(p.gauge, 110)
+
+        self.failUnlessRaises(pipeline.SingleFileError, p.add, 10, self.pause)
+
+        # retiring either call will unblock the pipeline, causing the #3
+        # Deferred to fire
+        self.calls[2][0].callback("three-result")
+        # pipeline: [two]
+
+        self.failUnlessEqual(len(finished), 1)
+        self.failUnlessEqual(finished[0], None)
+        self.failUnlessEqual(flushed, [])
+
+        # retiring call#2 will finally allow the flush() Deferred to fire
+        self.calls[1][0].callback("two-result")
+        self.failUnlessEqual(len(flushed), 1)
+
+    def test_errors(self):
+        self.calls = []
+        p = pipeline.Pipeline(100)
+
+        d1 = p.add(200, self.pause, "one")
+        d2 = p.flush()
+
+        finished = []
+        d1.addBoth(finished.append)
+        self.failUnlessEqual(finished, [])
+
+        flushed = []
+        d2.addBoth(flushed.append)
+        self.failUnlessEqual(flushed, [])
+
+        self.calls[0][0].errback(ValueError("oops"))
+
+        self.failUnlessEqual(len(finished), 1)
+        f = finished[0]
+        self.failUnless(isinstance(f, Failure))
+        self.failUnless(f.check(pipeline.PipelineError))
+        r = repr(f.value)
+        self.failUnless("ValueError" in r, r)
+        f2 = f.value.error
+        self.failUnless(f2.check(ValueError))
+
+        self.failUnlessEqual(len(flushed), 1)
+        f = flushed[0]
+        self.failUnless(isinstance(f, Failure))
+        self.failUnless(f.check(pipeline.PipelineError))
+        f2 = f.value.error
+        self.failUnless(f2.check(ValueError))
+
+        # now that the pipeline is in the failed state, any new calls will
+        # fail immediately
+
+        d3 = p.add(20, self.pause, "two")
+
+        finished = []
+        d3.addBoth(finished.append)
+        self.failUnlessEqual(len(finished), 1)
+        f = finished[0]
+        self.failUnless(isinstance(f, Failure))
+        self.failUnless(f.check(pipeline.PipelineError))
+        r = repr(f.value)
+        self.failUnless("ValueError" in r, r)
+        f2 = f.value.error
+        self.failUnless(f2.check(ValueError))
+
+        d4 = p.flush()
+        flushed = []
+        d4.addBoth(flushed.append)
+        self.failUnlessEqual(len(flushed), 1)
+        f = flushed[0]
+        self.failUnless(isinstance(f, Failure))
+        self.failUnless(f.check(pipeline.PipelineError))
+        f2 = f.value.error
+        self.failUnless(f2.check(ValueError))
+
+
+    def test_errors2(self):
+        self.calls = []
+        p = pipeline.Pipeline(100)
+
+        d1 = p.add(10, self.pause, "one")
+        d2 = p.add(20, self.pause, "two")
+        d3 = p.add(30, self.pause, "three")
+        d4 = p.flush()
+
+        # one call fails, then the second one succeeds: make sure
+        # ExpandableDeferredList tolerates the second one
+
+        flushed = []
+        d4.addBoth(flushed.append)
+        self.failUnlessEqual(flushed, [])
+
+        self.calls[0][0].errback(ValueError("oops"))
+        self.failUnlessEqual(len(flushed), 1)
+        f = flushed[0]
+        self.failUnless(isinstance(f, Failure))
+        self.failUnless(f.check(pipeline.PipelineError))
+        f2 = f.value.error
+        self.failUnless(f2.check(ValueError))
+
+        self.calls[1][0].callback("two-result")
+        self.calls[2][0].errback(ValueError("three-error"))
diff --git a/src/allmydata/util/pipeline.py b/src/allmydata/util/pipeline.py
new file mode 100644
index 00000000..5f3b031c
--- /dev/null
+++ b/src/allmydata/util/pipeline.py
@@ -0,0 +1,132 @@
+
+from twisted.internet import defer
+from twisted.python.failure import Failure
+from twisted.python import log
+from allmydata.util.assertutil import precondition
+
+class PipelineError(Exception):
+    """One of the pipelined messages returned an error. The received Failure
+    object is stored in my .error attribute."""
+    def __init__(self, error):
+        self.error = error
+
+    def __repr__(self):
+        return "<PipelineError error=(%r)>" % self.error
+
+class SingleFileError(Exception):
+    """You are not permitted to add a job to a full pipeline."""
+
+
+class ExpandableDeferredList(defer.Deferred):
+    # Like DeferredList(fireOnOneErrback=True) combined with gatherResults(),
+    # except that you can keep adding new Deferreds until you close() it.
+    # This gives the caller a chance to attach a
+    # don't-complain-about-unhandled-error errback to each Deferred right
+    # after adding it, whether or not it ends up wanting the aggregate list.
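+    #
+    # Rough usage sketch (make_call / make_other_call are illustrative):
+    #   edl = ExpandableDeferredList()
+    #   d = edl.addDeferred(make_call())
+    #   d.addErrback(lambda f: None)  # e.g. a don't-complain errback
+    #   edl.addDeferred(make_other_call())
+    #   edl.close()
+    #   # once closed, edl fires with the list of results, or errbacks
+    #   # with a Failure as soon as any added Deferred fails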
+    def __init__(self):
+        defer.Deferred.__init__(self)
+        self.resultsReceived = 0
+        self.resultList = []
+        self.failure = None
+        self.closed = False
+
+    def addDeferred(self, d):
+        precondition(not self.closed, "don't call addDeferred() on a closed ExpandableDeferredList")
+        index = len(self.resultList)
+        self.resultList.append(None)
+        d.addCallbacks(self._cbDeferred, self._ebDeferred,
+                       callbackArgs=(index,))
+        return d
+
+    def close(self):
+        self.closed = True
+        self.checkForFinished()
+
+    def checkForFinished(self):
+        if not self.closed:
+            return
+        if self.called:
+            return
+        if self.failure:
+            self.errback(self.failure)
+        elif self.resultsReceived == len(self.resultList):
+            self.callback(self.resultList)
+
+    def _cbDeferred(self, res, index):
+        self.resultList[index] = res
+        self.resultsReceived += 1
+        self.checkForFinished()
+        return res
+
+    def _ebDeferred(self, f):
+        self.failure = f
+        self.checkForFinished()
+        return f
+
+
+class Pipeline:
+    """I manage a size-limited pipeline of Deferred operations, usually
+    callRemote() messages."""
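+    #
+    # For example (numbers mirror the unit test): with capacity=100, an
+    # add() that raises the gauge to 110 still starts its call right away,
+    # but the Deferred it returns does not fire until earlier calls retire
+    # and bring the gauge back under 100. flush() fires only after every
+    # pipelined call has retired.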
+
+    def __init__(self, capacity):
+        self.capacity = capacity # how full we can be
+        self.gauge = 0 # how full we are
+        self.failure = None
+        self.waiting = [] # callers of add() who are blocked
+        self.unflushed = ExpandableDeferredList()
+
+    def add(self, _size, _func, *args, **kwargs):
+        # We promise that all the Deferreds we return will fire in the order
+        # they were returned. To make it easier to keep this promise, we
+        # prohibit multiple outstanding calls to add().
+        if self.waiting:
+            raise SingleFileError
+        if self.failure:
+            return defer.fail(self.failure)
+        self.gauge += _size
+        fd = defer.maybeDeferred(_func, *args, **kwargs)
+        fd.addBoth(self._call_finished, _size)
+        self.unflushed.addDeferred(fd)
+        fd.addErrback(self._eat_pipeline_errors)
+        fd.addErrback(log.err, "_eat_pipeline_errors didn't eat it")
+        if self.gauge < self.capacity:
+            return defer.succeed(None)
+        d = defer.Deferred()
+        self.waiting.append(d)
+        return d
+
+    def flush(self):
+        if self.failure:
+            return defer.fail(self.failure)
+        d, self.unflushed = self.unflushed, ExpandableDeferredList()
+        d.close()
+        d.addErrback(self._flushed_error)
+        return d
+
+    def _flushed_error(self, f):
+        precondition(self.failure) # should have been set by _call_finished
+        return self.failure
+
+    def _call_finished(self, res, size):
+        self.gauge -= size
+        if isinstance(res, Failure):
+            res = Failure(PipelineError(res))
+            if not self.failure:
+                self.failure = res
+        if self.failure:
+            while self.waiting:
+                d = self.waiting.pop(0)
+                d.errback(self.failure)
+        else:
+            while self.waiting and (self.gauge < self.capacity):
+                d = self.waiting.pop(0)
+                d.callback(None)
+                # the d.callback() might trigger a new call to add(), which
+                # would raise our gauge and could fill the pipeline again,
+                # so the while condition is re-checked before waking the
+                # next waiter.
+        return res
+
+    def _eat_pipeline_errors(self, f):
+        f.trap(PipelineError)
+        return None
-- 
2.45.2