from twisted.trial import unittest
from twisted.internet import defer, reactor
from twisted.python.failure import Failure
+from twisted.python import log
from allmydata.util import base32, idlib, humanreadable, mathutil, hashutil
from allmydata.util import assertutil, fileutil, deferredutil, abbreviate
from allmydata.util import limiter, time_format, pollmixin, cachedir
-from allmydata.util import statistics, dictutil, rrefutil
+from allmydata.util import statistics, dictutil, rrefutil, pipeline
from allmydata.util.rrefutil import ServerFailure
class Base32(unittest.TestCase):
rrefutil.trap_local, f, IndexError)
d.addErrback(_check)
return d
+
+class Pipeline(unittest.TestCase):
+ def pause(self, *args, **kwargs):
+ d = defer.Deferred()
+ self.calls.append( (d, args, kwargs) )
+ return d
+
+ def failUnlessCallsAre(self, expected):
+ #print self.calls
+ #print expected
+ self.failUnlessEqual(len(self.calls), len(expected), self.calls)
+ for i,c in enumerate(self.calls):
+ self.failUnlessEqual(c[1:], expected[i], str(i))
+
+ def test_basic(self):
+ self.calls = []
+ finished = []
+ p = pipeline.Pipeline(100)
+
+ d = p.flush() # fires immediately
+ d.addCallbacks(finished.append, log.err)
+ self.failUnlessEqual(len(finished), 1)
+ finished = []
+
+ d = p.add(10, self.pause, "one")
+ # the call should start right away, and our return Deferred should
+ # fire right away
+ d.addCallbacks(finished.append, log.err)
+ self.failUnlessEqual(len(finished), 1)
+ self.failUnlessEqual(finished[0], None)
+ self.failUnlessCallsAre([ ( ("one",) , {} ) ])
+ self.failUnlessEqual(p.gauge, 10)
+
+ # pipeline: [one]
+
+ finished = []
+ d = p.add(20, self.pause, "two", kw=2)
+ # pipeline: [one, two]
+
+ # the call and the Deferred should fire right away
+ d.addCallbacks(finished.append, log.err)
+ self.failUnlessEqual(len(finished), 1)
+ self.failUnlessEqual(finished[0], None)
+ self.failUnlessCallsAre([ ( ("one",) , {} ),
+ ( ("two",) , {"kw": 2} ),
+ ])
+ self.failUnlessEqual(p.gauge, 30)
+
+ self.calls[0][0].callback("one-result")
+ # pipeline: [two]
+ self.failUnlessEqual(p.gauge, 20)
+
+ finished = []
+ d = p.add(90, self.pause, "three", "posarg1")
+ # pipeline: [two, three]
+ flushed = []
+ fd = p.flush()
+ fd.addCallbacks(flushed.append, log.err)
+ self.failUnlessEqual(flushed, [])
+
+ # the call will be made right away, but the return Deferred will not,
+ # because the pipeline is now full.
+ d.addCallbacks(finished.append, log.err)
+ self.failUnlessEqual(len(finished), 0)
+ self.failUnlessCallsAre([ ( ("one",) , {} ),
+ ( ("two",) , {"kw": 2} ),
+ ( ("three", "posarg1"), {} ),
+ ])
+ self.failUnlessEqual(p.gauge, 110)
+
+ self.failUnlessRaises(pipeline.SingleFileError, p.add, 10, self.pause)
+
+ # retiring either call will unblock the pipeline, causing the #3
+ # Deferred to fire
+ self.calls[2][0].callback("three-result")
+ # pipeline: [two]
+
+ self.failUnlessEqual(len(finished), 1)
+ self.failUnlessEqual(finished[0], None)
+ self.failUnlessEqual(flushed, [])
+
+ # retiring call#2 will finally allow the flush() Deferred to fire
+ self.calls[1][0].callback("two-result")
+ self.failUnlessEqual(len(flushed), 1)
+
+ def test_errors(self):
+ self.calls = []
+ p = pipeline.Pipeline(100)
+
+ d1 = p.add(200, self.pause, "one")
+ d2 = p.flush()
+
+ finished = []
+ d1.addBoth(finished.append)
+ self.failUnlessEqual(finished, [])
+
+ flushed = []
+ d2.addBoth(flushed.append)
+ self.failUnlessEqual(flushed, [])
+
+ self.calls[0][0].errback(ValueError("oops"))
+
+ self.failUnlessEqual(len(finished), 1)
+ f = finished[0]
+ self.failUnless(isinstance(f, Failure))
+ self.failUnless(f.check(pipeline.PipelineError))
+ r = repr(f.value)
+ self.failUnless("ValueError" in r, r)
+ f2 = f.value.error
+ self.failUnless(f2.check(ValueError))
+
+ self.failUnlessEqual(len(flushed), 1)
+ f = flushed[0]
+ self.failUnless(isinstance(f, Failure))
+ self.failUnless(f.check(pipeline.PipelineError))
+ f2 = f.value.error
+ self.failUnless(f2.check(ValueError))
+
+ # now that the pipeline is in the failed state, any new calls will
+ # fail immediately
+
+ d3 = p.add(20, self.pause, "two")
+
+ finished = []
+ d3.addBoth(finished.append)
+ self.failUnlessEqual(len(finished), 1)
+ f = finished[0]
+ self.failUnless(isinstance(f, Failure))
+ self.failUnless(f.check(pipeline.PipelineError))
+ r = repr(f.value)
+ self.failUnless("ValueError" in r, r)
+ f2 = f.value.error
+ self.failUnless(f2.check(ValueError))
+
+ d4 = p.flush()
+ flushed = []
+ d4.addBoth(flushed.append)
+ self.failUnlessEqual(len(flushed), 1)
+ f = flushed[0]
+ self.failUnless(isinstance(f, Failure))
+ self.failUnless(f.check(pipeline.PipelineError))
+ f2 = f.value.error
+ self.failUnless(f2.check(ValueError))
+
+
+ def test_errors2(self):
+ self.calls = []
+ p = pipeline.Pipeline(100)
+
+ d1 = p.add(10, self.pause, "one")
+ d2 = p.add(20, self.pause, "two")
+ d3 = p.add(30, self.pause, "three")
+ d4 = p.flush()
+
+ # one call fails, then the second one succeeds: make sure
+ # ExpandableDeferredList tolerates the second one
+
+ flushed = []
+ d4.addBoth(flushed.append)
+ self.failUnlessEqual(flushed, [])
+
+ self.calls[0][0].errback(ValueError("oops"))
+ self.failUnlessEqual(len(flushed), 1)
+ f = flushed[0]
+ self.failUnless(isinstance(f, Failure))
+ self.failUnless(f.check(pipeline.PipelineError))
+ f2 = f.value.error
+ self.failUnless(f2.check(ValueError))
+
+ self.calls[1][0].callback("two-result")
+ self.calls[2][0].errback(ValueError("three-error"))
--- /dev/null
+
+from twisted.internet import defer
+from twisted.python.failure import Failure
+from twisted.python import log
+from allmydata.util.assertutil import precondition
+
+class PipelineError(Exception):
+ """One of the pipelined messages returned an error. The received Failure
+ object is stored in my .error attribute."""
+ def __init__(self, error):
+ self.error = error
+
+ def __repr__(self):
+ return "<PipelineError error=(%r)>" % self.error
+
+class SingleFileError(Exception):
+ """You are not permitted to add a job to a full pipeline."""
+
+
+class ExpandableDeferredList(defer.Deferred):
+ # like DeferredList(fireOnOneErrback=True) with a built-in
+ # gatherResults(), but you can add new Deferreds until you close it. This
+ # gives you a chance to add don't-complain-about-unhandled-error errbacks
+ # immediately after attachment, regardless of whether you actually end up
+ # wanting the list or not.
+ def __init__(self):
+ defer.Deferred.__init__(self)
+ self.resultsReceived = 0
+ self.resultList = []
+ self.failure = None
+ self.closed = False
+
+ def addDeferred(self, d):
+ precondition(not self.closed, "don't call addDeferred() on a closed ExpandableDeferredList")
+ index = len(self.resultList)
+ self.resultList.append(None)
+ d.addCallbacks(self._cbDeferred, self._ebDeferred,
+ callbackArgs=(index,))
+ return d
+
+ def close(self):
+ self.closed = True
+ self.checkForFinished()
+
+ def checkForFinished(self):
+ if not self.closed:
+ return
+ if self.called:
+ return
+ if self.failure:
+ self.errback(self.failure)
+ elif self.resultsReceived == len(self.resultList):
+ self.callback(self.resultList)
+
+ def _cbDeferred(self, res, index):
+ self.resultList[index] = res
+ self.resultsReceived += 1
+ self.checkForFinished()
+ return res
+
+ def _ebDeferred(self, f):
+ self.failure = f
+ self.checkForFinished()
+ return f
+
+
+class Pipeline:
+ """I manage a size-limited pipeline of Deferred operations, usually
+ callRemote() messages."""
+
+ def __init__(self, capacity):
+ self.capacity = capacity # how full we can be
+ self.gauge = 0 # how full we are
+ self.failure = None
+ self.waiting = [] # callers of add() who are blocked
+ self.unflushed = ExpandableDeferredList()
+
+ def add(self, _size, _func, *args, **kwargs):
+ # We promise that all the Deferreds we return will fire in the order
+ # they were returned. To make it easier to keep this promise, we
+ # prohibit multiple outstanding calls to add() .
+ if self.waiting:
+ raise SingleFileError
+ if self.failure:
+ return defer.fail(self.failure)
+ self.gauge += _size
+ fd = defer.maybeDeferred(_func, *args, **kwargs)
+ fd.addBoth(self._call_finished, _size)
+ self.unflushed.addDeferred(fd)
+ fd.addErrback(self._eat_pipeline_errors)
+ fd.addErrback(log.err, "_eat_pipeline_errors didn't eat it")
+ if self.gauge < self.capacity:
+ return defer.succeed(None)
+ d = defer.Deferred()
+ self.waiting.append(d)
+ return d
+
+ def flush(self):
+ if self.failure:
+ return defer.fail(self.failure)
+ d, self.unflushed = self.unflushed, ExpandableDeferredList()
+ d.close()
+ d.addErrback(self._flushed_error)
+ return d
+
+ def _flushed_error(self, f):
+ precondition(self.failure) # should have been set by _call_finished
+ return self.failure
+
+ def _call_finished(self, res, size):
+ self.gauge -= size
+ if isinstance(res, Failure):
+ res = Failure(PipelineError(res))
+ if not self.failure:
+ self.failure = res
+ if self.failure:
+ while self.waiting:
+ d = self.waiting.pop(0)
+ d.errback(self.failure)
+ else:
+ while self.waiting and (self.gauge < self.capacity):
+ d = self.waiting.pop(0)
+ d.callback(None)
+ # the d.callback() might trigger a new call to add(), which
+ # will raise our gauge and might cause the pipeline to be
+ # filled. So the while() loop gets a chance to tell the
+ # caller to stop.
+ return res
+
+ def _eat_pipeline_errors(self, f):
+ f.trap(PipelineError)
+ return None