From: Brian Warner Date: Tue, 2 Jan 2007 06:47:16 +0000 (-0700) Subject: checkpoint work-in-progress for WorkQueue, a disk-persistent list of work to be done X-Git-Tag: tahoe_v0.1.0-0-UNSTABLE~413 X-Git-Url: https://git.rkrishnan.org/pf/content/FOOURL?a=commitdiff_plain;h=e600571f82e49fe592e312b7776c873e68ba0ae5;p=tahoe-lafs%2Ftahoe-lafs.git checkpoint work-in-progress for WorkQueue, a disk-persistent list of work to be done --- diff --git a/src/allmydata/filetable_new.py b/src/allmydata/filetable_new.py index fdd10a36..62f7b51b 100644 --- a/src/allmydata/filetable_new.py +++ b/src/allmydata/filetable_new.py @@ -413,7 +413,7 @@ class MutableCHKDirectorySubTree(_MutableDirectorySubTree): def upload_my_serialized_form(self, work_queue): # this is the CHK form - f, filename = work_queue.create_tempfile() + f, filename = work_queue.create_tempfile(".chkdir") self.serialize_to_file(f) f.close() boxname = work_queue.create_boxname() @@ -441,7 +441,7 @@ class MutableSSKDirectorySubTree(_MutableDirectorySubTree): def upload_my_serialized_form(self, work_queue): # this is the SSK form - f, filename = work_queue.create_tempfile() + f, filename = work_queue.create_tempfile(".sskdir") self.serialize_to_file(f) f.close() work_queue.add_upload_ssk(filename, self.get_write_capability(), diff --git a/src/allmydata/test/test_filetable_new.py b/src/allmydata/test/test_filetable_new.py index 5c64afa2..473050ca 100644 --- a/src/allmydata/test/test_filetable_new.py +++ b/src/allmydata/test/test_filetable_new.py @@ -28,7 +28,7 @@ class FakeWorkQueue(object): self.first_commands = [] self.last_commands = [] - def create_tempfile(self): + def create_tempfile(self, suffix=""): self.tempfile_number += 1 self.first_commands.append("create_tempfile-%d" % self.tempfile_number) return (StringIO(), "dummy_filename-%d" % self.tempfile_number) diff --git a/src/allmydata/workqueue.py b/src/allmydata/workqueue.py index 81d9d490..2f0de7db 100644 --- a/src/allmydata/workqueue.py +++ b/src/allmydata/workqueue.py @@ -2,6 +2,7 @@ import os, shutil from zope.interface import Interface, implements from allmydata.util import bencode +from allmydata.util.idlib import b2a class IWorkQueue(Interface): """Each filetable root is associated a work queue, which is persisted on @@ -21,7 +22,7 @@ class IWorkQueue(Interface): the app to be offline. """ - def create_tempfile(): + def create_tempfile(suffix=""): """Return (f, filename).""" def create_boxname(): """Return a unique box name (as a string).""" @@ -115,26 +116,193 @@ class UploadSSKStep(Step): class WorkQueue(object): implements(IWorkQueue) def __init__(self, basedir): + assert basedir.endswith("workqueue") self.basedir = basedir + self.seqnum = 0 + self.tmpdir = os.path.join(basedir, "tmp") + #self.trashdir = os.path.join(basedir, "trash") + self.filesdir = os.path.join(basedir, "files") + self.boxesdir = os.path.join(basedir, "boxes") + if os.path.exists(self.tmpdir): + shutil.rmtree(self.tmpdir) + os.makedirs(self.tmpdir) + #if os.path.exists(self.trashdir): + # shutil.rmtree(self.trashdir) + #os.makedirs(self.trashdir) + if not os.path.exists(self.filesdir): + # filesdir is *not* cleared + os.makedirs(self.filesdir) + if not os.path.exists(self.boxesdir): + # likewise, boxesdir is not cleared + os.makedirs(self.boxesdir) + # all Steps are recorded in separate files in our basedir. All such + # files are named with the pattern 'step-END-NNN', where END is + # either 'first' or 'last'. These steps are to be executed in + # alphabetical order, with all 'step-first-NNN' steps running before + # any 'step-last-NNN'. + for n in os.list(self.basedir): + if n.startswith("step-first-"): + sn = int(n[len("step-first-"):]) + self.seqnum = max(self.seqnum, sn) + elif n.startswith("step-last-"): + sn = int(n[len("step-last-"):]) + self.seqnum = max(self.seqnum, sn) + # each of these files contains one string per line, and the first + # line specifies what kind of step it is + assert seqnum < 1000 # TODO: don't let this grow unboundedly + + def create_tempfile(self, suffix=""): + randomname = b2a(os.random(10)) + filename = randomname + suffix + f = open(os.path.join(self.filesdir, filename), "wb") + return (f, filename) + + def create_boxname(self): + return b2a(os.random(10)) + def write_to_box(self, boxname, data): + f = open(os.path.join(self.boxesdir, boxname), "w") + f.write(data) + f.flush() + os.fsync(f) + f.close() + def read_from_box(self, boxname): + f = open(os.path.join(self.boxesdir, boxname), "r") + data = f.read() + f.close() + return data + + def _create_step(self, end, lines): + assert end in ("first", "last") + filename = "step-%s-%d" % (end, self.seqnum) + self.seqnum += 1 + f = open(os.path.join(self.tmpdir, filename), "w") + for line in lines: + assert "\n" not in line, line + f.write(line) + f.write("\n") + f.flush() + os.fsync(f) + f.close() + fromfile = os.path.join(self.tmpdir, filename) + tofile = os.path.join(self.basedir, filename) + os.rename(fromfile, tofile) + # methods to add entries to the queue + def add_upload_chk(self, source_filename, stash_uri_in_boxname): + # source_filename is absolute, and can point to things outside our + # workqueue. + lines = ["upload_chk", source_filename, stash_uri_in_boxname] + self._create_step(lines) + + def add_upload_ssk(self, source_filename, write_capability, + previous_version): + lines = ["upload_ssk", source_filename, + b2a(write_capability.index), b2a(write_capability.key), + str(previous_version)] + self._create_step(lines) + + def add_retain_ssk(self, read_capability): + lines = ["retain_ssk", b2a(read_capability.index), + b2a(read_capability.key)] + self._create_step(lines) + + def add_unlink_ssk(self, write_capability): + lines = ["unlink_ssk", b2a(write_capability.index), + b2a(write_capability.key)] + self._create_step(lines) + + def add_retain_uri_from_box(self, boxname): + lines = ["retain_uri_from_box", boxname] + self._create_step(lines) + + def add_addpath(self, boxname, path): + assert isinstance(path, (list, tuple)) + lines = ["addpath", boxname] + lines.extend(path) + self._create_step(lines) + + def add_unlink_uri(self, uri): + lines = ["unlink_uri", uri] + self._create_step(lines) + + def delete_tempfile(self, filename): + lines = ["delete_tempfile", filename] + self._create_step(lines) + + def delete_box(self, boxname): + lines = ["delete_box", boxname] + self._create_step(lines) + # methods to perform work + def run_next_step(self): + """Run the next pending step. + + Returns None if there is no next step to run, or a Deferred that + will fire when the step completes.""" + next_step = self.get_next_step() + if next_step: + steptype, lines = self.get_next_step() + return self.dispatch_step(steptype, lines) + return None + def get_next_step(self): - stepname = self._find_first_step() - stepbase = os.path.join(self.basedir, stepname) - f = open(os.path.join(stepbase, "type"), "r") - stype = f.read().strip() + stepnames = [n for n in os.list(self.basedir) + if n.startswith("step-")] + stepnames.sort() + if not stepnames: + return None + stepname = stepnames[0] + f = open(os.path.join(self.basedir, stepname), "r") + lines = f.read().split("\n") f.close() - if stype == "upload_ssk": - s = UploadSSKStep() - # ... - else: - raise RuntimeError("unknown step type '%s'" % stype) - s.setup(stepname, self.basedir) - d = s.start() + assert lines[-1] == "" + lines.pop(-1) + steptype = lines[0] + return steptype, lines + + def dispatch_step(self, steptype, lines): + handlername = "step_" + steptype + if not hasattr(self, handlername): + raise RuntimeError("unknown workqueue step type '%s'" % steptype) + handler = getattr(self, handlername) + d = defer.maybeDeferred(handler, *lines[1:]) + d.addCallback(self._done, stepname) return d + def _done(self, res, stepname): + os.unlink(os.path.join(self.basedir, stepname)) + return res + + def count_pending_steps(self): + return len([n for n in os.list(self.basedir) + if n.startswith("step-")]) + + def step_upload_chk(self, source_filename, index_a, write_key_a): + pass + def step_upload_ssk(self, source_filename, index_a, write_key_a, prev_ver): + pass + + def step_addpath(self, boxname, *path): + data = self.read_from_box(boxname) + child_spec = something.unserialize(data) + return self.root.add_subpath(path, child_spec, self) + + def step_retain_ssk(self, index_a, read_key_a): + pass + def step_unlink_ssk(self, index_a, write_key_a): + pass + def step_retain_uri_from_box(self, boxname): + pass + def step_unlink_uri(self, uri): + pass + + def step_delete_tempfile(self, filename): + os.unlink(os.path.join(self.filesdir, filename)) + def step_delete_box(self, boxname): + os.unlink(os.path.join(self.boxesdir, boxname)) +