]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/common_util.py
a3a190c0dda94e3b0c379dac12c16da59b0e2681
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / common_util.py
1 import os, signal, sys, time
2 from random import randrange
3
4 from twisted.internet import reactor, defer
5 from twisted.python import failure
6
7 from allmydata.util import fileutil, log
8 from allmydata.util.encodingutil import unicode_platform, get_filesystem_encoding
9
10
11 def insecurerandstr(n):
12     return ''.join(map(chr, map(randrange, [0]*n, [256]*n)))
13
14 def flip_bit(good, which):
15     # flip the low-order bit of good[which]
16     if which == -1:
17         pieces = good[:which], good[-1:], ""
18     else:
19         pieces = good[:which], good[which:which+1], good[which+1:]
20     return pieces[0] + chr(ord(pieces[1]) ^ 0x01) + pieces[2]
21
22 def flip_one_bit(s, offset=0, size=None):
23     """ flip one random bit of the string s, in a byte greater than or equal to offset and less
24     than offset+size. """
25     if size is None:
26         size=len(s)-offset
27     i = randrange(offset, offset+size)
28     result = s[:i] + chr(ord(s[i])^(0x01<<randrange(0, 8))) + s[i+1:]
29     assert result != s, "Internal error -- flip_one_bit() produced the same string as its input: %s == %s" % (result, s)
30     return result
31
32
33 class ReallyEqualMixin:
34     def failUnlessReallyEqual(self, a, b, msg=None):
35         self.failUnlessEqual(a, b, msg=msg)
36         self.failUnlessEqual(type(a), type(b), msg="a :: %r, b :: %r, %r" % (a, b, msg))
37
38
39 class NonASCIIPathMixin:
40     def mkdir_nonascii(self, dirpath):
41         # Kludge to work around the fact that buildbot can't remove a directory tree that has
42         # any non-ASCII directory names on Windows. (#1472)
43         if sys.platform == "win32":
44             def _cleanup():
45                 try:
46                     fileutil.rm_dir(dirpath)
47                 finally:
48                     log.err("We were unable to delete a non-ASCII directory %r created by the test. "
49                             "This is liable to cause failures on future builds." % (dirpath,))
50             self.addCleanup(_cleanup)
51         os.mkdir(dirpath)
52
53     def unicode_or_fallback(self, unicode_name, fallback_name):
54         if unicode_platform():
55             return unicode_name
56         try:
57             unicode_name.encode(get_filesystem_encoding())
58             return unicode_name
59         except UnicodeEncodeError:
60             return fallback_name
61
62
63 class SignalMixin:
64     # This class is necessary for any code which wants to use Processes
65     # outside the usual reactor.run() environment. It is copied from
66     # Twisted's twisted.test.test_process . Note that Twisted-8.2.0 uses
67     # something rather different.
68     sigchldHandler = None
69
70     def setUp(self):
71         # make sure SIGCHLD handler is installed, as it should be on
72         # reactor.run(). problem is reactor may not have been run when this
73         # test runs.
74         if hasattr(reactor, "_handleSigchld") and hasattr(signal, "SIGCHLD"):
75             self.sigchldHandler = signal.signal(signal.SIGCHLD,
76                                                 reactor._handleSigchld)
77
78     def tearDown(self):
79         if self.sigchldHandler:
80             signal.signal(signal.SIGCHLD, self.sigchldHandler)
81
82 class StallMixin:
83     def stall(self, res=None, delay=1):
84         d = defer.Deferred()
85         reactor.callLater(delay, d.callback, res)
86         return d
87
88 class ShouldFailMixin:
89
90     def shouldFail(self, expected_failure, which, substring,
91                    callable, *args, **kwargs):
92         assert substring is None or isinstance(substring, str)
93         d = defer.maybeDeferred(callable, *args, **kwargs)
94         def done(res):
95             if isinstance(res, failure.Failure):
96                 res.trap(expected_failure)
97                 if substring:
98                     self.failUnless(substring in str(res),
99                                     "%s: substring '%s' not in '%s'"
100                                     % (which, substring, str(res)))
101                 # return the Failure for further analysis, but in a form that
102                 # doesn't make the Deferred chain think that we failed.
103                 return [res]
104             else:
105                 self.fail("%s was supposed to raise %s, not get '%s'" %
106                           (which, expected_failure, res))
107         d.addBoth(done)
108         return d
109
110
111 class TestMixin(SignalMixin):
112     def setUp(self, repeatable=False):
113         """
114         @param repeatable: install the repeatable_randomness hacks to attempt
115             to without access to real randomness and real time.time from the
116             code under test
117         """
118         SignalMixin.setUp(self)
119         self.repeatable = repeatable
120         if self.repeatable:
121             import repeatable_random
122             repeatable_random.force_repeatability()
123         if hasattr(time, 'realtime'):
124             self.teststarttime = time.realtime()
125         else:
126             self.teststarttime = time.time()
127
128     def tearDown(self):
129         SignalMixin.tearDown(self)
130         if self.repeatable:
131             import repeatable_random
132             repeatable_random.restore_non_repeatability()
133         self.clean_pending(required_to_quiesce=True)
134
135     def clean_pending(self, dummy=None, required_to_quiesce=True):
136         """
137         This handy method cleans all pending tasks from the reactor.
138
139         When writing a unit test, consider the following question:
140
141             Is the code that you are testing required to release control once it
142             has done its job, so that it is impossible for it to later come around
143             (with a delayed reactor task) and do anything further?
144
145         If so, then trial will usefully test that for you -- if the code under
146         test leaves any pending tasks on the reactor then trial will fail it.
147
148         On the other hand, some code is *not* required to release control -- some
149         code is allowed to continuously maintain control by rescheduling reactor
150         tasks in order to do ongoing work.  Trial will incorrectly require that
151         code to clean up all its tasks from the reactor.
152
153         Most people think that such code should be amended to have an optional
154         "shutdown" operation that releases all control, but on the contrary it is
155         good design for some code to *not* have a shutdown operation, but instead
156         to have a "crash-only" design in which it recovers from crash on startup.
157
158         If the code under test is of the "long-running" kind, which is *not*
159         required to shutdown cleanly in order to pass tests, then you can simply
160         call testutil.clean_pending() at the end of the unit test, and trial will
161         be satisfied.
162         """
163         pending = reactor.getDelayedCalls()
164         active = bool(pending)
165         for p in pending:
166             if p.active():
167                 p.cancel()
168             else:
169                 print "WEIRDNESS! pending timed call not active!"
170         if required_to_quiesce and active:
171             self.fail("Reactor was still active when it was required to be quiescent.")
172
173 try:
174     import win32file
175     import win32con
176     def make_readonly(path):
177         win32file.SetFileAttributes(path, win32con.FILE_ATTRIBUTE_READONLY)
178     def make_accessible(path):
179         win32file.SetFileAttributes(path, win32con.FILE_ATTRIBUTE_NORMAL)
180 except ImportError:
181     import stat
182     def _make_readonly(path):
183         os.chmod(path, stat.S_IREAD)
184         os.chmod(os.path.dirname(path), stat.S_IREAD)
185     def _make_accessible(path):
186         os.chmod(os.path.dirname(path), stat.S_IWRITE | stat.S_IEXEC | stat.S_IREAD)
187         os.chmod(path, stat.S_IWRITE | stat.S_IEXEC | stat.S_IREAD)
188     make_readonly = _make_readonly
189     make_accessible = _make_accessible