4 from foolscap.api import eventually, fireEventually
5 from twisted.internet import defer, reactor
7 from allmydata.util import log
8 from allmydata.util.pollmixin import PollMixin
11 # utility wrapper for DeferredList
def _check_deferred_list(results):
    """Collapse a DeferredList result into plain values or the first failure.

    'results' is a list of (success, value) pairs. If any pair is marked
    unsuccessful, its value (the failure) is returned, so that a following
    addErrback() would fire. If all were ok, a list of the values is
    returned, without the success/failure booleans.
    """
    for success, value in results:
        if not success:
            # Only the first failure is reported; later ones are ignored.
            return value
    return [r[1] for r in results]
def DeferredListShouldSucceed(dl):
    """Wrap the Deferreds in 'dl' in a DeferredList that reports failures.

    The returned Deferred has _check_deferred_list attached, so it yields
    either the list of plain results or the first component failure.
    """
    d = defer.DeferredList(dl)
    d.addCallback(_check_deferred_list)
    return d
def _parseDListResult(l):
    """Return the second element of every entry in 'l'.

    Used by gatherResults to strip the success flags from a DeferredList
    result, leaving only the values.
    """
    return [entry[1] for entry in l]
def _unwrapFirstError(f):
    """Errback that replaces a FirstError failure with the failure it wraps.

    Any failure that is not a defer.FirstError is re-raised by trap().
    """
    f.trap(defer.FirstError)
    # Returning the wrapped Failure keeps the Deferred on its errback chain
    # without depending on Failure objects being raisable.
    return f.value.subFailure
def gatherResults(deferredList):
    """Returns list with result of given Deferreds.

    This builds on C{DeferredList} but is useful since you don't
    need to parse the result for success/failure.

    If any of the given Deferreds fails, the returned Deferred errbacks with
    that first failure (unwrapped from the C{FirstError}); the errors of the
    component Deferreds are consumed.

    @type deferredList: C{list} of L{Deferred}s
    """
    d = defer.DeferredList(deferredList, fireOnOneErrback=True, consumeErrors=True)
    d.addCallbacks(_parseDListResult, _unwrapFirstError)
    return d
def _with_log(op, res):
    """
    Call op(res), logging an AlreadyCalledError instead of letting it escape.

    The default behaviour on firing an already-fired Deferred is unhelpful for
    debugging, because the AlreadyCalledError can easily get lost or be raised
    in a context that results in a different error. So make sure it is logged
    (for the abstractions defined here) at the WEIRD level, together with the
    repr of the operation that was attempted.

    Any other exception raised by 'op' propagates to the caller.
    """
    try:
        op(res)
    except defer.AlreadyCalledError as e:
        log.err(e, op=repr(op), level=log.WEIRD)
def eventually_callback(d):
    """Return a callback that schedules d.callback(res) via eventually().

    The call goes through _with_log, so firing an already-fired Deferred is
    logged rather than raised from the scheduled call.
    """
    def _callback(res):
        eventually(_with_log, d.callback, res)
        # NOTE(review): 'res' is passed through so the chain this callback is
        # attached to keeps its result -- confirm against callers.
        return res
    return _callback
def eventually_errback(d):
    """Return an errback that schedules d.errback(res) via eventually().

    The call goes through _with_log, so firing an already-fired Deferred is
    logged rather than raised from the scheduled call.
    """
    def _errback(res):
        eventually(_with_log, d.errback, res)
        # NOTE(review): 'res' is passed through so the chain this errback is
        # attached to stays failed -- confirm against callers.
        return res
    return _errback
def eventual_chain(source, target):
    """Arrange for 'target' to be fired with the outcome of 'source'.

    A success on 'source' schedules target.callback and a failure schedules
    target.errback, in both cases through eventually() rather than by a
    direct call.
    """
    source.addCallbacks(eventually_callback(target), eventually_errback(target))
77 I am a helper mixin that maintains a collection of named hooks, primarily
78 for use in tests. Each hook is set to an unfired Deferred using 'set_hook',
79 and can then be fired exactly once at the appropriate time by '_call_hook'.
81 I assume a '_hooks' attribute that should be set by the class constructor to
82 a dict mapping each valid hook name to None.
def set_hook(self, name, d=None):
    """
    Called by the hook observer (e.g. by a test).
    If d is not given, an unfired Deferred is created and returned.
    The hook must not already be set.

    'name' must be a key of self._hooks. The Deferred stored for the hook
    (either 'd' or the newly created one) is returned.
    """
    if d is None:
        d = defer.Deferred()
    # Refuse to silently replace a hook that somebody is still waiting on.
    assert self._hooks[name] is None, self._hooks[name]
    assert isinstance(d, defer.Deferred), d
    self._hooks[name] = d
    return d
def _call_hook(self, res, name):
    """
    Called to trigger the hook, with argument 'res'. This is a no-op if the
    hook is unset. Otherwise, the hook will be unset, and then its Deferred
    will be fired synchronously.

    The expected usage is "deferred.addBoth(self._call_hook, 'hookname')".
    This ensures that if 'res' is a failure, the hook will be errbacked,
    which will typically cause the test to also fail.
    'res' is returned so that the current result or failure will be passed
    through, whether or not the hook was set.
    """
    d = self._hooks[name]
    if d is None:
        # Unset hook: hand 'res' straight back so that an addBoth() chain
        # keeps its result, and a failure is not turned into a success.
        return res
    # Unset the hook before firing it, so it can be fired only once.
    self._hooks[name] = None
    _with_log(d.callback, res)
    return res
def async_iterate(process, iterable, *extra_args, **kwargs):
    """
    I iterate over the elements of 'iterable' (which may be deferred), eventually
    applying 'process' to each one, optionally with 'extra_args' and 'kwargs'.
    'process' should return a (possibly deferred) boolean: True to continue the
    iteration, False to stop.

    I return a Deferred that fires with True if all elements of the iterable
    were processed (i.e. 'process' only returned True values); with False if
    the iteration was stopped by 'process' returning False; or that fails with
    the first failure of either 'process' or the iterator.
    """
    iterator = iter(iterable)

    d = defer.succeed(None)

    def _iterate(ign):
        # The builtin next() is used rather than the iterator's 'next'
        # attribute, which only exists on Python 2 iterators.
        d2 = defer.maybeDeferred(next, iterator)

        def _cb(item):
            d3 = defer.maybeDeferred(process, item, *extra_args, **kwargs)

            def _maybe_iterate(res):
                if res:
                    # Take the next step through fireEventually() instead of
                    # calling _iterate directly from this callback.
                    d4 = fireEventually()
                    d4.addCallback(_iterate)
                    return d4
                return False
            d3.addCallback(_maybe_iterate)
            return d3

        def _eb(f):
            # Exhausting the iterator means every element was processed;
            # any other failure is re-raised by trap().
            f.trap(StopIteration)
            return True
        d2.addCallbacks(_cb, _eb)
        return d2
    d.addCallback(_iterate)
    return d
def for_items(cb, mapping):
    """
    For each (key, value) pair in a mapping, I add a callback to cb(None, key, value)
    to a Deferred that fires immediately. I return that Deferred.
    """
    d = defer.succeed(None)
    for k, v in mapping.items():
        # Bind k and v as defaults so each callback sees its own pair rather
        # than the values left over from the last loop iteration.
        d.addCallback(lambda ign, k=k, v=v: cb(None, k, v))
    return d
class WaitForDelayedCallsMixin(PollMixin):
    """Mixin that lets a test wait for the reactor's near-term DelayedCalls."""

    def _delayed_calls_done(self):
        """Return True when no pending DelayedCall fires within 10 seconds."""
        # We're done when the only remaining DelayedCalls fire after threshold.
        # (These will be associated with the test timeout, or else they *should*
        # cause an unclean reactor error because the test should have waited for
        # them.)
        threshold = time.time() + 10
        return all(delayed.getTime() >= threshold
                   for delayed in reactor.getDelayedCalls())

    def wait_for_delayed_calls(self, res=None):
        """
        Use like this at the end of a test:
          d.addBoth(self.wait_for_delayed_calls)

        The returned Deferred ends up with 'res' as its result, so the
        outcome of the chain this is attached to is passed through. An error
        from the polling itself is logged, not propagated.
        """
        # NOTE(review): self.poll is not defined here; presumably it comes from
        # PollMixin and returns a Deferred that fires once the check passes.
        d = self.poll(self._delayed_calls_done)
        d.addErrback(log.err, "error while waiting for delayed calls")
        d.addBoth(lambda ign: res)
        return d