4 from foolscap.api import eventually, fireEventually
5 from twisted.internet import defer, reactor
7 from allmydata.util import log
8 from allmydata.util.assertutil import _assert
9 from allmydata.util.pollmixin import PollMixin
12 # utility wrapper for DeferredList
13 def _check_deferred_list(results):
14 # if any of the component Deferreds failed, return the first failure such
15 # that an addErrback() would fire. If all were ok, return a list of the
16 # results (without the success/failure booleans)
17 for success,f in results:
20 return [r[1] for r in results]
22 def DeferredListShouldSucceed(dl):
23 d = defer.DeferredList(dl)
24 d.addCallback(_check_deferred_list)
27 def _parseDListResult(l):
28 return [x[1] for x in l]
30 def _unwrapFirstError(f):
31 f.trap(defer.FirstError)
32 raise f.value.subFailure
34 def gatherResults(deferredList):
35 """Returns list with result of given Deferreds.
37 This builds on C{DeferredList} but is useful since you don't
38 need to parse the result for success/failure.
40 @type deferredList: C{list} of L{Deferred}s
42 d = defer.DeferredList(deferredList, fireOnOneErrback=True, consumeErrors=True)
43 d.addCallbacks(_parseDListResult, _unwrapFirstError)
47 def _with_log(op, res):
49 The default behaviour on firing an already-fired Deferred is unhelpful for
50 debugging, because the AlreadyCalledError can easily get lost or be raised
51 in a context that results in a different error. So make sure it is logged
52 (for the abstractions defined here). If we are in a test, log.err will cause
57 except defer.AlreadyCalledError, e:
58 log.err(e, op=repr(op), level=log.WEIRD)
60 def eventually_callback(d):
62 eventually(_with_log, d.callback, res)
66 def eventually_errback(d):
68 eventually(_with_log, d.errback, res)
72 def eventual_chain(source, target):
73 source.addCallbacks(eventually_callback(target), eventually_errback(target))
78 I am a helper mixin that maintains a collection of named hooks, primarily
79 for use in tests. Each hook is set to an unfired Deferred using 'set_hook',
80 and can then be fired exactly once at the appropriate time by '_call_hook'.
81 If 'ignore_count' is given, that number of calls to '_call_hook' will be
82 ignored before firing the hook.
84 I assume a '_hooks' attribute that should set by the class constructor to
85 a dict mapping each valid hook name to None.
87 def set_hook(self, name, d=None, ignore_count=0):
89 Called by the hook observer (e.g. by a test).
90 If d is not given, an unfired Deferred is created and returned.
91 The hook must not already be set.
93 self._log("set_hook %r, ignore_count=%r" % (name, ignore_count))
96 _assert(ignore_count >= 0, ignore_count=ignore_count)
97 _assert(name in self._hooks, name=name)
98 _assert(self._hooks[name] is None, name=name, hook=self._hooks[name])
99 _assert(isinstance(d, defer.Deferred), d=d)
101 self._hooks[name] = (d, ignore_count)
104 def _call_hook(self, res, name):
106 Called to trigger the hook, with argument 'res'. This is a no-op if
107 the hook is unset. If the hook's ignore_count is positive, it will be
108 decremented; if it was already zero, the hook will be unset, and then
109 its Deferred will be fired synchronously.
111 The expected usage is "deferred.addBoth(self._call_hook, 'hookname')".
112 This ensures that if 'res' is a failure, the hook will be errbacked,
113 which will typically cause the test to also fail.
114 'res' is returned so that the current result or failure will be passed
117 hook = self._hooks[name]
121 (d, ignore_count) = hook
122 self._log("call_hook %r, ignore_count=%r" % (name, ignore_count))
124 self._hooks[name] = (d, ignore_count - 1)
126 self._hooks[name] = None
127 _with_log(d.callback, res)
131 log.msg(msg, level=log.NOISY)
134 def async_iterate(process, iterable, *extra_args, **kwargs):
136 I iterate over the elements of 'iterable' (which may be deferred), eventually
137 applying 'process' to each one, optionally with 'extra_args' and 'kwargs'.
138 'process' should return a (possibly deferred) boolean: True to continue the
139 iteration, False to stop.
141 I return a Deferred that fires with True if all elements of the iterable
142 were processed (i.e. 'process' only returned True values); with False if
143 the iteration was stopped by 'process' returning False; or that fails with
144 the first failure of either 'process' or the iterator.
146 iterator = iter(iterable)
148 d = defer.succeed(None)
150 d2 = defer.maybeDeferred(iterator.next)
152 d3 = defer.maybeDeferred(process, item, *extra_args, **kwargs)
153 def _maybe_iterate(res):
155 d4 = fireEventually()
156 d4.addCallback(_iterate)
159 d3.addCallback(_maybe_iterate)
162 f.trap(StopIteration)
164 d2.addCallbacks(_cb, _eb)
166 d.addCallback(_iterate)
170 def for_items(cb, mapping):
172 For each (key, value) pair in a mapping, I add a callback to cb(None, key, value)
173 to a Deferred that fires immediately. I return that Deferred.
175 d = defer.succeed(None)
176 for k, v in mapping.items():
177 d.addCallback(lambda ign, k=k, v=v: cb(None, k, v))
181 class WaitForDelayedCallsMixin(PollMixin):
182 def _delayed_calls_done(self):
183 # We're done when the only remaining DelayedCalls fire after threshold.
184 # (These will be associated with the test timeout, or else they *should*
185 # cause an unclean reactor error because the test should have waited for
187 threshold = time.time() + 10
188 for delayed in reactor.getDelayedCalls():
189 if delayed.getTime() < threshold:
193 def wait_for_delayed_calls(self, res=None):
195 Use like this at the end of a test:
196 d.addBoth(self.wait_for_delayed_calls)
198 d = self.poll(self._delayed_calls_done)
199 d.addErrback(log.err, "error while waiting for delayed calls")
200 d.addBoth(lambda ign: res)