]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/util/deferredutil.py
989e85e82408e38237592c3c04e4da642af4d1d8
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / util / deferredutil.py
1
2 import time
3
4 from foolscap.api import eventually, fireEventually
5 from twisted.internet import defer, reactor
6
7 from allmydata.util import log
8 from allmydata.util.pollmixin import PollMixin
9
10
11 # utility wrapper for DeferredList
12 def _check_deferred_list(results):
13     # if any of the component Deferreds failed, return the first failure such
14     # that an addErrback() would fire. If all were ok, return a list of the
15     # results (without the success/failure booleans)
16     for success,f in results:
17         if not success:
18             return f
19     return [r[1] for r in results]
20
21 def DeferredListShouldSucceed(dl):
22     d = defer.DeferredList(dl)
23     d.addCallback(_check_deferred_list)
24     return d
25
26 def _parseDListResult(l):
27     return [x[1] for x in l]
28
29 def _unwrapFirstError(f):
30     f.trap(defer.FirstError)
31     raise f.value.subFailure
32
33 def gatherResults(deferredList):
34     """Returns list with result of given Deferreds.
35
36     This builds on C{DeferredList} but is useful since you don't
37     need to parse the result for success/failure.
38
39     @type deferredList:  C{list} of L{Deferred}s
40     """
41     d = defer.DeferredList(deferredList, fireOnOneErrback=True, consumeErrors=True)
42     d.addCallbacks(_parseDListResult, _unwrapFirstError)
43     return d
44
45
46 def _with_log(op, res):
47     """
48     The default behaviour on firing an already-fired Deferred is unhelpful for
49     debugging, because the AlreadyCalledError can easily get lost or be raised
50     in a context that results in a different error. So make sure it is logged
51     (for the abstractions defined here). If we are in a test, log.err will cause
52     the test to fail.
53     """
54     try:
55         op(res)
56     except defer.AlreadyCalledError, e:
57         log.err(e, op=repr(op), level=log.WEIRD)
58
59 def eventually_callback(d):
60     def _callback(res):
61         eventually(_with_log, d.callback, res)
62         return res
63     return _callback
64
65 def eventually_errback(d):
66     def _errback(res):
67         eventually(_with_log, d.errback, res)
68         return res
69     return _errback
70
71 def eventual_chain(source, target):
72     source.addCallbacks(eventually_callback(target), eventually_errback(target))
73
74
75 class HookMixin:
76     """
77     I am a helper mixin that maintains a collection of named hooks, primarily
78     for use in tests. Each hook is set to an unfired Deferred using 'set_hook',
79     and can then be fired exactly once at the appropriate time by '_call_hook'.
80
81     I assume a '_hooks' attribute that should set by the class constructor to
82     a dict mapping each valid hook name to None.
83     """
84     def set_hook(self, name, d=None):
85         """
86         Called by the hook observer (e.g. by a test).
87         If d is not given, an unfired Deferred is created and returned.
88         The hook must not already be set.
89         """
90         if d is None:
91             d = defer.Deferred()
92         assert self._hooks[name] is None, self._hooks[name]
93         assert isinstance(d, defer.Deferred), d
94         self._hooks[name] = d
95         return d
96
97     def _call_hook(self, res, name):
98         """
99         Called to trigger the hook, with argument 'res'. This is a no-op if the
100         hook is unset. Otherwise, the hook will be unset, and then its Deferred
101         will be fired synchronously.
102
103         The expected usage is "deferred.addBoth(self._call_hook, 'hookname')".
104         This ensures that if 'res' is a failure, the hook will be errbacked,
105         which will typically cause the test to also fail.
106         'res' is returned so that the current result or failure will be passed
107         through.
108         """
109         d = self._hooks[name]
110         if d is None:
111             return defer.succeed(None)
112         self._hooks[name] = None
113         _with_log(d.callback, res)
114         return res
115
116
117 def async_iterate(process, iterable, *extra_args, **kwargs):
118     """
119     I iterate over the elements of 'iterable' (which may be deferred), eventually
120     applying 'process' to each one, optionally with 'extra_args' and 'kwargs'.
121     'process' should return a (possibly deferred) boolean: True to continue the
122     iteration, False to stop.
123
124     I return a Deferred that fires with True if all elements of the iterable
125     were processed (i.e. 'process' only returned True values); with False if
126     the iteration was stopped by 'process' returning False; or that fails with
127     the first failure of either 'process' or the iterator.
128     """
129     iterator = iter(iterable)
130
131     d = defer.succeed(None)
132     def _iterate(ign):
133         d2 = defer.maybeDeferred(iterator.next)
134         def _cb(item):
135             d3 = defer.maybeDeferred(process, item, *extra_args, **kwargs)
136             def _maybe_iterate(res):
137                 if res:
138                     d4 = fireEventually()
139                     d4.addCallback(_iterate)
140                     return d4
141                 return False
142             d3.addCallback(_maybe_iterate)
143             return d3
144         def _eb(f):
145             f.trap(StopIteration)
146             return True
147         d2.addCallbacks(_cb, _eb)
148         return d2
149     d.addCallback(_iterate)
150     return d
151
152
153 def for_items(cb, mapping):
154     """
155     For each (key, value) pair in a mapping, I add a callback to cb(None, key, value)
156     to a Deferred that fires immediately. I return that Deferred.
157     """
158     d = defer.succeed(None)
159     for k, v in mapping.items():
160         d.addCallback(lambda ign, k=k, v=v: cb(None, k, v))
161     return d
162
163
164 class WaitForDelayedCallsMixin(PollMixin):
165     def _delayed_calls_done(self):
166         # We're done when the only remaining DelayedCalls fire after threshold.
167         # (These will be associated with the test timeout, or else they *should*
168         # cause an unclean reactor error because the test should have waited for
169         # them.)
170         threshold = time.time() + 10
171         for delayed in reactor.getDelayedCalls():
172             if delayed.getTime() < threshold:
173                 return False
174         return True
175
176     def wait_for_delayed_calls(self, res=None):
177         """
178         Use like this at the end of a test:
179           d.addBoth(self.wait_for_delayed_calls)
180         """
181         d = self.poll(self._delayed_calls_done)
182         d.addErrback(log.err, "error while waiting for delayed calls")
183         d.addBoth(lambda ign: res)
184         return d