From b641f6cbc7b8f2ab46a62156d393c387af67cda4 Mon Sep 17 00:00:00 2001
From: Brian Warner <warner@allmydata.com>
Date: Mon, 8 Jan 2007 22:29:42 -0700
Subject: [PATCH] workqueue: more improvements, more tests

---
 src/allmydata/test/test_workqueue.py | 69 +++++++++++++++++++++++++++-
 src/allmydata/workqueue.py           | 28 ++++++++++-
 2 files changed, 94 insertions(+), 3 deletions(-)

diff --git a/src/allmydata/test/test_workqueue.py b/src/allmydata/test/test_workqueue.py
index 6c19a5e8..6ca36b21 100644
--- a/src/allmydata/test/test_workqueue.py
+++ b/src/allmydata/test/test_workqueue.py
@@ -1,6 +1,7 @@
 
 import os
 from twisted.trial import unittest
+from twisted.internet import defer
 from allmydata import workqueue
 from allmydata.util import idlib
 
@@ -11,11 +12,13 @@ class FakeWorkQueue(workqueue.WorkQueue):
         self.dispatched_steps = []
 
     def dispatch_step(self, steptype, lines):
-        self.dispatched_steps.append(steptype, lines)
+        self.dispatched_steps.append((steptype, lines))
+        return defer.succeed(None)
 
 class Items(unittest.TestCase):
     def wq(self, testname):
         return FakeWorkQueue("test_workqueue/Items/%s/workqueue" % testname)
+
     def testTempfile(self):
         wq = self.wq("testTempfile")
         (f, filename) = wq.create_tempfile(".chkdir")
@@ -51,3 +54,67 @@ class Items(unittest.TestCase):
         self.failUnlessEqual(steps[4], ("unlink_uri",
                                         ["olduri"]))
 
+    def testCHK2(self):
+        wq = self.wq("testCHK2")
+        wq.add_upload_chk("source_filename", "box1")
+        wq.add_retain_uri_from_box("box1")
+        wq.add_addpath("box1", ["home", "warner", "foo.txt"])
+        wq.add_delete_box("box1")
+        wq.add_unlink_uri("olduri")
+
+        # then this batch happens a bit later
+        (f, tmpfilename) = wq.create_tempfile(".chkdir")
+        f.write("some data")
+        f.close()
+        wq.add_upload_chk(os.path.join(wq.filesdir, tmpfilename), "box2")
+        wq.add_delete_tempfile(tmpfilename)
+        wq.add_retain_uri_from_box("box2")
+        wq.add_delete_box("box2")
+        wq.add_unlink_uri("oldchk")
+
+        self.failUnlessEqual(wq.count_pending_steps(), 10)
+        steps = wq.get_all_steps()
+
+        self.failUnlessEqual(steps[0], ("upload_chk",
+                                        ["source_filename", "box1"]))
+        self.failUnlessEqual(steps[1], ("retain_uri_from_box",
+                                        ["box1"]))
+        self.failUnlessEqual(steps[2], ("addpath",
+                                        ["box1", "home", "warner", "foo.txt"]))
+        self.failUnlessEqual(steps[3], ("delete_box",
+                                        ["box1"]))
+        self.failUnlessEqual(steps[4],
+                             ("upload_chk",
+                              [os.path.join(wq.filesdir, tmpfilename),
+                               "box2"]))
+        self.failUnlessEqual(steps[5],
+                             ("delete_tempfile", [tmpfilename]))
+        self.failUnlessEqual(steps[6],
+                             ("retain_uri_from_box", ["box2"]))
+        self.failUnlessEqual(steps[7], ("delete_box", ["box2"]))
+        self.failUnlessEqual(steps[8], ("unlink_uri",
+                                        ["olduri"]))
+        self.failUnlessEqual(steps[9], ("unlink_uri", ["oldchk"]))
+
+    def testRun(self):
+        wq = self.wq("testRun")
+        wq.add_upload_chk("source_filename", "box1")
+        wq.add_retain_uri_from_box("box1")
+        wq.add_addpath("box1", ["home", "warner", "foo.txt"])
+        wq.add_delete_box("box1")
+        wq.add_unlink_uri("olduri")
+
+        # this tempfile should be deleted after the last step completes
+        (f, tmpfilename) = wq.create_tempfile(".dummy")
+        tmpfilename = os.path.join(wq.filesdir, tmpfilename)
+        f.write("stuff")
+        f.close()
+        self.failUnless(os.path.exists(tmpfilename))
+
+        d = wq.run_all_steps()
+        def _check(res):
+            self.failUnlessEqual(len(wq.dispatched_steps), 5)
+            self.failUnlessEqual(wq.dispatched_steps[0][0], "upload_chk")
+            self.failIf(os.path.exists(tmpfilename))
+        d.addCallback(_check)
+        return d
diff --git a/src/allmydata/workqueue.py b/src/allmydata/workqueue.py
index d943a8d7..e1c06b1c 100644
--- a/src/allmydata/workqueue.py
+++ b/src/allmydata/workqueue.py
@@ -258,8 +258,22 @@ class WorkQueue(object):
             d = self.dispatch_step(steptype, lines)
             d.addCallback(self._delete_step, stepname)
             return d
+        # no steps pending, it is safe to clean out leftover files
+        self._clean_leftover_files()
         return None
 
+    def _clean_leftover_files(self):
+        # there are no steps pending, therefore any leftover files in our
+        # filesdir are orphaned and can be deleted. This catches things like
+        # a tempfile being created but the application gets interrupted
+        # before the upload step which references it gets created, or if an
+        # upload step gets written but the remaining sequence (addpath,
+        # delete_box) does not.
+        for n in os.listdir(self.filesdir):
+            os.unlink(os.path.join(self.filesdir, n))
+        for n in os.listdir(self.boxesdir):
+            os.unlink(os.path.join(self.boxesdir, n))
+
     def get_next_step(self):
         stepnames = [n for n in os.listdir(self.basedir)
                      if n.startswith("step-")]
@@ -296,11 +310,21 @@ class WorkQueue(object):
                     if n.startswith("step-")])
     def get_all_steps(self):
         # returns a list of (steptype, lines) for all steps
-        steps = []
+        stepnames = []
         for stepname in os.listdir(self.basedir):
             if stepname.startswith("step-"):
-                steps.append(self._get_step(stepname)[1:])
+                stepnames.append(stepname)
+        stepnames.sort()
+        steps = []
+        for stepname in stepnames:
+            steps.append(self._get_step(stepname)[1:])
         return steps
+    def run_all_steps(self, ignored=None):
+        d = self.run_next_step()
+        if d:
+            d.addCallback(self.run_all_steps)
+            return d
+        return defer.succeed(None)
 
 
     def open_tempfile(self, filename):
-- 
2.45.2