]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/common_util.py
Fix for Unicode-related test failures on Zooko's OS X 10.6 machine.
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / common_util.py
1 import os, signal, time
2 from random import randrange
3
4 from twisted.internet import reactor, defer
5 from twisted.python import failure
6
7 def insecurerandstr(n):
8     return ''.join(map(chr, map(randrange, [0]*n, [256]*n)))
9
10 def flip_bit(good, which):
11     # flip the low-order bit of good[which]
12     if which == -1:
13         pieces = good[:which], good[-1:], ""
14     else:
15         pieces = good[:which], good[which:which+1], good[which+1:]
16     return pieces[0] + chr(ord(pieces[1]) ^ 0x01) + pieces[2]
17
18 def flip_one_bit(s, offset=0, size=None):
19     """ flip one random bit of the string s, in a byte greater than or equal to offset and less
20     than offset+size. """
21     if size is None:
22         size=len(s)-offset
23     i = randrange(offset, offset+size)
24     result = s[:i] + chr(ord(s[i])^(0x01<<randrange(0, 8))) + s[i+1:]
25     assert result != s, "Internal error -- flip_one_bit() produced the same string as its input: %s == %s" % (result, s)
26     return result
27
28
29 class ReallyEqualMixin:
30     def failUnlessReallyEqual(self, a, b, msg=None):
31         self.failUnlessEqual(a, b, msg=msg)
32         self.failUnlessEqual(type(a), type(b), msg="a :: %r, b :: %r, %r" % (a, b, msg))
33
34
35 class SignalMixin:
36     # This class is necessary for any code which wants to use Processes
37     # outside the usual reactor.run() environment. It is copied from
38     # Twisted's twisted.test.test_process . Note that Twisted-8.2.0 uses
39     # something rather different.
40     sigchldHandler = None
41
42     def setUp(self):
43         # make sure SIGCHLD handler is installed, as it should be on
44         # reactor.run(). problem is reactor may not have been run when this
45         # test runs.
46         if hasattr(reactor, "_handleSigchld") and hasattr(signal, "SIGCHLD"):
47             self.sigchldHandler = signal.signal(signal.SIGCHLD,
48                                                 reactor._handleSigchld)
49
50     def tearDown(self):
51         if self.sigchldHandler:
52             signal.signal(signal.SIGCHLD, self.sigchldHandler)
53
54 class StallMixin:
55     def stall(self, res=None, delay=1):
56         d = defer.Deferred()
57         reactor.callLater(delay, d.callback, res)
58         return d
59
60 class ShouldFailMixin:
61
62     def shouldFail(self, expected_failure, which, substring,
63                    callable, *args, **kwargs):
64         assert substring is None or isinstance(substring, str)
65         d = defer.maybeDeferred(callable, *args, **kwargs)
66         def done(res):
67             if isinstance(res, failure.Failure):
68                 res.trap(expected_failure)
69                 if substring:
70                     self.failUnless(substring in str(res),
71                                     "%s: substring '%s' not in '%s'"
72                                     % (which, substring, str(res)))
73                 # return the Failure for further analysis, but in a form that
74                 # doesn't make the Deferred chain think that we failed.
75                 return [res]
76             else:
77                 self.fail("%s was supposed to raise %s, not get '%s'" %
78                           (which, expected_failure, res))
79         d.addBoth(done)
80         return d
81
82
83 class TestMixin(SignalMixin):
84     def setUp(self, repeatable=False):
85         """
86         @param repeatable: install the repeatable_randomness hacks to attempt
87             to without access to real randomness and real time.time from the
88             code under test
89         """
90         SignalMixin.setUp(self)
91         self.repeatable = repeatable
92         if self.repeatable:
93             import repeatable_random
94             repeatable_random.force_repeatability()
95         if hasattr(time, 'realtime'):
96             self.teststarttime = time.realtime()
97         else:
98             self.teststarttime = time.time()
99
100     def tearDown(self):
101         SignalMixin.tearDown(self)
102         if self.repeatable:
103             import repeatable_random
104             repeatable_random.restore_non_repeatability()
105         self.clean_pending(required_to_quiesce=True)
106
107     def clean_pending(self, dummy=None, required_to_quiesce=True):
108         """
109         This handy method cleans all pending tasks from the reactor.
110
111         When writing a unit test, consider the following question:
112
113             Is the code that you are testing required to release control once it
114             has done its job, so that it is impossible for it to later come around
115             (with a delayed reactor task) and do anything further?
116
117         If so, then trial will usefully test that for you -- if the code under
118         test leaves any pending tasks on the reactor then trial will fail it.
119
120         On the other hand, some code is *not* required to release control -- some
121         code is allowed to continuously maintain control by rescheduling reactor
122         tasks in order to do ongoing work.  Trial will incorrectly require that
123         code to clean up all its tasks from the reactor.
124
125         Most people think that such code should be amended to have an optional
126         "shutdown" operation that releases all control, but on the contrary it is
127         good design for some code to *not* have a shutdown operation, but instead
128         to have a "crash-only" design in which it recovers from crash on startup.
129
130         If the code under test is of the "long-running" kind, which is *not*
131         required to shutdown cleanly in order to pass tests, then you can simply
132         call testutil.clean_pending() at the end of the unit test, and trial will
133         be satisfied.
134         """
135         pending = reactor.getDelayedCalls()
136         active = bool(pending)
137         for p in pending:
138             if p.active():
139                 p.cancel()
140             else:
141                 print "WEIRDNESS! pending timed call not active!"
142         if required_to_quiesce and active:
143             self.fail("Reactor was still active when it was required to be quiescent.")
144
145 try:
146     import win32file
147     import win32con
148     def make_readonly(path):
149         win32file.SetFileAttributes(path, win32con.FILE_ATTRIBUTE_READONLY)
150     def make_accessible(path):
151         win32file.SetFileAttributes(path, win32con.FILE_ATTRIBUTE_NORMAL)
152 except ImportError:
153     import stat
154     def _make_readonly(path):
155         os.chmod(path, stat.S_IREAD)
156         os.chmod(os.path.dirname(path), stat.S_IREAD)
157     def _make_accessible(path):
158         os.chmod(os.path.dirname(path), stat.S_IWRITE | stat.S_IEXEC | stat.S_IREAD)
159         os.chmod(path, stat.S_IWRITE | stat.S_IEXEC | stat.S_IREAD)
160     make_readonly = _make_readonly
161     make_accessible = _make_accessible