import time

from foolscap.api import eventually, fireEventually
from twisted.internet import defer, reactor

from allmydata.util import log
from allmydata.util.assertutil import _assert
from allmydata.util.pollmixin import PollMixin
# utility wrapper for DeferredList
def _check_deferred_list(results):
    """Collapse a DeferredList result into a plain list, or into a failure.

    'results' is a list of (success, value) pairs. If any pair is marked
    unsuccessful, the value of the first such pair is returned unchanged;
    when this function is used as a Deferred callback, returning a Failure
    makes the following addErrback() fire. If every pair succeeded, a list
    of the values (without the success/failure booleans) is returned.
    """
    # if any of the component Deferreds failed, return the first failure such
    # that an addErrback() would fire. If all were ok, return a list of the
    # results (without the success/failure booleans)
    for success, f in results:
        if not success:
            return f
    return [r[1] for r in results]
def DeferredListShouldSucceed(dl):
    """Wrap the Deferreds in 'dl' in a DeferredList that fails if any of them fails.

    The returned Deferred fires with the list of results when every
    component succeeded; otherwise _check_deferred_list hands back the first
    failure, which routes the chain to its errbacks.
    """
    d = defer.DeferredList(dl)
    d.addCallback(_check_deferred_list)
    # Return the Deferred so that callers can actually wait on the outcome.
    return d
def _parseDListResult(l):
    """Return just the values from a list of (success, value) pairs.

    Used as the success callback in gatherResults, where every pair is
    known to have succeeded, so the booleans carry no information.
    """
    return [x[1] for x in l]
def _unwrapFirstError(f):
    """Errback that replaces a FirstError wrapper with the failure it wraps.

    Any failure that is not a defer.FirstError is re-raised by trap(), so it
    propagates unchanged.
    """
    f.trap(defer.FirstError)
    # Returning a Failure from an errback keeps the chain in the failure
    # state with that Failure. This avoids raising a Failure instance as if
    # it were an exception, which only works when Failure is a raisable type.
    return f.value.subFailure
def gatherResults(deferredList):
    """Returns list with result of given Deferreds.

    This builds on C{DeferredList} but is useful since you don't
    need to parse the result for success/failure.

    The returned Deferred fires with the list of results, or fails with the
    first failure of any of the given Deferreds (unwrapped from the
    C{FirstError} that C{DeferredList} produces).

    @type deferredList: C{list} of L{Deferred}s
    """
    d = defer.DeferredList(deferredList, fireOnOneErrback=True, consumeErrors=True)
    d.addCallbacks(_parseDListResult, _unwrapFirstError)
    return d
def _with_log(op, res):
    """
    Call op(res), logging (rather than propagating) AlreadyCalledError.

    The default behaviour on firing an already-fired Deferred is unhelpful for
    debugging, because the AlreadyCalledError can easily get lost or be raised
    in a context that results in a different error. So make sure it is logged
    (for the abstractions defined here). If we are in a test, log.err will cause
    the test to fail.
    """
    try:
        op(res)
    except defer.AlreadyCalledError as e:
        # 'except ... as' is valid on Python 2.6+ as well as Python 3.
        log.err(e, op=repr(op), level=log.WEIRD)
def eventually_callback(d):
    """Return a one-argument callback that fires d.callback via eventually().

    The returned function schedules d.callback(res) through foolscap's
    eventually() (wrapped in _with_log so that a double-fire is logged) and
    passes 'res' through, so it can sit in another Deferred's callback chain
    without altering that chain's result.
    """
    def _callback(res):
        eventually(_with_log, d.callback, res)
        return res
    return _callback
def eventually_errback(d):
    """Return a one-argument errback that fires d.errback via eventually().

    The returned function schedules d.errback(res) through foolscap's
    eventually() (wrapped in _with_log so that a double-fire is logged) and
    passes 'res' through, so the failure keeps propagating along the chain
    it was attached to.
    """
    def _errback(res):
        eventually(_with_log, d.errback, res)
        return res
    return _errback
def eventual_chain(source, target):
    """Forward the outcome of 'source' to 'target' via eventually().

    A result of 'source' is delivered to target.callback and a failure to
    target.errback, each scheduled by the eventually_* helpers rather than
    invoked directly from source's callback chain.
    """
    source.addCallbacks(eventually_callback(target), eventually_errback(target))
78 I am a helper mixin that maintains a collection of named hooks, primarily
79 for use in tests. Each hook is set to an unfired Deferred using 'set_hook',
80 and can then be fired exactly once at the appropriate time by '_call_hook'.
81 If 'ignore_count' is given, that number of calls to '_call_hook' will be
82 ignored before firing the hook.
I assume a '_hooks' attribute that should be set by the class constructor to
a dict mapping each valid hook name to None.
def set_hook(self, name, d=None, ignore_count=0):
    """
    Called by the hook observer (e.g. by a test).
    If d is not given, an unfired Deferred is created and returned.
    The hook must not already be set.

    'name' must be a key of self._hooks whose current value is None.
    'ignore_count' (>= 0) is stored alongside the Deferred in self._hooks.
    Returns the Deferred registered for the hook.
    """
    self._log("set_hook %r, ignore_count=%r" % (name, ignore_count))
    if d is None:
        d = defer.Deferred()
    _assert(ignore_count >= 0, ignore_count=ignore_count)
    _assert(name in self._hooks, name=name)
    _assert(self._hooks[name] is None, name=name, hook=self._hooks[name])
    _assert(isinstance(d, defer.Deferred), d=d)

    self._hooks[name] = (d, ignore_count)
    return d
def _call_hook(self, res, name, async_=False, **kwargs):
    """
    Called to trigger the hook, with argument 'res'. This is a no-op if
    the hook is unset. If the hook's ignore_count is positive, it will be
    decremented; if it was already zero, the hook will be unset, and then
    its Deferred will be fired synchronously (or, if the async flag is
    true, through eventually_callback).

    The expected usage is "deferred.addBoth(self._call_hook, 'hookname')".
    This ensures that if 'res' is a failure, the hook will be errbacked,
    which will typically cause the test to also fail.
    'res' is returned so that the current result or failure will be passed
    through.

    The flag is spelled 'async_' in the signature because 'async' is a
    reserved word from Python 3.7 on. Callers that still pass it by its old
    keyword name 'async' are honoured via **kwargs.
    """
    fire_eventually = kwargs.pop("async", async_)
    if kwargs:
        raise TypeError("_call_hook() got unexpected keyword arguments %r"
                        % (sorted(kwargs),))

    hook = self._hooks[name]
    if hook is None:
        # Hook not set: nothing to fire, just pass the result along.
        return res

    (d, ignore_count) = hook
    self._log("call_hook %r, ignore_count=%r" % (name, ignore_count))
    if ignore_count > 0:
        self._hooks[name] = (d, ignore_count - 1)
    else:
        # Unset the hook before firing so that it fires at most once.
        self._hooks[name] = None
        if fire_eventually:
            _with_log(eventually_callback(d), res)
        else:
            _with_log(d.callback, res)
    return res
def _log(self, msg):
    """Emit 'msg' through allmydata's log at NOISY level."""
    log.msg(msg, level=log.NOISY)
def async_iterate(process, iterable, *extra_args, **kwargs):
    """
    I iterate over the elements of 'iterable' (which may be deferred), eventually
    applying 'process' to each one, optionally with 'extra_args' and 'kwargs'.
    'process' should return a (possibly deferred) boolean: True to continue the
    iteration, False to stop.

    I return a Deferred that fires with True if all elements of the iterable
    were processed (i.e. 'process' only returned True values); with False if
    the iteration was stopped by 'process' returning False; or that fails with
    the first failure of either 'process' or the iterator.
    """
    iterator = iter(iterable)

    d = defer.succeed(None)
    def _iterate(ign):
        # The next() builtin works on Python 2.6+ and Python 3, unlike the
        # Python-2-only iterator.next method. maybeDeferred turns a raised
        # StopIteration (or any other exception) into a failure for _eb.
        d2 = defer.maybeDeferred(next, iterator)
        def _cb(item):
            d3 = defer.maybeDeferred(process, item, *extra_args, **kwargs)
            def _maybe_iterate(res):
                if res:
                    # Continue with the next element via fireEventually()
                    # instead of calling _iterate directly.
                    d4 = fireEventually()
                    d4.addCallback(_iterate)
                    return d4
                return False
            d3.addCallback(_maybe_iterate)
            return d3
        def _eb(f):
            # Exhausting the iterator is the normal way to finish: report True.
            f.trap(StopIteration)
            return True
        d2.addCallbacks(_cb, _eb)
        return d2
    d.addCallback(_iterate)
    return d
def for_items(cb, mapping):
    """
    For each (key, value) pair in a mapping, I add a callback to cb(None, key, value)
    to a Deferred that fires immediately. I return that Deferred.
    """
    d = defer.succeed(None)
    for k, v in mapping.items():
        # Bind k and v as lambda defaults so each callback keeps its own pair
        # rather than seeing the loop variables' final values.
        d.addCallback(lambda ign, k=k, v=v: cb(None, k, v))
    return d
class WaitForDelayedCallsMixin(PollMixin):
    """Mixin that lets a test wait until no reactor DelayedCall is due within
    the next 10 seconds, polling via PollMixin.poll."""

    def _delayed_calls_done(self):
        """Return True when no pending DelayedCall is due before the threshold."""
        # We're done when the only remaining DelayedCalls fire after threshold.
        # (These will be associated with the test timeout, or else they *should*
        # cause an unclean reactor error because the test should have waited for
        # them.)
        threshold = time.time() + 10
        for delayed in reactor.getDelayedCalls():
            if delayed.getTime() < threshold:
                return False
        return True

    def wait_for_delayed_calls(self, res=None):
        """
        Use like this at the end of a test:
          d.addBoth(self.wait_for_delayed_calls)

        Returns a Deferred that fires with 'res' once _delayed_calls_done
        reports True. An error from the polling itself is logged, and 'res'
        is still passed through.
        """
        d = self.poll(self._delayed_calls_done)
        d.addErrback(log.err, "error while waiting for delayed calls")
        d.addBoth(lambda ign: res)
        return d