workqueue: start adding tests
authorBrian Warner <warner@allmydata.com>
Tue, 9 Jan 2007 04:29:33 +0000 (21:29 -0700)
committerBrian Warner <warner@allmydata.com>
Tue, 9 Jan 2007 04:29:33 +0000 (21:29 -0700)
src/allmydata/test/test_workqueue.py [new file with mode: 0644]
src/allmydata/workqueue.py

diff --git a/src/allmydata/test/test_workqueue.py b/src/allmydata/test/test_workqueue.py
new file mode 100644 (file)
index 0000000..6c19a5e
--- /dev/null
@@ -0,0 +1,53 @@
+
+import os
+from twisted.trial import unittest
+from allmydata import workqueue
+from allmydata.util import idlib
+
+class FakeWorkQueue(workqueue.WorkQueue):
+
+    def __init__(self, basedir):
+        workqueue.WorkQueue.__init__(self, basedir)
+        self.dispatched_steps = []
+
+    def dispatch_step(self, steptype, lines):
+        self.dispatched_steps.append(steptype, lines)
+
+class Items(unittest.TestCase):
+    def wq(self, testname):
+        return FakeWorkQueue("test_workqueue/Items/%s/workqueue" % testname)
+    def testTempfile(self):
+        wq = self.wq("testTempfile")
+        (f, filename) = wq.create_tempfile(".chkdir")
+        self.failUnless(filename.endswith(".chkdir"))
+        data = "this is some random data: %s\n" % idlib.b2a(os.urandom(15))
+        f.write(data)
+        f.close()
+        f2 = wq.open_tempfile(filename)
+        data2 = f2.read()
+        f2.close()
+        self.failUnlessEqual(data, data2)
+
+    def testCHK(self):
+        wq = self.wq("testCHK")
+        wq.add_upload_chk("source_filename", "box1")
+        wq.add_retain_uri_from_box("box1")
+        wq.add_addpath("box1", ["home", "warner", "foo.txt"])
+        wq.add_delete_box("box1")
+        wq.add_unlink_uri("olduri")
+
+        self.failUnlessEqual(wq.count_pending_steps(), 5)
+        stepname, steptype, lines = wq.get_next_step()
+        self.failUnlessEqual(steptype, "upload_chk")
+        steps = wq.get_all_steps()
+        self.failUnlessEqual(steps[0], ("upload_chk",
+                                        ["source_filename", "box1"]))
+        self.failUnlessEqual(steps[1], ("retain_uri_from_box",
+                                        ["box1"]))
+        self.failUnlessEqual(steps[2], ("addpath",
+                                        ["box1", "home", "warner", "foo.txt"]))
+        self.failUnlessEqual(steps[3], ("delete_box",
+                                        ["box1"]))
+        self.failUnlessEqual(steps[4], ("unlink_uri",
+                                        ["olduri"]))
+
index 13faf709973f4d55ba808a809cb1502a96544d3b..d943a8d7951a040f48e835feaaa74edc3938212d 100644 (file)
@@ -145,7 +145,7 @@ class WorkQueue(object):
         # either 'first' or 'last'. These steps are to be executed in
         # alphabetical order, with all 'step-first-NNN' steps running before
         # any 'step-last-NNN'.
-        for n in os.list(self.basedir):
+        for n in os.listdir(self.basedir):
             if n.startswith("step-first-"):
                 sn = int(n[len("step-first-"):])
                 self.seqnum = max(self.seqnum, sn)
@@ -157,13 +157,13 @@ class WorkQueue(object):
         assert self.seqnum < 1000 # TODO: don't let this grow unboundedly
 
     def create_tempfile(self, suffix=""):
-        randomname = b2a(os.random(10))
+        randomname = b2a(os.urandom(10))
         filename = randomname + suffix
         f = open(os.path.join(self.filesdir, filename), "wb")
         return (f, filename)
 
     def create_boxname(self):
-        return b2a(os.random(10))
+        return b2a(os.urandom(10))
     def write_to_box(self, boxname, data):
         f = open(os.path.join(self.boxesdir, boxname), "w")
         f.write(data)
@@ -192,51 +192,56 @@ class WorkQueue(object):
         tofile = os.path.join(self.basedir, filename)
         os.rename(fromfile, tofile)
 
+    def _create_step_first(self, lines):
+        self._create_step("first", lines)
+    def _create_step_last(self, lines):
+        self._create_step("last", lines)
+
     # methods to add entries to the queue
     def add_upload_chk(self, source_filename, stash_uri_in_boxname):
         # source_filename is absolute, and can point to things outside our
         # workqueue.
         lines = ["upload_chk", source_filename, stash_uri_in_boxname]
-        self._create_step(lines)
+        self._create_step_first(lines)
 
     def add_upload_ssk(self, source_filename, write_capability,
                        previous_version):
         lines = ["upload_ssk", source_filename,
                  b2a(write_capability.index), b2a(write_capability.key),
                  str(previous_version)]
-        self._create_step(lines)
+        self._create_step_first(lines)
 
     def add_retain_ssk(self, read_capability):
         lines = ["retain_ssk", b2a(read_capability.index),
                  b2a(read_capability.key)]
-        self._create_step(lines)
+        self._create_step_first(lines)
 
     def add_unlink_ssk(self, write_capability):
         lines = ["unlink_ssk", b2a(write_capability.index),
                  b2a(write_capability.key)]
-        self._create_step(lines)
+        self._create_step_last(lines)
 
     def add_retain_uri_from_box(self, boxname):
         lines = ["retain_uri_from_box", boxname]
-        self._create_step(lines)
+        self._create_step_first(lines)
 
     def add_addpath(self, boxname, path):
         assert isinstance(path, (list, tuple))
         lines = ["addpath", boxname]
         lines.extend(path)
-        self._create_step(lines)
+        self._create_step_first(lines)
 
     def add_unlink_uri(self, uri):
         lines = ["unlink_uri", uri]
-        self._create_step(lines)
+        self._create_step_last(lines)
 
-    def delete_tempfile(self, filename):
+    def add_delete_tempfile(self, filename):
         lines = ["delete_tempfile", filename]
-        self._create_step(lines)
+        self._create_step_first(lines)
 
-    def delete_box(self, boxname):
+    def add_delete_box(self, boxname):
         lines = ["delete_box", boxname]
-        self._create_step(lines)
+        self._create_step_first(lines)
 
 
     # methods to perform work
@@ -245,44 +250,66 @@ class WorkQueue(object):
         """Run the next pending step.
 
         Returns None if there is no next step to run, or a Deferred that
-        will fire when the step completes."""
+        will fire when the step completes. The step will be removed
+        from the queue when it completes."""
         next_step = self.get_next_step()
         if next_step:
             stepname, steptype, lines = self.get_next_step()
-            return self.dispatch_step(stepname, steptype, lines)
+            d = self.dispatch_step(steptype, lines)
+            d.addCallback(self._delete_step, stepname)
+            return d
         return None
 
     def get_next_step(self):
-        stepnames = [n for n in os.list(self.basedir)
+        stepnames = [n for n in os.listdir(self.basedir)
                      if n.startswith("step-")]
         stepnames.sort()
         if not stepnames:
             return None
         stepname = stepnames[0]
+        return self._get_step(stepname)
+
+    def _get_step(self, stepname):
         f = open(os.path.join(self.basedir, stepname), "r")
         lines = f.read().split("\n")
         f.close()
-        assert lines[-1] == ""
-        lines.pop(-1)
-        steptype = lines[0]
+        assert lines[-1] == "" # files should end with a newline
+        lines.pop(-1) # remove the newline
+        steptype = lines.pop(0)
         return stepname, steptype, lines
 
-    def dispatch_step(self, stepname, steptype, lines):
+    def dispatch_step(self, steptype, lines):
         handlername = "step_" + steptype
         if not hasattr(self, handlername):
             raise RuntimeError("unknown workqueue step type '%s'" % steptype)
         handler = getattr(self, handlername)
         d = defer.maybeDeferred(handler, *lines[1:])
-        d.addCallback(self._done, stepname)
         return d
 
-    def _done(self, res, stepname):
+    def _delete_step(self, res, stepname):
         os.unlink(os.path.join(self.basedir, stepname))
         return res
 
+    # debug/test methods
     def count_pending_steps(self):
-        return len([n for n in os.list(self.basedir)
+        return len([n for n in os.listdir(self.basedir)
                     if n.startswith("step-")])
+    def get_all_steps(self):
+        # returns a list of (steptype, lines) for all steps
+        steps = []
+        for stepname in os.listdir(self.basedir):
+            if stepname.startswith("step-"):
+                steps.append(self._get_step(stepname)[1:])
+        return steps
+
+
+    def open_tempfile(self, filename):
+        f = open(os.path.join(self.filesdir, filename), "rb")
+        return f
+
+    # work is dispatched to these methods. To add a new step type, add a
+    # dispatch method here and an add_ method above.
+
 
     def step_upload_chk(self, source_filename, index_a, write_key_a):
         pass