]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
copy testutil from pyutil
authorZooko O'Whielacronx <zooko@zooko.com>
Thu, 19 Apr 2007 00:28:36 +0000 (17:28 -0700)
committerZooko O'Whielacronx <zooko@zooko.com>
Thu, 19 Apr 2007 00:28:36 +0000 (17:28 -0700)
src/allmydata/util/testutil.py [new file with mode: 0644]

diff --git a/src/allmydata/util/testutil.py b/src/allmydata/util/testutil.py
new file mode 100644 (file)
index 0000000..72595c1
--- /dev/null
@@ -0,0 +1,98 @@
+import os, signal, sys, time
+
+from twisted.internet import reactor
+from twisted.trial import unittest
+
+class SignalMixin(unittest.TestCase):
+    # This class is necessary for any code which wants to use Processes
+    # outside the usual reactor.run() environment. It is copied from
+    # Twisted's twisted.test.test_process
+    sigchldHandler = None
+    
+    def setUpClass(self):
+        # make sure SIGCHLD handler is installed, as it should be on
+        # reactor.run(). problem is reactor may not have been run when this
+        # test runs.
+        if hasattr(reactor, "_handleSigchld") and hasattr(signal, "SIGCHLD"):
+            self.sigchldHandler = signal.signal(signal.SIGCHLD,
+                                                reactor._handleSigchld)
+    
+    def tearDownClass(self):
+        if self.sigchldHandler:
+            signal.signal(signal.SIGCHLD, self.sigchldHandler)
+
+class TestMixin(SignalMixin):
+    def setUp(self, repeatable=False):
+        """
+        @param repeatable: install the repeatable_randomness hacks to attempt
+            to without access to real randomness and real time.time from the
+            code under test
+        """
+        self.repeatable = repeatable
+        if self.repeatable:
+            import repeatable_random
+            repeatable_random.force_repeatability()
+        if hasattr(time, 'realtime'):
+            self.teststarttime = time.realtime()
+        else:
+            self.teststarttime = time.time()
+
+    def tearDown(self):
+        if self.repeatable:
+            repeatable_random.restore_non_repeatability()
+        self.clean_pending(required_to_quiesce=True)
+
+    def clean_pending(self, dummy=None, required_to_quiesce=True):
+        """
+        This handy method cleans all pending tasks from the reactor.
+
+        When writing a unit test, consider the following question:
+
+            Is the code that you are testing required to release control once it
+            has done its job, so that it is impossible for it to later come around
+            (with a delayed reactor task) and do anything further?
+
+        If so, then trial will usefully test that for you -- if the code under
+        test leaves any pending tasks on the reactor then trial will fail it.
+
+        On the other hand, some code is *not* required to release control -- some
+        code is allowed to continuously maintain control by rescheduling reactor
+        tasks in order to do ongoing work.  Trial will incorrectly require that
+        code to clean up all its tasks from the reactor.
+
+        Most people think that such code should be amended to have an optional
+        "shutdown" operation that releases all control, but on the contrary it is
+        good design for some code to *not* have a shutdown operation, but instead
+        to have a "crash-only" design in which it recovers from crash on startup.
+
+        If the code under test is of the "long-running" kind, which is *not*
+        required to shutdown cleanly in order to pass tests, then you can simply
+        call testutil.clean_pending() at the end of the unit test, and trial will
+        be satisfied.
+        """
+        pending = reactor.getDelayedCalls()
+        active = bool(pending)
+        for p in pending:
+            if p.active():
+                p.cancel()
+            else:
+                print "WEIRNESS! pending timed call not active+!"
+        if required_to_quiesce and active:
+            self.fail("Reactor was still active when it was required to be quiescent.")
+
+if sys.platform == 'win32':
+    import win32file
+    import win32con
+    def make_readonly(path):
+        win32file.SetFileAttributes(path, win32con.FILE_ATTRIBUTE_READONLY)
+    def make_accessible(path):
+        win32file.SetFileAttributes(path, win32con.FILE_ATTRIBUTE_NORMAL)
+else:
+    import stat
+    def make_readonly(path):
+        os.chmod(path, stat.S_IREAD)
+        os.chmod(os.path.dirname(path), stat.S_IREAD)
+    def make_accessible(path):
+        os.chmod(os.path.dirname(path), stat.S_IWRITE | stat.S_IEXEC | stat.S_IREAD)
+        os.chmod(path, stat.S_IWRITE | stat.S_IEXEC | stat.S_IREAD)
+