]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/util/deferredutil.py
Call hooks eventually.
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / util / deferredutil.py
1
2 import time
3
4 from foolscap.api import eventually, fireEventually
5 from twisted.internet import defer, reactor
6
7 from allmydata.util import log
8 from allmydata.util.assertutil import _assert
9 from allmydata.util.pollmixin import PollMixin
10
11
12 # utility wrapper for DeferredList
13 def _check_deferred_list(results):
14     # if any of the component Deferreds failed, return the first failure such
15     # that an addErrback() would fire. If all were ok, return a list of the
16     # results (without the success/failure booleans)
17     for success,f in results:
18         if not success:
19             return f
20     return [r[1] for r in results]
21
22 def DeferredListShouldSucceed(dl):
23     d = defer.DeferredList(dl)
24     d.addCallback(_check_deferred_list)
25     return d
26
27 def _parseDListResult(l):
28     return [x[1] for x in l]
29
30 def _unwrapFirstError(f):
31     f.trap(defer.FirstError)
32     raise f.value.subFailure
33
34 def gatherResults(deferredList):
35     """Returns list with result of given Deferreds.
36
37     This builds on C{DeferredList} but is useful since you don't
38     need to parse the result for success/failure.
39
40     @type deferredList:  C{list} of L{Deferred}s
41     """
42     d = defer.DeferredList(deferredList, fireOnOneErrback=True, consumeErrors=True)
43     d.addCallbacks(_parseDListResult, _unwrapFirstError)
44     return d
45
46
47 def _with_log(op, res):
48     """
49     The default behaviour on firing an already-fired Deferred is unhelpful for
50     debugging, because the AlreadyCalledError can easily get lost or be raised
51     in a context that results in a different error. So make sure it is logged
52     (for the abstractions defined here). If we are in a test, log.err will cause
53     the test to fail.
54     """
55     try:
56         op(res)
57     except defer.AlreadyCalledError, e:
58         log.err(e, op=repr(op), level=log.WEIRD)
59
60 def eventually_callback(d):
61     def _callback(res):
62         eventually(_with_log, d.callback, res)
63         return res
64     return _callback
65
66 def eventually_errback(d):
67     def _errback(res):
68         eventually(_with_log, d.errback, res)
69         return res
70     return _errback
71
72 def eventual_chain(source, target):
73     source.addCallbacks(eventually_callback(target), eventually_errback(target))
74
75
76 class HookMixin:
77     """
78     I am a helper mixin that maintains a collection of named hooks, primarily
79     for use in tests. Each hook is set to an unfired Deferred using 'set_hook',
80     and can then be fired exactly once at the appropriate time by '_call_hook'.
81     If 'ignore_count' is given, that number of calls to '_call_hook' will be
82     ignored before firing the hook.
83
84     I assume a '_hooks' attribute that should set by the class constructor to
85     a dict mapping each valid hook name to None.
86     """
87     def set_hook(self, name, d=None, ignore_count=0):
88         """
89         Called by the hook observer (e.g. by a test).
90         If d is not given, an unfired Deferred is created and returned.
91         The hook must not already be set.
92         """
93         self._log("set_hook %r, ignore_count=%r" % (name, ignore_count))
94         if d is None:
95             d = defer.Deferred()
96         _assert(ignore_count >= 0, ignore_count=ignore_count)
97         _assert(name in self._hooks, name=name)
98         _assert(self._hooks[name] is None, name=name, hook=self._hooks[name])
99         _assert(isinstance(d, defer.Deferred), d=d)
100
101         self._hooks[name] = (d, ignore_count)
102         return d
103
104     def _call_hook(self, res, name):
105         """
106         Called to trigger the hook, with argument 'res'. This is a no-op if
107         the hook is unset. If the hook's ignore_count is positive, it will be
108         decremented; if it was already zero, the hook will be unset, and then
109         its Deferred will be fired synchronously.
110
111         The expected usage is "deferred.addBoth(self._call_hook, 'hookname')".
112         This ensures that if 'res' is a failure, the hook will be errbacked,
113         which will typically cause the test to also fail.
114         'res' is returned so that the current result or failure will be passed
115         through.
116         """
117         hook = self._hooks[name]
118         if hook is None:
119             return None
120
121         (d, ignore_count) = hook
122         self._log("call_hook %r, ignore_count=%r" % (name, ignore_count))
123         if ignore_count > 0:
124             self._hooks[name] = (d, ignore_count - 1)
125         else:
126             self._hooks[name] = None
127             _with_log(eventually_callback(d), res)
128         return res
129
130     def _log(self, msg):
131         log.msg(msg, level=log.NOISY)
132
133
134 def async_iterate(process, iterable, *extra_args, **kwargs):
135     """
136     I iterate over the elements of 'iterable' (which may be deferred), eventually
137     applying 'process' to each one, optionally with 'extra_args' and 'kwargs'.
138     'process' should return a (possibly deferred) boolean: True to continue the
139     iteration, False to stop.
140
141     I return a Deferred that fires with True if all elements of the iterable
142     were processed (i.e. 'process' only returned True values); with False if
143     the iteration was stopped by 'process' returning False; or that fails with
144     the first failure of either 'process' or the iterator.
145     """
146     iterator = iter(iterable)
147
148     d = defer.succeed(None)
149     def _iterate(ign):
150         d2 = defer.maybeDeferred(iterator.next)
151         def _cb(item):
152             d3 = defer.maybeDeferred(process, item, *extra_args, **kwargs)
153             def _maybe_iterate(res):
154                 if res:
155                     d4 = fireEventually()
156                     d4.addCallback(_iterate)
157                     return d4
158                 return False
159             d3.addCallback(_maybe_iterate)
160             return d3
161         def _eb(f):
162             f.trap(StopIteration)
163             return True
164         d2.addCallbacks(_cb, _eb)
165         return d2
166     d.addCallback(_iterate)
167     return d
168
169
170 def for_items(cb, mapping):
171     """
172     For each (key, value) pair in a mapping, I add a callback to cb(None, key, value)
173     to a Deferred that fires immediately. I return that Deferred.
174     """
175     d = defer.succeed(None)
176     for k, v in mapping.items():
177         d.addCallback(lambda ign, k=k, v=v: cb(None, k, v))
178     return d
179
180
181 class WaitForDelayedCallsMixin(PollMixin):
182     def _delayed_calls_done(self):
183         # We're done when the only remaining DelayedCalls fire after threshold.
184         # (These will be associated with the test timeout, or else they *should*
185         # cause an unclean reactor error because the test should have waited for
186         # them.)
187         threshold = time.time() + 10
188         for delayed in reactor.getDelayedCalls():
189             if delayed.getTime() < threshold:
190                 return False
191         return True
192
193     def wait_for_delayed_calls(self, res=None):
194         """
195         Use like this at the end of a test:
196           d.addBoth(self.wait_for_delayed_calls)
197         """
198         d = self.poll(self._delayed_calls_done)
199         d.addErrback(log.err, "error while waiting for delayed calls")
200         d.addBoth(lambda ign: res)
201         return d