]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/common_util.py
tests: stop using setUpClass/tearDownClass, since they've been deprecated in Twisted...
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / common_util.py
1 import os, signal, time
2 from random import randrange
3
4 from twisted.internet import reactor, defer
5 from twisted.python import failure
6
7 def insecurerandstr(n):
8     return ''.join(map(chr, map(randrange, [0]*n, [256]*n)))
9
10 def flip_bit(good, which):
11     # flip the low-order bit of good[which]
12     if which == -1:
13         pieces = good[:which], good[-1:], ""
14     else:
15         pieces = good[:which], good[which:which+1], good[which+1:]
16     return pieces[0] + chr(ord(pieces[1]) ^ 0x01) + pieces[2]
17
18 def flip_one_bit(s, offset=0, size=None):
19     """ flip one random bit of the string s, in a byte greater than or equal to offset and less
20     than offset+size. """
21     if size is None:
22         size=len(s)-offset
23     i = randrange(offset, offset+size)
24     result = s[:i] + chr(ord(s[i])^(0x01<<randrange(0, 8))) + s[i+1:]
25     assert result != s, "Internal error -- flip_one_bit() produced the same string as its input: %s == %s" % (result, s)
26     return result
27
28 class SignalMixin:
29     # This class is necessary for any code which wants to use Processes
30     # outside the usual reactor.run() environment. It is copied from
31     # Twisted's twisted.test.test_process . Note that Twisted-8.2.0 uses
32     # something rather different.
33     sigchldHandler = None
34
35     def setUp(self):
36         # make sure SIGCHLD handler is installed, as it should be on
37         # reactor.run(). problem is reactor may not have been run when this
38         # test runs.
39         if hasattr(reactor, "_handleSigchld") and hasattr(signal, "SIGCHLD"):
40             self.sigchldHandler = signal.signal(signal.SIGCHLD,
41                                                 reactor._handleSigchld)
42
43     def tearDown(self):
44         if self.sigchldHandler:
45             signal.signal(signal.SIGCHLD, self.sigchldHandler)
46
47 class StallMixin:
48     def stall(self, res=None, delay=1):
49         d = defer.Deferred()
50         reactor.callLater(delay, d.callback, res)
51         return d
52
53 class ShouldFailMixin:
54
55     def shouldFail(self, expected_failure, which, substring,
56                    callable, *args, **kwargs):
57         assert substring is None or isinstance(substring, str)
58         d = defer.maybeDeferred(callable, *args, **kwargs)
59         def done(res):
60             if isinstance(res, failure.Failure):
61                 res.trap(expected_failure)
62                 if substring:
63                     self.failUnless(substring in str(res),
64                                     "%s: substring '%s' not in '%s'"
65                                     % (which, substring, str(res)))
66                 # return the Failure for further analysis, but in a form that
67                 # doesn't make the Deferred chain think that we failed.
68                 return [res]
69             else:
70                 self.fail("%s was supposed to raise %s, not get '%s'" %
71                           (which, expected_failure, res))
72         d.addBoth(done)
73         return d
74
75
76 class TestMixin(SignalMixin):
77     def setUp(self, repeatable=False):
78         """
79         @param repeatable: install the repeatable_randomness hacks to attempt
80             to without access to real randomness and real time.time from the
81             code under test
82         """
83         SignalMixin.setUp(self)
84         self.repeatable = repeatable
85         if self.repeatable:
86             import repeatable_random
87             repeatable_random.force_repeatability()
88         if hasattr(time, 'realtime'):
89             self.teststarttime = time.realtime()
90         else:
91             self.teststarttime = time.time()
92
93     def tearDown(self):
94         SignalMixin.tearDown(self)
95         if self.repeatable:
96             import repeatable_random
97             repeatable_random.restore_non_repeatability()
98         self.clean_pending(required_to_quiesce=True)
99
100     def clean_pending(self, dummy=None, required_to_quiesce=True):
101         """
102         This handy method cleans all pending tasks from the reactor.
103
104         When writing a unit test, consider the following question:
105
106             Is the code that you are testing required to release control once it
107             has done its job, so that it is impossible for it to later come around
108             (with a delayed reactor task) and do anything further?
109
110         If so, then trial will usefully test that for you -- if the code under
111         test leaves any pending tasks on the reactor then trial will fail it.
112
113         On the other hand, some code is *not* required to release control -- some
114         code is allowed to continuously maintain control by rescheduling reactor
115         tasks in order to do ongoing work.  Trial will incorrectly require that
116         code to clean up all its tasks from the reactor.
117
118         Most people think that such code should be amended to have an optional
119         "shutdown" operation that releases all control, but on the contrary it is
120         good design for some code to *not* have a shutdown operation, but instead
121         to have a "crash-only" design in which it recovers from crash on startup.
122
123         If the code under test is of the "long-running" kind, which is *not*
124         required to shutdown cleanly in order to pass tests, then you can simply
125         call testutil.clean_pending() at the end of the unit test, and trial will
126         be satisfied.
127         """
128         pending = reactor.getDelayedCalls()
129         active = bool(pending)
130         for p in pending:
131             if p.active():
132                 p.cancel()
133             else:
134                 print "WEIRNESS! pending timed call not active+!"
135         if required_to_quiesce and active:
136             self.fail("Reactor was still active when it was required to be quiescent.")
137
138 try:
139     import win32file
140     import win32con
141     def make_readonly(path):
142         win32file.SetFileAttributes(path, win32con.FILE_ATTRIBUTE_READONLY)
143     def make_accessible(path):
144         win32file.SetFileAttributes(path, win32con.FILE_ATTRIBUTE_NORMAL)
145 except ImportError:
146     import stat
147     def _make_readonly(path):
148         os.chmod(path, stat.S_IREAD)
149         os.chmod(os.path.dirname(path), stat.S_IREAD)
150     def _make_accessible(path):
151         os.chmod(os.path.dirname(path), stat.S_IWRITE | stat.S_IEXEC | stat.S_IREAD)
152         os.chmod(path, stat.S_IWRITE | stat.S_IEXEC | stat.S_IREAD)
153     make_readonly = _make_readonly
154     make_accessible = _make_accessible