]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/util/testutil.py
PollMixin: add timeout= argument, rewrite to avoid tail-recursion problems
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / util / testutil.py
1 import os, signal, time
2
3 from twisted.internet import reactor, defer, task
4 from twisted.python import failure
5
6
def flip_bit(good, which):
    """Return a copy of the string good with one bit changed.

    The low-order bit of the character at index which is inverted; every
    other character is left alone.
    """
    if which == -1:
        # good[which:which+1] would be good[-1:0], which is empty, so the
        # final character has to be sliced out differently.
        head, target, tail = good[:-1], good[-1:], ""
    else:
        head = good[:which]
        target = good[which:which + 1]
        tail = good[which + 1:]
    flipped = chr(ord(target) ^ 0x01)
    return head + flipped + tail
14
class SignalMixin:
    # This class is necessary for any code which wants to use Processes
    # outside the usual reactor.run() environment. It is copied from
    # Twisted's twisted.test.test_process
    #
    # sigchldHandler holds whatever signal.signal() reported as the
    # previously installed SIGCHLD handler, or None when setUpClass did not
    # install anything.
    sigchldHandler = None

    def setUpClass(self):
        # make sure SIGCHLD handler is installed, as it should be on
        # reactor.run(). problem is reactor may not have been run when this
        # test runs.
        if hasattr(reactor, "_handleSigchld") and hasattr(signal, "SIGCHLD"):
            self.sigchldHandler = signal.signal(signal.SIGCHLD,
                                                reactor._handleSigchld)

    def tearDownClass(self):
        # Put back the handler that setUpClass displaced. This has to be an
        # explicit comparison with None rather than a truth test: the
        # displaced handler is very often signal.SIG_DFL, which is falsy
        # (integer 0), and a truth test would then leave the reactor's
        # handler installed after the test class has finished.
        if self.sigchldHandler is not None:
            signal.signal(signal.SIGCHLD, self.sigchldHandler)
32
class TimeoutError(Exception):
    """Raised by PollMixin when the polled condition does not become true
    before the requested timeout has elapsed."""
35
class PollMixin:

    def poll(self, check_f, pollinterval=0.01, timeout=None):
        """Call check_f every pollinterval seconds until it returns true.

        Returns the Deferred produced by starting a LoopingCall. It fires
        once check_f has returned a true value, and errbacks if check_f
        raises. If timeout is not None and that many seconds pass without
        success, it errbacks with TimeoutError; timeout=None means no limit
        is enforced.
        """
        if timeout is None:
            deadline = None
        else:
            deadline = time.time() + timeout
        # The looping call has to be able to stop itself, so it is handed a
        # list that only receives the call object after it has been built.
        holder = []
        looper = task.LoopingCall(self._poll, check_f, holder, deadline)
        holder.append(looper)
        return looper.start(pollinterval)

    def _poll(self, check_f, stash, cutoff):
        """Run one polling step; stash[0] is the LoopingCall driving it.

        Raises TimeoutError when cutoff (an absolute time.time() value, or
        None for no limit) has already passed. Otherwise check_f is called
        and the loop is stopped if it returned a true value.
        """
        expired = cutoff is not None and time.time() > cutoff
        if expired:
            raise TimeoutError()
        looper = stash[0]
        if not check_f():
            return
        looper.stop()
59
class ShouldFailMixin:

    def shouldFail(self, expected_failure, which, substring, callable, *args, **kwargs):
        """Assert that callable(*args, **kwargs) fails with expected_failure.

        callable is invoked through defer.maybeDeferred, so it may raise
        synchronously or return a Deferred. The returned Deferred has the
        check attached to it:

        - a Failure is trapped against expected_failure, and if substring is
          non-empty it must appear in str() of that Failure;
        - any non-Failure result fails the test with a message that names
          which (a label for the operation) and the unexpected result.

        substring must be None or a str.
        """
        assert substring is None or isinstance(substring, str)

        def _inspect(outcome):
            if not isinstance(outcome, failure.Failure):
                # The call succeeded although it was supposed to fail.
                self.fail("%s was supposed to raise %s, not get '%s'" %
                          (which, expected_failure, outcome))
                return
            # trap() re-raises when the failure is of an unexpected type.
            outcome.trap(expected_failure)
            if substring:
                rendered = str(outcome)
                self.failUnless(substring in rendered,
                                "substring '%s' not in '%s'"
                                % (substring, rendered))

        pending = defer.maybeDeferred(callable, *args, **kwargs)
        return pending.addBoth(_inspect)
77
78
class TestMixin(SignalMixin):
    def setUp(self, repeatable=False):
        """
        Record when the test started and optionally force repeatability.

        @param repeatable: install the repeatable_random hacks
            (force_repeatability()), which are intended to withhold real
            randomness and the real time.time from the code under test
        """
        self.repeatable = repeatable
        if self.repeatable:
            import repeatable_random
            repeatable_random.force_repeatability()
        # time.realtime is presumably where repeatable_random keeps the
        # genuine clock once time.time has been replaced -- confirm against
        # that module. Prefer it when present.
        if hasattr(time, 'realtime'):
            self.teststarttime = time.realtime()
        else:
            self.teststarttime = time.time()

    def tearDown(self):
        """
        Undo the repeatability hacks (if setUp installed them) and require
        that the reactor has no delayed calls left.
        """
        # getattr: a subclass may override setUp without chaining up to this
        # one, in which case self.repeatable was never assigned.
        if getattr(self, "repeatable", False):
            import repeatable_random
            repeatable_random.restore_non_repeatability()
        self.clean_pending(required_to_quiesce=True)

    def clean_pending(self, dummy=None, required_to_quiesce=True):
        """
        This handy method cleans all pending tasks from the reactor.

        When writing a unit test, consider the following question:

            Is the code that you are testing required to release control once it
            has done its job, so that it is impossible for it to later come around
            (with a delayed reactor task) and do anything further?

        If so, then trial will usefully test that for you -- if the code under
        test leaves any pending tasks on the reactor then trial will fail it.

        On the other hand, some code is *not* required to release control -- some
        code is allowed to continuously maintain control by rescheduling reactor
        tasks in order to do ongoing work.  Trial will incorrectly require that
        code to clean up all its tasks from the reactor.

        Most people think that such code should be amended to have an optional
        "shutdown" operation that releases all control, but on the contrary it is
        good design for some code to *not* have a shutdown operation, but instead
        to have a "crash-only" design in which it recovers from crash on startup.

        If the code under test is of the "long-running" kind, which is *not*
        required to shutdown cleanly in order to pass tests, then you can simply
        call clean_pending(required_to_quiesce=False) at the end of the unit
        test, and trial will be satisfied.

        @param dummy: ignored; presumably present so that this method can be
            used directly as a Deferred callback
        @param required_to_quiesce: when true, fail the test if any delayed
            call was still registered with the reactor on entry
        """
        pending = reactor.getDelayedCalls()
        # Remember whether anything was outstanding before cancelling it all.
        active = bool(pending)
        for p in pending:
            if p.active():
                p.cancel()
            else:
                # Parenthesised so that this line is valid on interpreters
                # where print is a function as well as where it is a
                # statement; the output is the same on both.
                print("WEIRNESS! pending timed call not active+!")
        if required_to_quiesce and active:
            self.fail("Reactor was still active when it was required to be quiescent.")
138
# Pick the implementation of make_readonly / make_accessible: the pywin32
# file-attribute calls when those modules can be imported, os.chmod otherwise.
try:
    import win32file
    import win32con
except ImportError:
    import stat

    def _make_readonly(path):
        # Restrict the file first, then the directory that contains it.
        parent = os.path.dirname(path)
        os.chmod(path, stat.S_IREAD)
        os.chmod(parent, stat.S_IREAD)

    def _make_accessible(path):
        # Reverse order: the directory is reopened before the file is
        # touched.
        owner_rwx = stat.S_IWRITE | stat.S_IEXEC | stat.S_IREAD
        parent = os.path.dirname(path)
        os.chmod(parent, owner_rwx)
        os.chmod(path, owner_rwx)

    make_readonly = _make_readonly
    make_accessible = _make_accessible
else:
    def make_readonly(path):
        # Only the file's own read-only attribute is set on this branch.
        win32file.SetFileAttributes(path, win32con.FILE_ATTRIBUTE_READONLY)

    def make_accessible(path):
        win32file.SetFileAttributes(path, win32con.FILE_ATTRIBUTE_NORMAL)