From: David-Sarah Hopwood Date: Fri, 23 Nov 2012 00:23:54 +0000 (+0000) Subject: Some useful Deferred utilities, originally from the cloud backend branch. X-Git-Url: https://git.rkrishnan.org/simplejson/components/com_hotproperty/rgr-080307.php?a=commitdiff_plain;h=3f34e106c49c71daec8f2c91a82ddaf58527be28;p=tahoe-lafs%2Ftahoe-lafs.git Some useful Deferred utilities, originally from the cloud backend branch. Signed-off-by: Daira Hopwood --- diff --git a/src/allmydata/test/test_util.py b/src/allmydata/test/test_util.py index d907c118..4ba13a13 100644 --- a/src/allmydata/test/test_util.py +++ b/src/allmydata/test/test_util.py @@ -581,7 +581,7 @@ class PollMixinTests(unittest.TestCase): d.addCallbacks(_suc, _err) return d -class DeferredUtilTests(unittest.TestCase): +class DeferredUtilTests(unittest.TestCase, deferredutil.WaitForDelayedCallsMixin): def test_gather_results(self): d1 = defer.Deferred() d2 = defer.Deferred() @@ -621,6 +621,21 @@ class DeferredUtilTests(unittest.TestCase): self.failUnless(isinstance(f, Failure)) self.failUnless(f.check(ValueError)) + def test_wait_for_delayed_calls(self): + """ + This tests that 'wait_for_delayed_calls' does in fact wait for a + delayed call that is active when the test returns. If it didn't, + Trial would report an unclean reactor error for this test. + """ + def _trigger(): + #print "trigger" + pass + reactor.callLater(0.1, _trigger) + + d = defer.succeed(None) + d.addBoth(self.wait_for_delayed_calls) + return d + class HashUtilTests(unittest.TestCase): def test_random_key(self): diff --git a/src/allmydata/util/deferredutil.py b/src/allmydata/util/deferredutil.py index a1767417..989e85e8 100644 --- a/src/allmydata/util/deferredutil.py +++ b/src/allmydata/util/deferredutil.py @@ -1,4 +1,12 @@ -from twisted.internet import defer + +import time + +from foolscap.api import eventually, fireEventually +from twisted.internet import defer, reactor + +from allmydata.util import log +from allmydata.util.pollmixin import PollMixin + # utility wrapper for DeferredList def _check_deferred_list(results): @@ -9,6 +17,7 @@ def _check_deferred_list(results): if not success: return f return [r[1] for r in results] + def DeferredListShouldSucceed(dl): d = defer.DeferredList(dl) d.addCallback(_check_deferred_list) @@ -33,3 +42,143 @@ def gatherResults(deferredList): d.addCallbacks(_parseDListResult, _unwrapFirstError) return d + +def _with_log(op, res): + """ + The default behaviour on firing an already-fired Deferred is unhelpful for + debugging, because the AlreadyCalledError can easily get lost or be raised + in a context that results in a different error. So make sure it is logged + (for the abstractions defined here). If we are in a test, log.err will cause + the test to fail. + """ + try: + op(res) + except defer.AlreadyCalledError, e: + log.err(e, op=repr(op), level=log.WEIRD) + +def eventually_callback(d): + def _callback(res): + eventually(_with_log, d.callback, res) + return res + return _callback + +def eventually_errback(d): + def _errback(res): + eventually(_with_log, d.errback, res) + return res + return _errback + +def eventual_chain(source, target): + source.addCallbacks(eventually_callback(target), eventually_errback(target)) + + +class HookMixin: + """ + I am a helper mixin that maintains a collection of named hooks, primarily + for use in tests. Each hook is set to an unfired Deferred using 'set_hook', + and can then be fired exactly once at the appropriate time by '_call_hook'. + + I assume a '_hooks' attribute that should set by the class constructor to + a dict mapping each valid hook name to None. + """ + def set_hook(self, name, d=None): + """ + Called by the hook observer (e.g. by a test). + If d is not given, an unfired Deferred is created and returned. + The hook must not already be set. + """ + if d is None: + d = defer.Deferred() + assert self._hooks[name] is None, self._hooks[name] + assert isinstance(d, defer.Deferred), d + self._hooks[name] = d + return d + + def _call_hook(self, res, name): + """ + Called to trigger the hook, with argument 'res'. This is a no-op if the + hook is unset. Otherwise, the hook will be unset, and then its Deferred + will be fired synchronously. + + The expected usage is "deferred.addBoth(self._call_hook, 'hookname')". + This ensures that if 'res' is a failure, the hook will be errbacked, + which will typically cause the test to also fail. + 'res' is returned so that the current result or failure will be passed + through. + """ + d = self._hooks[name] + if d is None: + return defer.succeed(None) + self._hooks[name] = None + _with_log(d.callback, res) + return res + + +def async_iterate(process, iterable, *extra_args, **kwargs): + """ + I iterate over the elements of 'iterable' (which may be deferred), eventually + applying 'process' to each one, optionally with 'extra_args' and 'kwargs'. + 'process' should return a (possibly deferred) boolean: True to continue the + iteration, False to stop. + + I return a Deferred that fires with True if all elements of the iterable + were processed (i.e. 'process' only returned True values); with False if + the iteration was stopped by 'process' returning False; or that fails with + the first failure of either 'process' or the iterator. + """ + iterator = iter(iterable) + + d = defer.succeed(None) + def _iterate(ign): + d2 = defer.maybeDeferred(iterator.next) + def _cb(item): + d3 = defer.maybeDeferred(process, item, *extra_args, **kwargs) + def _maybe_iterate(res): + if res: + d4 = fireEventually() + d4.addCallback(_iterate) + return d4 + return False + d3.addCallback(_maybe_iterate) + return d3 + def _eb(f): + f.trap(StopIteration) + return True + d2.addCallbacks(_cb, _eb) + return d2 + d.addCallback(_iterate) + return d + + +def for_items(cb, mapping): + """ + For each (key, value) pair in a mapping, I add a callback to cb(None, key, value) + to a Deferred that fires immediately. I return that Deferred. + """ + d = defer.succeed(None) + for k, v in mapping.items(): + d.addCallback(lambda ign, k=k, v=v: cb(None, k, v)) + return d + + +class WaitForDelayedCallsMixin(PollMixin): + def _delayed_calls_done(self): + # We're done when the only remaining DelayedCalls fire after threshold. + # (These will be associated with the test timeout, or else they *should* + # cause an unclean reactor error because the test should have waited for + # them.) + threshold = time.time() + 10 + for delayed in reactor.getDelayedCalls(): + if delayed.getTime() < threshold: + return False + return True + + def wait_for_delayed_calls(self, res=None): + """ + Use like this at the end of a test: + d.addBoth(self.wait_for_delayed_calls) + """ + d = self.poll(self._delayed_calls_done) + d.addErrback(log.err, "error while waiting for delayed calls") + d.addBoth(lambda ign: res) + return d