]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/common_util.py
Fix tests on platforms without time.tzset (e.g. Windows). fixes ticket:2725
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / common_util.py
1 import os, signal, sys, time
2 from random import randrange
3
4 from twisted.internet import reactor, defer
5 from twisted.python import failure
6
7 from allmydata.util import fileutil, log
8 from allmydata.util.encodingutil import unicode_platform, get_filesystem_encoding
9
10
11 def insecurerandstr(n):
12     return ''.join(map(chr, map(randrange, [0]*n, [256]*n)))
13
14 def flip_bit(good, which):
15     # flip the low-order bit of good[which]
16     if which == -1:
17         pieces = good[:which], good[-1:], ""
18     else:
19         pieces = good[:which], good[which:which+1], good[which+1:]
20     return pieces[0] + chr(ord(pieces[1]) ^ 0x01) + pieces[2]
21
22 def flip_one_bit(s, offset=0, size=None):
23     """ flip one random bit of the string s, in a byte greater than or equal to offset and less
24     than offset+size. """
25     if size is None:
26         size=len(s)-offset
27     i = randrange(offset, offset+size)
28     result = s[:i] + chr(ord(s[i])^(0x01<<randrange(0, 8))) + s[i+1:]
29     assert result != s, "Internal error -- flip_one_bit() produced the same string as its input: %s == %s" % (result, s)
30     return result
31
32
33 class ReallyEqualMixin:
34     def failUnlessReallyEqual(self, a, b, msg=None):
35         self.failUnlessEqual(a, b, msg=msg)
36         self.failUnlessEqual(type(a), type(b), msg="a :: %r, b :: %r, %r" % (a, b, msg))
37
38
39 class NonASCIIPathMixin:
40     def mkdir_nonascii(self, dirpath):
41         # Kludge to work around the fact that buildbot can't remove a directory tree that has
42         # any non-ASCII directory names on Windows. (#1472)
43         if sys.platform == "win32":
44             def _cleanup():
45                 try:
46                     fileutil.rm_dir(dirpath)
47                 finally:
48                     if os.path.exists(dirpath):
49                         msg = ("We were unable to delete a non-ASCII directory %r created by the test. "
50                                "This is liable to cause failures on future builds." % (dirpath,))
51                         print msg
52                         log.err(msg)
53             self.addCleanup(_cleanup)
54         os.mkdir(dirpath)
55
56     def unicode_or_fallback(self, unicode_name, fallback_name):
57         if unicode_platform():
58             return unicode_name
59         try:
60             unicode_name.encode(get_filesystem_encoding())
61             return unicode_name
62         except UnicodeEncodeError:
63             return fallback_name
64
65
66 class SignalMixin:
67     # This class is necessary for any code which wants to use Processes
68     # outside the usual reactor.run() environment. It is copied from
69     # Twisted's twisted.test.test_process . Note that Twisted-8.2.0 uses
70     # something rather different.
71     sigchldHandler = None
72
73     def setUp(self):
74         # make sure SIGCHLD handler is installed, as it should be on
75         # reactor.run(). problem is reactor may not have been run when this
76         # test runs.
77         if hasattr(reactor, "_handleSigchld") and hasattr(signal, "SIGCHLD"):
78             self.sigchldHandler = signal.signal(signal.SIGCHLD,
79                                                 reactor._handleSigchld)
80
81     def tearDown(self):
82         if self.sigchldHandler:
83             signal.signal(signal.SIGCHLD, self.sigchldHandler)
84
85 class StallMixin:
86     def stall(self, res=None, delay=1):
87         d = defer.Deferred()
88         reactor.callLater(delay, d.callback, res)
89         return d
90
91 class ShouldFailMixin:
92
93     def shouldFail(self, expected_failure, which, substring,
94                    callable, *args, **kwargs):
95         assert substring is None or isinstance(substring, str)
96         d = defer.maybeDeferred(callable, *args, **kwargs)
97         def done(res):
98             if isinstance(res, failure.Failure):
99                 res.trap(expected_failure)
100                 if substring:
101                     self.failUnless(substring in str(res),
102                                     "%s: substring '%s' not in '%s'"
103                                     % (which, substring, str(res)))
104                 # return the Failure for further analysis, but in a form that
105                 # doesn't make the Deferred chain think that we failed.
106                 return [res]
107             else:
108                 self.fail("%s was supposed to raise %s, not get '%s'" %
109                           (which, expected_failure, res))
110         d.addBoth(done)
111         return d
112
113
114 class TestMixin(SignalMixin):
115     def setUp(self, repeatable=False):
116         """
117         @param repeatable: install the repeatable_randomness hacks to attempt
118             to without access to real randomness and real time.time from the
119             code under test
120         """
121         SignalMixin.setUp(self)
122         self.repeatable = repeatable
123         if self.repeatable:
124             import repeatable_random
125             repeatable_random.force_repeatability()
126         if hasattr(time, 'realtime'):
127             self.teststarttime = time.realtime()
128         else:
129             self.teststarttime = time.time()
130
131     def tearDown(self):
132         SignalMixin.tearDown(self)
133         if self.repeatable:
134             import repeatable_random
135             repeatable_random.restore_non_repeatability()
136         self.clean_pending(required_to_quiesce=True)
137
138     def clean_pending(self, dummy=None, required_to_quiesce=True):
139         """
140         This handy method cleans all pending tasks from the reactor.
141
142         When writing a unit test, consider the following question:
143
144             Is the code that you are testing required to release control once it
145             has done its job, so that it is impossible for it to later come around
146             (with a delayed reactor task) and do anything further?
147
148         If so, then trial will usefully test that for you -- if the code under
149         test leaves any pending tasks on the reactor then trial will fail it.
150
151         On the other hand, some code is *not* required to release control -- some
152         code is allowed to continuously maintain control by rescheduling reactor
153         tasks in order to do ongoing work.  Trial will incorrectly require that
154         code to clean up all its tasks from the reactor.
155
156         Most people think that such code should be amended to have an optional
157         "shutdown" operation that releases all control, but on the contrary it is
158         good design for some code to *not* have a shutdown operation, but instead
159         to have a "crash-only" design in which it recovers from crash on startup.
160
161         If the code under test is of the "long-running" kind, which is *not*
162         required to shutdown cleanly in order to pass tests, then you can simply
163         call testutil.clean_pending() at the end of the unit test, and trial will
164         be satisfied.
165         """
166         pending = reactor.getDelayedCalls()
167         active = bool(pending)
168         for p in pending:
169             if p.active():
170                 p.cancel()
171             else:
172                 print "WEIRDNESS! pending timed call not active!"
173         if required_to_quiesce and active:
174             self.fail("Reactor was still active when it was required to be quiescent.")
175
176
177 class TimezoneMixin(object):
178
179     def setTimezone(self, timezone):
180         def tzset_if_possible():
181             # Windows doesn't have time.tzset().
182             if hasattr(time, 'tzset'):
183                 time.tzset()
184
185         unset = object()
186         originalTimezone = os.environ.get('TZ', unset)
187         def restoreTimezone():
188             if originalTimezone is unset:
189                 del os.environ['TZ']
190             else:
191                 os.environ['TZ'] = originalTimezone
192             tzset_if_possible()
193
194         os.environ['TZ'] = timezone
195         self.addCleanup(restoreTimezone)
196         tzset_if_possible()
197
198     def have_working_tzset(self):
199         return hasattr(time, 'tzset')
200
201
202 try:
203     import win32file
204     import win32con
205     def make_readonly(path):
206         win32file.SetFileAttributes(path, win32con.FILE_ATTRIBUTE_READONLY)
207     def make_accessible(path):
208         win32file.SetFileAttributes(path, win32con.FILE_ATTRIBUTE_NORMAL)
209 except ImportError:
210     import stat
211     def _make_readonly(path):
212         os.chmod(path, stat.S_IREAD)
213         os.chmod(os.path.dirname(path), stat.S_IREAD)
214     def _make_accessible(path):
215         os.chmod(os.path.dirname(path), stat.S_IWRITE | stat.S_IEXEC | stat.S_IREAD)
216         os.chmod(path, stat.S_IWRITE | stat.S_IEXEC | stat.S_IREAD)
217     make_readonly = _make_readonly
218     make_accessible = _make_accessible