From: Brian Warner Date: Tue, 9 Jan 2007 05:29:42 +0000 (-0700) Subject: workqueue: more improvements, more tests X-Git-Tag: tahoe_v0.1.0-0-UNSTABLE~382 X-Git-Url: https://git.rkrishnan.org/?a=commitdiff_plain;h=b641f6cbc7b8f2ab46a62156d393c387af67cda4;p=tahoe-lafs%2Ftahoe-lafs.git workqueue: more improvements, more tests --- diff --git a/src/allmydata/test/test_workqueue.py b/src/allmydata/test/test_workqueue.py index 6c19a5e8..6ca36b21 100644 --- a/src/allmydata/test/test_workqueue.py +++ b/src/allmydata/test/test_workqueue.py @@ -1,6 +1,7 @@ import os from twisted.trial import unittest +from twisted.internet import defer from allmydata import workqueue from allmydata.util import idlib @@ -11,11 +12,13 @@ class FakeWorkQueue(workqueue.WorkQueue): self.dispatched_steps = [] def dispatch_step(self, steptype, lines): - self.dispatched_steps.append(steptype, lines) + self.dispatched_steps.append((steptype, lines)) + return defer.succeed(None) class Items(unittest.TestCase): def wq(self, testname): return FakeWorkQueue("test_workqueue/Items/%s/workqueue" % testname) + def testTempfile(self): wq = self.wq("testTempfile") (f, filename) = wq.create_tempfile(".chkdir") @@ -51,3 +54,67 @@ class Items(unittest.TestCase): self.failUnlessEqual(steps[4], ("unlink_uri", ["olduri"])) + def testCHK2(self): + wq = self.wq("testCHK2") + wq.add_upload_chk("source_filename", "box1") + wq.add_retain_uri_from_box("box1") + wq.add_addpath("box1", ["home", "warner", "foo.txt"]) + wq.add_delete_box("box1") + wq.add_unlink_uri("olduri") + + # then this batch happens a bit later + (f, tmpfilename) = wq.create_tempfile(".chkdir") + f.write("some data") + f.close() + wq.add_upload_chk(os.path.join(wq.filesdir, tmpfilename), "box2") + wq.add_delete_tempfile(tmpfilename) + wq.add_retain_uri_from_box("box2") + wq.add_delete_box("box2") + wq.add_unlink_uri("oldchk") + + self.failUnlessEqual(wq.count_pending_steps(), 10) + steps = wq.get_all_steps() + + self.failUnlessEqual(steps[0], ("upload_chk", + ["source_filename", "box1"])) + self.failUnlessEqual(steps[1], ("retain_uri_from_box", + ["box1"])) + self.failUnlessEqual(steps[2], ("addpath", + ["box1", "home", "warner", "foo.txt"])) + self.failUnlessEqual(steps[3], ("delete_box", + ["box1"])) + self.failUnlessEqual(steps[4], + ("upload_chk", + [os.path.join(wq.filesdir, tmpfilename), + "box2"])) + self.failUnlessEqual(steps[5], + ("delete_tempfile", [tmpfilename])) + self.failUnlessEqual(steps[6], + ("retain_uri_from_box", ["box2"])) + self.failUnlessEqual(steps[7], ("delete_box", ["box2"])) + self.failUnlessEqual(steps[8], ("unlink_uri", + ["olduri"])) + self.failUnlessEqual(steps[9], ("unlink_uri", ["oldchk"])) + + def testRun(self): + wq = self.wq("testRun") + wq.add_upload_chk("source_filename", "box1") + wq.add_retain_uri_from_box("box1") + wq.add_addpath("box1", ["home", "warner", "foo.txt"]) + wq.add_delete_box("box1") + wq.add_unlink_uri("olduri") + + # this tempfile should be deleted after the last step completes + (f, tmpfilename) = wq.create_tempfile(".dummy") + tmpfilename = os.path.join(wq.filesdir, tmpfilename) + f.write("stuff") + f.close() + self.failUnless(os.path.exists(tmpfilename)) + + d = wq.run_all_steps() + def _check(res): + self.failUnlessEqual(len(wq.dispatched_steps), 5) + self.failUnlessEqual(wq.dispatched_steps[0][0], "upload_chk") + self.failIf(os.path.exists(tmpfilename)) + d.addCallback(_check) + return d diff --git a/src/allmydata/workqueue.py b/src/allmydata/workqueue.py index d943a8d7..e1c06b1c 100644 --- a/src/allmydata/workqueue.py +++ b/src/allmydata/workqueue.py @@ -258,8 +258,22 @@ class WorkQueue(object): d = self.dispatch_step(steptype, lines) d.addCallback(self._delete_step, stepname) return d + # no steps pending, it is safe to clean out leftover files + self._clean_leftover_files() return None + def _clean_leftover_files(self): + # there are no steps pending, therefore any leftover files in our + # filesdir are orphaned and can be deleted. This catches things like + # a tempfile being created but the application gets interrupted + # before the upload step which references it gets created, or if an + # upload step gets written but the remaining sequence (addpath, + # delete_box) does not. + for n in os.listdir(self.filesdir): + os.unlink(os.path.join(self.filesdir, n)) + for n in os.listdir(self.boxesdir): + os.unlink(os.path.join(self.boxesdir, n)) + def get_next_step(self): stepnames = [n for n in os.listdir(self.basedir) if n.startswith("step-")] @@ -296,11 +310,21 @@ class WorkQueue(object): if n.startswith("step-")]) def get_all_steps(self): # returns a list of (steptype, lines) for all steps - steps = [] + stepnames = [] for stepname in os.listdir(self.basedir): if stepname.startswith("step-"): - steps.append(self._get_step(stepname)[1:]) + stepnames.append(stepname) + stepnames.sort() + steps = [] + for stepname in stepnames: + steps.append(self._get_step(stepname)[1:]) return steps + def run_all_steps(self, ignored=None): + d = self.run_next_step() + if d: + d.addCallback(self.run_all_steps) + return d + return defer.succeed(None) def open_tempfile(self, filename):