--- /dev/null
+
+import os
+from twisted.trial import unittest
+from allmydata import workqueue
+from allmydata.util import idlib
+
+class FakeWorkQueue(workqueue.WorkQueue):
+
+ def __init__(self, basedir):
+ workqueue.WorkQueue.__init__(self, basedir)
+ self.dispatched_steps = []
+
+ def dispatch_step(self, steptype, lines):
+ self.dispatched_steps.append(steptype, lines)
+
+class Items(unittest.TestCase):
+ def wq(self, testname):
+ return FakeWorkQueue("test_workqueue/Items/%s/workqueue" % testname)
+ def testTempfile(self):
+ wq = self.wq("testTempfile")
+ (f, filename) = wq.create_tempfile(".chkdir")
+ self.failUnless(filename.endswith(".chkdir"))
+ data = "this is some random data: %s\n" % idlib.b2a(os.urandom(15))
+ f.write(data)
+ f.close()
+ f2 = wq.open_tempfile(filename)
+ data2 = f2.read()
+ f2.close()
+ self.failUnlessEqual(data, data2)
+
+ def testCHK(self):
+ wq = self.wq("testCHK")
+ wq.add_upload_chk("source_filename", "box1")
+ wq.add_retain_uri_from_box("box1")
+ wq.add_addpath("box1", ["home", "warner", "foo.txt"])
+ wq.add_delete_box("box1")
+ wq.add_unlink_uri("olduri")
+
+ self.failUnlessEqual(wq.count_pending_steps(), 5)
+ stepname, steptype, lines = wq.get_next_step()
+ self.failUnlessEqual(steptype, "upload_chk")
+ steps = wq.get_all_steps()
+ self.failUnlessEqual(steps[0], ("upload_chk",
+ ["source_filename", "box1"]))
+ self.failUnlessEqual(steps[1], ("retain_uri_from_box",
+ ["box1"]))
+ self.failUnlessEqual(steps[2], ("addpath",
+ ["box1", "home", "warner", "foo.txt"]))
+ self.failUnlessEqual(steps[3], ("delete_box",
+ ["box1"]))
+ self.failUnlessEqual(steps[4], ("unlink_uri",
+ ["olduri"]))
+
# either 'first' or 'last'. These steps are to be executed in
# alphabetical order, with all 'step-first-NNN' steps running before
# any 'step-last-NNN'.
- for n in os.list(self.basedir):
+ for n in os.listdir(self.basedir):
if n.startswith("step-first-"):
sn = int(n[len("step-first-"):])
self.seqnum = max(self.seqnum, sn)
assert self.seqnum < 1000 # TODO: don't let this grow unboundedly
def create_tempfile(self, suffix=""):
- randomname = b2a(os.random(10))
+ randomname = b2a(os.urandom(10))
filename = randomname + suffix
f = open(os.path.join(self.filesdir, filename), "wb")
return (f, filename)
def create_boxname(self):
- return b2a(os.random(10))
+ return b2a(os.urandom(10))
def write_to_box(self, boxname, data):
f = open(os.path.join(self.boxesdir, boxname), "w")
f.write(data)
tofile = os.path.join(self.basedir, filename)
os.rename(fromfile, tofile)
+ def _create_step_first(self, lines):
+ self._create_step("first", lines)
+ def _create_step_last(self, lines):
+ self._create_step("last", lines)
+
# methods to add entries to the queue
def add_upload_chk(self, source_filename, stash_uri_in_boxname):
# source_filename is absolute, and can point to things outside our
# workqueue.
lines = ["upload_chk", source_filename, stash_uri_in_boxname]
- self._create_step(lines)
+ self._create_step_first(lines)
def add_upload_ssk(self, source_filename, write_capability,
previous_version):
lines = ["upload_ssk", source_filename,
b2a(write_capability.index), b2a(write_capability.key),
str(previous_version)]
- self._create_step(lines)
+ self._create_step_first(lines)
def add_retain_ssk(self, read_capability):
lines = ["retain_ssk", b2a(read_capability.index),
b2a(read_capability.key)]
- self._create_step(lines)
+ self._create_step_first(lines)
def add_unlink_ssk(self, write_capability):
lines = ["unlink_ssk", b2a(write_capability.index),
b2a(write_capability.key)]
- self._create_step(lines)
+ self._create_step_last(lines)
def add_retain_uri_from_box(self, boxname):
lines = ["retain_uri_from_box", boxname]
- self._create_step(lines)
+ self._create_step_first(lines)
def add_addpath(self, boxname, path):
assert isinstance(path, (list, tuple))
lines = ["addpath", boxname]
lines.extend(path)
- self._create_step(lines)
+ self._create_step_first(lines)
def add_unlink_uri(self, uri):
lines = ["unlink_uri", uri]
- self._create_step(lines)
+ self._create_step_last(lines)
- def delete_tempfile(self, filename):
+ def add_delete_tempfile(self, filename):
lines = ["delete_tempfile", filename]
- self._create_step(lines)
+ self._create_step_first(lines)
- def delete_box(self, boxname):
+ def add_delete_box(self, boxname):
lines = ["delete_box", boxname]
- self._create_step(lines)
+ self._create_step_first(lines)
# methods to perform work
"""Run the next pending step.
Returns None if there is no next step to run, or a Deferred that
- will fire when the step completes."""
+ will fire when the step completes. The step will be removed
+ from the queue when it completes."""
next_step = self.get_next_step()
if next_step:
stepname, steptype, lines = self.get_next_step()
- return self.dispatch_step(stepname, steptype, lines)
+ d = self.dispatch_step(steptype, lines)
+ d.addCallback(self._delete_step, stepname)
+ return d
return None
def get_next_step(self):
- stepnames = [n for n in os.list(self.basedir)
+ stepnames = [n for n in os.listdir(self.basedir)
if n.startswith("step-")]
stepnames.sort()
if not stepnames:
return None
stepname = stepnames[0]
+ return self._get_step(stepname)
+
+ def _get_step(self, stepname):
f = open(os.path.join(self.basedir, stepname), "r")
lines = f.read().split("\n")
f.close()
- assert lines[-1] == ""
- lines.pop(-1)
- steptype = lines[0]
+ assert lines[-1] == "" # files should end with a newline
+ lines.pop(-1) # remove the newline
+ steptype = lines.pop(0)
return stepname, steptype, lines
- def dispatch_step(self, stepname, steptype, lines):
+ def dispatch_step(self, steptype, lines):
handlername = "step_" + steptype
if not hasattr(self, handlername):
raise RuntimeError("unknown workqueue step type '%s'" % steptype)
handler = getattr(self, handlername)
d = defer.maybeDeferred(handler, *lines[1:])
- d.addCallback(self._done, stepname)
return d
- def _done(self, res, stepname):
+ def _delete_step(self, res, stepname):
os.unlink(os.path.join(self.basedir, stepname))
return res
+ # debug/test methods
def count_pending_steps(self):
- return len([n for n in os.list(self.basedir)
+ return len([n for n in os.listdir(self.basedir)
if n.startswith("step-")])
+ def get_all_steps(self):
+ # returns a list of (steptype, lines) for all steps
+ steps = []
+ for stepname in os.listdir(self.basedir):
+ if stepname.startswith("step-"):
+ steps.append(self._get_step(stepname)[1:])
+ return steps
+
+
+ def open_tempfile(self, filename):
+ f = open(os.path.join(self.filesdir, filename), "rb")
+ return f
+
+ # work is dispatched to these methods. To add a new step type, add a
+ # dispatch method here and an add_ method above.
+
def step_upload_chk(self, source_filename, index_a, write_key_a):
pass