From e76c6b606f4c9fb89fc2a161a1955fdc347cca7b Mon Sep 17 00:00:00 2001 From: Brian Warner Date: Mon, 18 May 2009 16:43:26 -0700 Subject: [PATCH] util/pipeline.py: new utility class to manage size-limited work pipelines, for #392 --- src/allmydata/test/test_util.py | 174 +++++++++++++++++++++++++++++++- src/allmydata/util/pipeline.py | 132 ++++++++++++++++++++++++ 2 files changed, 305 insertions(+), 1 deletion(-) create mode 100644 src/allmydata/util/pipeline.py diff --git a/src/allmydata/test/test_util.py b/src/allmydata/test/test_util.py index 4a97001f..16a63f04 100644 --- a/src/allmydata/test/test_util.py +++ b/src/allmydata/test/test_util.py @@ -6,11 +6,12 @@ from StringIO import StringIO from twisted.trial import unittest from twisted.internet import defer, reactor from twisted.python.failure import Failure +from twisted.python import log from allmydata.util import base32, idlib, humanreadable, mathutil, hashutil from allmydata.util import assertutil, fileutil, deferredutil, abbreviate from allmydata.util import limiter, time_format, pollmixin, cachedir -from allmydata.util import statistics, dictutil, rrefutil +from allmydata.util import statistics, dictutil, rrefutil, pipeline from allmydata.util.rrefutil import ServerFailure class Base32(unittest.TestCase): @@ -1300,3 +1301,174 @@ class RemoteFailures(unittest.TestCase): rrefutil.trap_local, f, IndexError) d.addErrback(_check) return d + +class Pipeline(unittest.TestCase): + def pause(self, *args, **kwargs): + d = defer.Deferred() + self.calls.append( (d, args, kwargs) ) + return d + + def failUnlessCallsAre(self, expected): + #print self.calls + #print expected + self.failUnlessEqual(len(self.calls), len(expected), self.calls) + for i,c in enumerate(self.calls): + self.failUnlessEqual(c[1:], expected[i], str(i)) + + def test_basic(self): + self.calls = [] + finished = [] + p = pipeline.Pipeline(100) + + d = p.flush() # fires immediately + d.addCallbacks(finished.append, log.err) + self.failUnlessEqual(len(finished), 1) + finished = [] + + d = p.add(10, self.pause, "one") + # the call should start right away, and our return Deferred should + # fire right away + d.addCallbacks(finished.append, log.err) + self.failUnlessEqual(len(finished), 1) + self.failUnlessEqual(finished[0], None) + self.failUnlessCallsAre([ ( ("one",) , {} ) ]) + self.failUnlessEqual(p.gauge, 10) + + # pipeline: [one] + + finished = [] + d = p.add(20, self.pause, "two", kw=2) + # pipeline: [one, two] + + # the call and the Deferred should fire right away + d.addCallbacks(finished.append, log.err) + self.failUnlessEqual(len(finished), 1) + self.failUnlessEqual(finished[0], None) + self.failUnlessCallsAre([ ( ("one",) , {} ), + ( ("two",) , {"kw": 2} ), + ]) + self.failUnlessEqual(p.gauge, 30) + + self.calls[0][0].callback("one-result") + # pipeline: [two] + self.failUnlessEqual(p.gauge, 20) + + finished = [] + d = p.add(90, self.pause, "three", "posarg1") + # pipeline: [two, three] + flushed = [] + fd = p.flush() + fd.addCallbacks(flushed.append, log.err) + self.failUnlessEqual(flushed, []) + + # the call will be made right away, but the return Deferred will not, + # because the pipeline is now full. + d.addCallbacks(finished.append, log.err) + self.failUnlessEqual(len(finished), 0) + self.failUnlessCallsAre([ ( ("one",) , {} ), + ( ("two",) , {"kw": 2} ), + ( ("three", "posarg1"), {} ), + ]) + self.failUnlessEqual(p.gauge, 110) + + self.failUnlessRaises(pipeline.SingleFileError, p.add, 10, self.pause) + + # retiring either call will unblock the pipeline, causing the #3 + # Deferred to fire + self.calls[2][0].callback("three-result") + # pipeline: [two] + + self.failUnlessEqual(len(finished), 1) + self.failUnlessEqual(finished[0], None) + self.failUnlessEqual(flushed, []) + + # retiring call#2 will finally allow the flush() Deferred to fire + self.calls[1][0].callback("two-result") + self.failUnlessEqual(len(flushed), 1) + + def test_errors(self): + self.calls = [] + p = pipeline.Pipeline(100) + + d1 = p.add(200, self.pause, "one") + d2 = p.flush() + + finished = [] + d1.addBoth(finished.append) + self.failUnlessEqual(finished, []) + + flushed = [] + d2.addBoth(flushed.append) + self.failUnlessEqual(flushed, []) + + self.calls[0][0].errback(ValueError("oops")) + + self.failUnlessEqual(len(finished), 1) + f = finished[0] + self.failUnless(isinstance(f, Failure)) + self.failUnless(f.check(pipeline.PipelineError)) + r = repr(f.value) + self.failUnless("ValueError" in r, r) + f2 = f.value.error + self.failUnless(f2.check(ValueError)) + + self.failUnlessEqual(len(flushed), 1) + f = flushed[0] + self.failUnless(isinstance(f, Failure)) + self.failUnless(f.check(pipeline.PipelineError)) + f2 = f.value.error + self.failUnless(f2.check(ValueError)) + + # now that the pipeline is in the failed state, any new calls will + # fail immediately + + d3 = p.add(20, self.pause, "two") + + finished = [] + d3.addBoth(finished.append) + self.failUnlessEqual(len(finished), 1) + f = finished[0] + self.failUnless(isinstance(f, Failure)) + self.failUnless(f.check(pipeline.PipelineError)) + r = repr(f.value) + self.failUnless("ValueError" in r, r) + f2 = f.value.error + self.failUnless(f2.check(ValueError)) + + d4 = p.flush() + flushed = [] + d4.addBoth(flushed.append) + self.failUnlessEqual(len(flushed), 1) + f = flushed[0] + self.failUnless(isinstance(f, Failure)) + self.failUnless(f.check(pipeline.PipelineError)) + f2 = f.value.error + self.failUnless(f2.check(ValueError)) + + + def test_errors2(self): + self.calls = [] + p = pipeline.Pipeline(100) + + d1 = p.add(10, self.pause, "one") + d2 = p.add(20, self.pause, "two") + d3 = p.add(30, self.pause, "three") + d4 = p.flush() + + # one call fails, then the second one succeeds: make sure + # ExpandableDeferredList tolerates the second one + + flushed = [] + d4.addBoth(flushed.append) + self.failUnlessEqual(flushed, []) + + self.calls[0][0].errback(ValueError("oops")) + self.failUnlessEqual(len(flushed), 1) + f = flushed[0] + self.failUnless(isinstance(f, Failure)) + self.failUnless(f.check(pipeline.PipelineError)) + f2 = f.value.error + self.failUnless(f2.check(ValueError)) + + self.calls[1][0].callback("two-result") + self.calls[2][0].errback(ValueError("three-error")) diff --git a/src/allmydata/util/pipeline.py b/src/allmydata/util/pipeline.py new file mode 100644 index 00000000..5f3b031c --- /dev/null +++ b/src/allmydata/util/pipeline.py @@ -0,0 +1,132 @@ + +from twisted.internet import defer +from twisted.python.failure import Failure +from twisted.python import log +from allmydata.util.assertutil import precondition + +class PipelineError(Exception): + """One of the pipelined messages returned an error. The received Failure + object is stored in my .error attribute.""" + def __init__(self, error): + self.error = error + + def __repr__(self): + return "" % self.error + +class SingleFileError(Exception): + """You are not permitted to add a job to a full pipeline.""" + + +class ExpandableDeferredList(defer.Deferred): + # like DeferredList(fireOnOneErrback=True) with a built-in + # gatherResults(), but you can add new Deferreds until you close it. This + # gives you a chance to add don't-complain-about-unhandled-error errbacks + # immediately after attachment, regardless of whether you actually end up + # wanting the list or not. + def __init__(self): + defer.Deferred.__init__(self) + self.resultsReceived = 0 + self.resultList = [] + self.failure = None + self.closed = False + + def addDeferred(self, d): + precondition(not self.closed, "don't call addDeferred() on a closed ExpandableDeferredList") + index = len(self.resultList) + self.resultList.append(None) + d.addCallbacks(self._cbDeferred, self._ebDeferred, + callbackArgs=(index,)) + return d + + def close(self): + self.closed = True + self.checkForFinished() + + def checkForFinished(self): + if not self.closed: + return + if self.called: + return + if self.failure: + self.errback(self.failure) + elif self.resultsReceived == len(self.resultList): + self.callback(self.resultList) + + def _cbDeferred(self, res, index): + self.resultList[index] = res + self.resultsReceived += 1 + self.checkForFinished() + return res + + def _ebDeferred(self, f): + self.failure = f + self.checkForFinished() + return f + + +class Pipeline: + """I manage a size-limited pipeline of Deferred operations, usually + callRemote() messages.""" + + def __init__(self, capacity): + self.capacity = capacity # how full we can be + self.gauge = 0 # how full we are + self.failure = None + self.waiting = [] # callers of add() who are blocked + self.unflushed = ExpandableDeferredList() + + def add(self, _size, _func, *args, **kwargs): + # We promise that all the Deferreds we return will fire in the order + # they were returned. To make it easier to keep this promise, we + # prohibit multiple outstanding calls to add() . + if self.waiting: + raise SingleFileError + if self.failure: + return defer.fail(self.failure) + self.gauge += _size + fd = defer.maybeDeferred(_func, *args, **kwargs) + fd.addBoth(self._call_finished, _size) + self.unflushed.addDeferred(fd) + fd.addErrback(self._eat_pipeline_errors) + fd.addErrback(log.err, "_eat_pipeline_errors didn't eat it") + if self.gauge < self.capacity: + return defer.succeed(None) + d = defer.Deferred() + self.waiting.append(d) + return d + + def flush(self): + if self.failure: + return defer.fail(self.failure) + d, self.unflushed = self.unflushed, ExpandableDeferredList() + d.close() + d.addErrback(self._flushed_error) + return d + + def _flushed_error(self, f): + precondition(self.failure) # should have been set by _call_finished + return self.failure + + def _call_finished(self, res, size): + self.gauge -= size + if isinstance(res, Failure): + res = Failure(PipelineError(res)) + if not self.failure: + self.failure = res + if self.failure: + while self.waiting: + d = self.waiting.pop(0) + d.errback(self.failure) + else: + while self.waiting and (self.gauge < self.capacity): + d = self.waiting.pop(0) + d.callback(None) + # the d.callback() might trigger a new call to add(), which + # will raise our gauge and might cause the pipeline to be + # filled. So the while() loop gets a chance to tell the + # caller to stop. + return res + + def _eat_pipeline_errors(self, f): + f.trap(PipelineError) + return None -- 2.45.2