]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blobdiff - src/allmydata/test/common_util.py
Fix tests on platforms without time.tzset (e.g. Windows). fixes ticket:2725
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / common_util.py
index 27d6b434ba4f49e9a21c01e70838e47757ed0e1f..c95f71681ff10dca701bcb4d238aaeec02abfe63 100644 (file)
@@ -1,9 +1,13 @@
-import os, signal, time
+import os, signal, sys, time
 from random import randrange
 
 from twisted.internet import reactor, defer
 from twisted.python import failure
 
+from allmydata.util import fileutil, log
+from allmydata.util.encodingutil import unicode_platform, get_filesystem_encoding
+
+
 def insecurerandstr(n):
     return ''.join(map(chr, map(randrange, [0]*n, [256]*n)))
 
@@ -25,13 +29,48 @@ def flip_one_bit(s, offset=0, size=None):
     assert result != s, "Internal error -- flip_one_bit() produced the same string as its input: %s == %s" % (result, s)
     return result
 
+
+class ReallyEqualMixin:
+    def failUnlessReallyEqual(self, a, b, msg=None):
+        self.failUnlessEqual(a, b, msg=msg)
+        self.failUnlessEqual(type(a), type(b), msg="a :: %r, b :: %r, %r" % (a, b, msg))
+
+
+class NonASCIIPathMixin:
+    def mkdir_nonascii(self, dirpath):
+        # Kludge to work around the fact that buildbot can't remove a directory tree that has
+        # any non-ASCII directory names on Windows. (#1472)
+        if sys.platform == "win32":
+            def _cleanup():
+                try:
+                    fileutil.rm_dir(dirpath)
+                finally:
+                    if os.path.exists(dirpath):
+                        msg = ("We were unable to delete a non-ASCII directory %r created by the test. "
+                               "This is liable to cause failures on future builds." % (dirpath,))
+                        print msg
+                        log.err(msg)
+            self.addCleanup(_cleanup)
+        os.mkdir(dirpath)
+
+    def unicode_or_fallback(self, unicode_name, fallback_name):
+        if unicode_platform():
+            return unicode_name
+        try:
+            unicode_name.encode(get_filesystem_encoding())
+            return unicode_name
+        except UnicodeEncodeError:
+            return fallback_name
+
+
 class SignalMixin:
     # This class is necessary for any code which wants to use Processes
     # outside the usual reactor.run() environment. It is copied from
-    # Twisted's twisted.test.test_process
+    # Twisted's twisted.test.test_process . Note that Twisted-8.2.0 uses
+    # something rather different.
     sigchldHandler = None
 
-    def setUpClass(self):
+    def setUp(self):
         # make sure SIGCHLD handler is installed, as it should be on
         # reactor.run(). problem is reactor may not have been run when this
         # test runs.
@@ -39,7 +78,7 @@ class SignalMixin:
             self.sigchldHandler = signal.signal(signal.SIGCHLD,
                                                 reactor._handleSigchld)
 
-    def tearDownClass(self):
+    def tearDown(self):
         if self.sigchldHandler:
             signal.signal(signal.SIGCHLD, self.sigchldHandler)
 
@@ -79,6 +118,7 @@ class TestMixin(SignalMixin):
             to without access to real randomness and real time.time from the
             code under test
         """
+        SignalMixin.setUp(self)
         self.repeatable = repeatable
         if self.repeatable:
             import repeatable_random
@@ -89,6 +129,7 @@ class TestMixin(SignalMixin):
             self.teststarttime = time.time()
 
     def tearDown(self):
+        SignalMixin.tearDown(self)
         if self.repeatable:
             import repeatable_random
             repeatable_random.restore_non_repeatability()
@@ -128,10 +169,36 @@ class TestMixin(SignalMixin):
             if p.active():
                 p.cancel()
             else:
-                print "WEIRNESS! pending timed call not active+!"
+                print "WEIRDNESS! pending timed call not active!"
         if required_to_quiesce and active:
             self.fail("Reactor was still active when it was required to be quiescent.")
 
+
+class TimezoneMixin(object):
+
+    def setTimezone(self, timezone):
+        def tzset_if_possible():
+            # Windows doesn't have time.tzset().
+            if hasattr(time, 'tzset'):
+                time.tzset()
+
+        unset = object()
+        originalTimezone = os.environ.get('TZ', unset)
+        def restoreTimezone():
+            if originalTimezone is unset:
+                del os.environ['TZ']
+            else:
+                os.environ['TZ'] = originalTimezone
+            tzset_if_possible()
+
+        os.environ['TZ'] = timezone
+        self.addCleanup(restoreTimezone)
+        tzset_if_possible()
+
+    def have_working_tzset(self):
+        return hasattr(time, 'tzset')
+
+
 try:
     import win32file
     import win32con