import os
from twisted.trial import unittest
+from twisted.internet import defer
from allmydata import workqueue
from allmydata.util import idlib
self.dispatched_steps = []
def dispatch_step(self, steptype, lines):
- self.dispatched_steps.append(steptype, lines)
+ self.dispatched_steps.append((steptype, lines))
+ return defer.succeed(None)
class Items(unittest.TestCase):
def wq(self, testname):
return FakeWorkQueue("test_workqueue/Items/%s/workqueue" % testname)
+
def testTempfile(self):
wq = self.wq("testTempfile")
(f, filename) = wq.create_tempfile(".chkdir")
self.failUnlessEqual(steps[4], ("unlink_uri",
["olduri"]))
+ def testCHK2(self):
+ wq = self.wq("testCHK2")
+ wq.add_upload_chk("source_filename", "box1")
+ wq.add_retain_uri_from_box("box1")
+ wq.add_addpath("box1", ["home", "warner", "foo.txt"])
+ wq.add_delete_box("box1")
+ wq.add_unlink_uri("olduri")
+
+ # then this batch happens a bit later
+ (f, tmpfilename) = wq.create_tempfile(".chkdir")
+ f.write("some data")
+ f.close()
+ wq.add_upload_chk(os.path.join(wq.filesdir, tmpfilename), "box2")
+ wq.add_delete_tempfile(tmpfilename)
+ wq.add_retain_uri_from_box("box2")
+ wq.add_delete_box("box2")
+ wq.add_unlink_uri("oldchk")
+
+ self.failUnlessEqual(wq.count_pending_steps(), 10)
+ steps = wq.get_all_steps()
+
+ self.failUnlessEqual(steps[0], ("upload_chk",
+ ["source_filename", "box1"]))
+ self.failUnlessEqual(steps[1], ("retain_uri_from_box",
+ ["box1"]))
+ self.failUnlessEqual(steps[2], ("addpath",
+ ["box1", "home", "warner", "foo.txt"]))
+ self.failUnlessEqual(steps[3], ("delete_box",
+ ["box1"]))
+ self.failUnlessEqual(steps[4],
+ ("upload_chk",
+ [os.path.join(wq.filesdir, tmpfilename),
+ "box2"]))
+ self.failUnlessEqual(steps[5],
+ ("delete_tempfile", [tmpfilename]))
+ self.failUnlessEqual(steps[6],
+ ("retain_uri_from_box", ["box2"]))
+ self.failUnlessEqual(steps[7], ("delete_box", ["box2"]))
+ self.failUnlessEqual(steps[8], ("unlink_uri",
+ ["olduri"]))
+ self.failUnlessEqual(steps[9], ("unlink_uri", ["oldchk"]))
+
+ def testRun(self):
+ wq = self.wq("testRun")
+ wq.add_upload_chk("source_filename", "box1")
+ wq.add_retain_uri_from_box("box1")
+ wq.add_addpath("box1", ["home", "warner", "foo.txt"])
+ wq.add_delete_box("box1")
+ wq.add_unlink_uri("olduri")
+
+ # this tempfile should be deleted after the last step completes
+ (f, tmpfilename) = wq.create_tempfile(".dummy")
+ tmpfilename = os.path.join(wq.filesdir, tmpfilename)
+ f.write("stuff")
+ f.close()
+ self.failUnless(os.path.exists(tmpfilename))
+
+ d = wq.run_all_steps()
+ def _check(res):
+ self.failUnlessEqual(len(wq.dispatched_steps), 5)
+ self.failUnlessEqual(wq.dispatched_steps[0][0], "upload_chk")
+ self.failIf(os.path.exists(tmpfilename))
+ d.addCallback(_check)
+ return d
d = self.dispatch_step(steptype, lines)
d.addCallback(self._delete_step, stepname)
return d
+ # no steps pending, it is safe to clean out leftover files
+ self._clean_leftover_files()
return None
+ def _clean_leftover_files(self):
+ # there are no steps pending, therefore any leftover files in our
+ # filesdir are orphaned and can be deleted. This catches things like
+ # a tempfile being created but the application gets interrupted
+ # before the upload step which references it gets created, or if an
+ # upload step gets written but the remaining sequence (addpath,
+ # delete_box) does not.
+ for n in os.listdir(self.filesdir):
+ os.unlink(os.path.join(self.filesdir, n))
+ for n in os.listdir(self.boxesdir):
+ os.unlink(os.path.join(self.boxesdir, n))
+
def get_next_step(self):
stepnames = [n for n in os.listdir(self.basedir)
if n.startswith("step-")]
if n.startswith("step-")])
def get_all_steps(self):
# returns a list of (steptype, lines) for all steps
- steps = []
+ stepnames = []
for stepname in os.listdir(self.basedir):
if stepname.startswith("step-"):
- steps.append(self._get_step(stepname)[1:])
+ stepnames.append(stepname)
+ stepnames.sort()
+ steps = []
+ for stepname in stepnames:
+ steps.append(self._get_step(stepname)[1:])
return steps
+ def run_all_steps(self, ignored=None):
+ d = self.run_next_step()
+ if d:
+ d.addCallback(self.run_all_steps)
+ return d
+ return defer.succeed(None)
def open_tempfile(self, filename):