From: Brian Warner Date: Tue, 9 Jan 2007 04:29:33 +0000 (-0700) Subject: workqueue: start adding tests X-Git-Tag: tahoe_v0.1.0-0-UNSTABLE~383 X-Git-Url: https://git.rkrishnan.org/pf/content/%22file:/(%5B%5E?a=commitdiff_plain;h=b9edb028200ffa2db3e106dd5915cc94db2314ce;p=tahoe-lafs%2Ftahoe-lafs.git workqueue: start adding tests --- diff --git a/src/allmydata/test/test_workqueue.py b/src/allmydata/test/test_workqueue.py new file mode 100644 index 00000000..6c19a5e8 --- /dev/null +++ b/src/allmydata/test/test_workqueue.py @@ -0,0 +1,53 @@ + +import os +from twisted.trial import unittest +from allmydata import workqueue +from allmydata.util import idlib + +class FakeWorkQueue(workqueue.WorkQueue): + + def __init__(self, basedir): + workqueue.WorkQueue.__init__(self, basedir) + self.dispatched_steps = [] + + def dispatch_step(self, steptype, lines): + self.dispatched_steps.append(steptype, lines) + +class Items(unittest.TestCase): + def wq(self, testname): + return FakeWorkQueue("test_workqueue/Items/%s/workqueue" % testname) + def testTempfile(self): + wq = self.wq("testTempfile") + (f, filename) = wq.create_tempfile(".chkdir") + self.failUnless(filename.endswith(".chkdir")) + data = "this is some random data: %s\n" % idlib.b2a(os.urandom(15)) + f.write(data) + f.close() + f2 = wq.open_tempfile(filename) + data2 = f2.read() + f2.close() + self.failUnlessEqual(data, data2) + + def testCHK(self): + wq = self.wq("testCHK") + wq.add_upload_chk("source_filename", "box1") + wq.add_retain_uri_from_box("box1") + wq.add_addpath("box1", ["home", "warner", "foo.txt"]) + wq.add_delete_box("box1") + wq.add_unlink_uri("olduri") + + self.failUnlessEqual(wq.count_pending_steps(), 5) + stepname, steptype, lines = wq.get_next_step() + self.failUnlessEqual(steptype, "upload_chk") + steps = wq.get_all_steps() + self.failUnlessEqual(steps[0], ("upload_chk", + ["source_filename", "box1"])) + self.failUnlessEqual(steps[1], ("retain_uri_from_box", + ["box1"])) + self.failUnlessEqual(steps[2], ("addpath", + ["box1", "home", "warner", "foo.txt"])) + self.failUnlessEqual(steps[3], ("delete_box", + ["box1"])) + self.failUnlessEqual(steps[4], ("unlink_uri", + ["olduri"])) + diff --git a/src/allmydata/workqueue.py b/src/allmydata/workqueue.py index 13faf709..d943a8d7 100644 --- a/src/allmydata/workqueue.py +++ b/src/allmydata/workqueue.py @@ -145,7 +145,7 @@ class WorkQueue(object): # either 'first' or 'last'. These steps are to be executed in # alphabetical order, with all 'step-first-NNN' steps running before # any 'step-last-NNN'. - for n in os.list(self.basedir): + for n in os.listdir(self.basedir): if n.startswith("step-first-"): sn = int(n[len("step-first-"):]) self.seqnum = max(self.seqnum, sn) @@ -157,13 +157,13 @@ class WorkQueue(object): assert self.seqnum < 1000 # TODO: don't let this grow unboundedly def create_tempfile(self, suffix=""): - randomname = b2a(os.random(10)) + randomname = b2a(os.urandom(10)) filename = randomname + suffix f = open(os.path.join(self.filesdir, filename), "wb") return (f, filename) def create_boxname(self): - return b2a(os.random(10)) + return b2a(os.urandom(10)) def write_to_box(self, boxname, data): f = open(os.path.join(self.boxesdir, boxname), "w") f.write(data) @@ -192,51 +192,56 @@ class WorkQueue(object): tofile = os.path.join(self.basedir, filename) os.rename(fromfile, tofile) + def _create_step_first(self, lines): + self._create_step("first", lines) + def _create_step_last(self, lines): + self._create_step("last", lines) + # methods to add entries to the queue def add_upload_chk(self, source_filename, stash_uri_in_boxname): # source_filename is absolute, and can point to things outside our # workqueue. lines = ["upload_chk", source_filename, stash_uri_in_boxname] - self._create_step(lines) + self._create_step_first(lines) def add_upload_ssk(self, source_filename, write_capability, previous_version): lines = ["upload_ssk", source_filename, b2a(write_capability.index), b2a(write_capability.key), str(previous_version)] - self._create_step(lines) + self._create_step_first(lines) def add_retain_ssk(self, read_capability): lines = ["retain_ssk", b2a(read_capability.index), b2a(read_capability.key)] - self._create_step(lines) + self._create_step_first(lines) def add_unlink_ssk(self, write_capability): lines = ["unlink_ssk", b2a(write_capability.index), b2a(write_capability.key)] - self._create_step(lines) + self._create_step_last(lines) def add_retain_uri_from_box(self, boxname): lines = ["retain_uri_from_box", boxname] - self._create_step(lines) + self._create_step_first(lines) def add_addpath(self, boxname, path): assert isinstance(path, (list, tuple)) lines = ["addpath", boxname] lines.extend(path) - self._create_step(lines) + self._create_step_first(lines) def add_unlink_uri(self, uri): lines = ["unlink_uri", uri] - self._create_step(lines) + self._create_step_last(lines) - def delete_tempfile(self, filename): + def add_delete_tempfile(self, filename): lines = ["delete_tempfile", filename] - self._create_step(lines) + self._create_step_first(lines) - def delete_box(self, boxname): + def add_delete_box(self, boxname): lines = ["delete_box", boxname] - self._create_step(lines) + self._create_step_first(lines) # methods to perform work @@ -245,44 +250,66 @@ class WorkQueue(object): """Run the next pending step. Returns None if there is no next step to run, or a Deferred that - will fire when the step completes.""" + will fire when the step completes. The step will be removed + from the queue when it completes.""" next_step = self.get_next_step() if next_step: stepname, steptype, lines = self.get_next_step() - return self.dispatch_step(stepname, steptype, lines) + d = self.dispatch_step(steptype, lines) + d.addCallback(self._delete_step, stepname) + return d return None def get_next_step(self): - stepnames = [n for n in os.list(self.basedir) + stepnames = [n for n in os.listdir(self.basedir) if n.startswith("step-")] stepnames.sort() if not stepnames: return None stepname = stepnames[0] + return self._get_step(stepname) + + def _get_step(self, stepname): f = open(os.path.join(self.basedir, stepname), "r") lines = f.read().split("\n") f.close() - assert lines[-1] == "" - lines.pop(-1) - steptype = lines[0] + assert lines[-1] == "" # files should end with a newline + lines.pop(-1) # remove the newline + steptype = lines.pop(0) return stepname, steptype, lines - def dispatch_step(self, stepname, steptype, lines): + def dispatch_step(self, steptype, lines): handlername = "step_" + steptype if not hasattr(self, handlername): raise RuntimeError("unknown workqueue step type '%s'" % steptype) handler = getattr(self, handlername) d = defer.maybeDeferred(handler, *lines[1:]) - d.addCallback(self._done, stepname) return d - def _done(self, res, stepname): + def _delete_step(self, res, stepname): os.unlink(os.path.join(self.basedir, stepname)) return res + # debug/test methods def count_pending_steps(self): - return len([n for n in os.list(self.basedir) + return len([n for n in os.listdir(self.basedir) if n.startswith("step-")]) + def get_all_steps(self): + # returns a list of (steptype, lines) for all steps + steps = [] + for stepname in os.listdir(self.basedir): + if stepname.startswith("step-"): + steps.append(self._get_step(stepname)[1:]) + return steps + + + def open_tempfile(self, filename): + f = open(os.path.join(self.filesdir, filename), "rb") + return f + + # work is dispatched to these methods. To add a new step type, add a + # dispatch method here and an add_ method above. + def step_upload_chk(self, source_filename, index_a, write_key_a): pass