]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/util/deferredutil.py
Daira's fix to the hook mixin
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / util / deferredutil.py
1
2 import time
3
4 from foolscap.api import eventually, fireEventually
5 from twisted.internet import defer, reactor
6
7 from allmydata.util import log
8 from allmydata.util.assertutil import _assert
9 from allmydata.util.pollmixin import PollMixin
10
11
12 # utility wrapper for DeferredList
13 def _check_deferred_list(results):
14     # if any of the component Deferreds failed, return the first failure such
15     # that an addErrback() would fire. If all were ok, return a list of the
16     # results (without the success/failure booleans)
17     for success,f in results:
18         if not success:
19             return f
20     return [r[1] for r in results]
21
22 def DeferredListShouldSucceed(dl):
23     d = defer.DeferredList(dl)
24     d.addCallback(_check_deferred_list)
25     return d
26
27 def _parseDListResult(l):
28     return [x[1] for x in l]
29
30 def _unwrapFirstError(f):
31     f.trap(defer.FirstError)
32     raise f.value.subFailure
33
34 def gatherResults(deferredList):
35     """Returns list with result of given Deferreds.
36
37     This builds on C{DeferredList} but is useful since you don't
38     need to parse the result for success/failure.
39
40     @type deferredList:  C{list} of L{Deferred}s
41     """
42     d = defer.DeferredList(deferredList, fireOnOneErrback=True, consumeErrors=True)
43     d.addCallbacks(_parseDListResult, _unwrapFirstError)
44     return d
45
46
47 def _with_log(op, res):
48     """
49     The default behaviour on firing an already-fired Deferred is unhelpful for
50     debugging, because the AlreadyCalledError can easily get lost or be raised
51     in a context that results in a different error. So make sure it is logged
52     (for the abstractions defined here). If we are in a test, log.err will cause
53     the test to fail.
54     """
55     try:
56         op(res)
57     except defer.AlreadyCalledError, e:
58         log.err(e, op=repr(op), level=log.WEIRD)
59
60 def eventually_callback(d):
61     def _callback(res):
62         eventually(_with_log, d.callback, res)
63         return res
64     return _callback
65
66 def eventually_errback(d):
67     def _errback(res):
68         eventually(_with_log, d.errback, res)
69         return res
70     return _errback
71
72 def eventual_chain(source, target):
73     source.addCallbacks(eventually_callback(target), eventually_errback(target))
74
75
76 class HookMixin:
77     """
78     I am a helper mixin that maintains a collection of named hooks, primarily
79     for use in tests. Each hook is set to an unfired Deferred using 'set_hook',
80     and can then be fired exactly once at the appropriate time by '_call_hook'.
81     If 'ignore_count' is given, that number of calls to '_call_hook' will be
82     ignored before firing the hook.
83
84     I assume a '_hooks' attribute that should set by the class constructor to
85     a dict mapping each valid hook name to None.
86     """
87     def set_hook(self, name, d=None, ignore_count=0):
88         """
89         Called by the hook observer (e.g. by a test).
90         If d is not given, an unfired Deferred is created and returned.
91         The hook must not already be set.
92         """
93         self._log("set_hook %r, ignore_count=%r" % (name, ignore_count))
94         if d is None:
95             d = defer.Deferred()
96         _assert(ignore_count >= 0, ignore_count=ignore_count)
97         _assert(name in self._hooks, name=name)
98         _assert(self._hooks[name] is None, name=name, hook=self._hooks[name])
99         _assert(isinstance(d, defer.Deferred), d=d)
100
101         self._hooks[name] = (d, ignore_count)
102         return d
103
104     def _call_hook(self, res, name, async=False):
105         """
106         Called to trigger the hook, with argument 'res'. This is a no-op if
107         the hook is unset. If the hook's ignore_count is positive, it will be
108         decremented; if it was already zero, the hook will be unset, and then
109         its Deferred will be fired synchronously.
110
111         The expected usage is "deferred.addBoth(self._call_hook, 'hookname')".
112         This ensures that if 'res' is a failure, the hook will be errbacked,
113         which will typically cause the test to also fail.
114         'res' is returned so that the current result or failure will be passed
115         through.
116         """
117         hook = self._hooks[name]
118         if hook is None:
119             return None
120
121         (d, ignore_count) = hook
122         self._log("call_hook %r, ignore_count=%r" % (name, ignore_count))
123         if ignore_count > 0:
124             self._hooks[name] = (d, ignore_count - 1)
125         else:
126             self._hooks[name] = None
127             if async:
128                 _with_log(eventually_callback(d), res)
129             else:
130                 _with_log(d.callback, res)
131         return res
132
133     def _log(self, msg):
134         log.msg(msg, level=log.NOISY)
135
136
137 def async_iterate(process, iterable, *extra_args, **kwargs):
138     """
139     I iterate over the elements of 'iterable' (which may be deferred), eventually
140     applying 'process' to each one, optionally with 'extra_args' and 'kwargs'.
141     'process' should return a (possibly deferred) boolean: True to continue the
142     iteration, False to stop.
143
144     I return a Deferred that fires with True if all elements of the iterable
145     were processed (i.e. 'process' only returned True values); with False if
146     the iteration was stopped by 'process' returning False; or that fails with
147     the first failure of either 'process' or the iterator.
148     """
149     iterator = iter(iterable)
150
151     d = defer.succeed(None)
152     def _iterate(ign):
153         d2 = defer.maybeDeferred(iterator.next)
154         def _cb(item):
155             d3 = defer.maybeDeferred(process, item, *extra_args, **kwargs)
156             def _maybe_iterate(res):
157                 if res:
158                     d4 = fireEventually()
159                     d4.addCallback(_iterate)
160                     return d4
161                 return False
162             d3.addCallback(_maybe_iterate)
163             return d3
164         def _eb(f):
165             f.trap(StopIteration)
166             return True
167         d2.addCallbacks(_cb, _eb)
168         return d2
169     d.addCallback(_iterate)
170     return d
171
172
173 def for_items(cb, mapping):
174     """
175     For each (key, value) pair in a mapping, I add a callback to cb(None, key, value)
176     to a Deferred that fires immediately. I return that Deferred.
177     """
178     d = defer.succeed(None)
179     for k, v in mapping.items():
180         d.addCallback(lambda ign, k=k, v=v: cb(None, k, v))
181     return d
182
183
184 class WaitForDelayedCallsMixin(PollMixin):
185     def _delayed_calls_done(self):
186         # We're done when the only remaining DelayedCalls fire after threshold.
187         # (These will be associated with the test timeout, or else they *should*
188         # cause an unclean reactor error because the test should have waited for
189         # them.)
190         threshold = time.time() + 10
191         for delayed in reactor.getDelayedCalls():
192             if delayed.getTime() < threshold:
193                 return False
194         return True
195
196     def wait_for_delayed_calls(self, res=None):
197         """
198         Use like this at the end of a test:
199           d.addBoth(self.wait_for_delayed_calls)
200         """
201         d = self.poll(self._delayed_calls_done)
202         d.addErrback(log.err, "error while waiting for delayed calls")
203         d.addBoth(lambda ign: res)
204         return d