]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
checkpoint work-in-progress for WorkQueue, a disk-persistent list of work to be done
authorBrian Warner <warner@lothar.com>
Tue, 2 Jan 2007 06:47:16 +0000 (23:47 -0700)
committerBrian Warner <warner@lothar.com>
Tue, 2 Jan 2007 06:47:16 +0000 (23:47 -0700)
src/allmydata/filetable_new.py
src/allmydata/test/test_filetable_new.py
src/allmydata/workqueue.py

index fdd10a365b1aa8b84e439253b9f0780381439eb4..62f7b51b83676d77341df544ffc1c99d9268654a 100644 (file)
@@ -413,7 +413,7 @@ class MutableCHKDirectorySubTree(_MutableDirectorySubTree):
 
     def upload_my_serialized_form(self, work_queue):
         # this is the CHK form
-        f, filename = work_queue.create_tempfile()
+        f, filename = work_queue.create_tempfile(".chkdir")
         self.serialize_to_file(f)
         f.close()
         boxname = work_queue.create_boxname()
@@ -441,7 +441,7 @@ class MutableSSKDirectorySubTree(_MutableDirectorySubTree):
 
     def upload_my_serialized_form(self, work_queue):
         # this is the SSK form
-        f, filename = work_queue.create_tempfile()
+        f, filename = work_queue.create_tempfile(".sskdir")
         self.serialize_to_file(f)
         f.close()
         work_queue.add_upload_ssk(filename, self.get_write_capability(),
index 5c64afa21c53f646a2659e757fd0cbd0ab42d123..473050ca60a2875addab1854d0d3ffcf9a17e729 100644 (file)
@@ -28,7 +28,7 @@ class FakeWorkQueue(object):
         self.first_commands = []
         self.last_commands = []
 
-    def create_tempfile(self):
+    def create_tempfile(self, suffix=""):
         self.tempfile_number += 1
         self.first_commands.append("create_tempfile-%d" % self.tempfile_number)
         return (StringIO(), "dummy_filename-%d" % self.tempfile_number)
index 81d9d490c4c74fafd45ead624ca0dd735b91ab56..2f0de7db9cfca1fe6d42d7fc5b55b397eda4bdd7 100644 (file)
@@ -2,6 +2,7 @@
 import os, shutil
 from zope.interface import Interface, implements
 from allmydata.util import bencode
+from allmydata.util.idlib import b2a
 
 class IWorkQueue(Interface):
     """Each filetable root is associated a work queue, which is persisted on
@@ -21,7 +22,7 @@ class IWorkQueue(Interface):
     the app to be offline.
     """
 
-    def create_tempfile():
+    def create_tempfile(suffix=""):
         """Return (f, filename)."""
     def create_boxname():
         """Return a unique box name (as a string)."""
@@ -115,26 +116,193 @@ class UploadSSKStep(Step):
 class WorkQueue(object):
     implements(IWorkQueue)
     def __init__(self, basedir):
+        assert basedir.endswith("workqueue")
         self.basedir = basedir
+        self.seqnum = 0
+        self.tmpdir = os.path.join(basedir, "tmp")
+        #self.trashdir = os.path.join(basedir, "trash")
+        self.filesdir = os.path.join(basedir, "files")
+        self.boxesdir = os.path.join(basedir, "boxes")
+        if os.path.exists(self.tmpdir):
+            shutil.rmtree(self.tmpdir)
+        os.makedirs(self.tmpdir)
+        #if os.path.exists(self.trashdir):
+        #    shutil.rmtree(self.trashdir)
+        #os.makedirs(self.trashdir)
+        if not os.path.exists(self.filesdir):
+            # filesdir is *not* cleared
+            os.makedirs(self.filesdir)
+        if not os.path.exists(self.boxesdir):
+            # likewise, boxesdir is not cleared
+            os.makedirs(self.boxesdir)
+        # all Steps are recorded in separate files in our basedir. All such
+        # files are named with the pattern 'step-END-NNN', where END is
+        # either 'first' or 'last'. These steps are to be executed in
+        # alphabetical order, with all 'step-first-NNN' steps running before
+        # any 'step-last-NNN'.
+        for n in os.list(self.basedir):
+            if n.startswith("step-first-"):
+                sn = int(n[len("step-first-"):])
+                self.seqnum = max(self.seqnum, sn)
+            elif n.startswith("step-last-"):
+                sn = int(n[len("step-last-"):])
+                self.seqnum = max(self.seqnum, sn)
+        # each of these files contains one string per line, and the first
+        # line specifies what kind of step it is
+        assert seqnum < 1000 # TODO: don't let this grow unboundedly
+
+    def create_tempfile(self, suffix=""):
+        randomname = b2a(os.random(10))
+        filename = randomname + suffix
+        f = open(os.path.join(self.filesdir, filename), "wb")
+        return (f, filename)
+
+    def create_boxname(self):
+        return b2a(os.random(10))
+    def write_to_box(self, boxname, data):
+        f = open(os.path.join(self.boxesdir, boxname), "w")
+        f.write(data)
+        f.flush()
+        os.fsync(f)
+        f.close()
+    def read_from_box(self, boxname):
+        f = open(os.path.join(self.boxesdir, boxname), "r")
+        data = f.read()
+        f.close()
+        return data
+
+    def _create_step(self, end, lines):
+        assert end in ("first", "last")
+        filename = "step-%s-%d" % (end, self.seqnum)
+        self.seqnum += 1
+        f = open(os.path.join(self.tmpdir, filename), "w")
+        for line in lines:
+            assert "\n" not in line, line
+            f.write(line)
+            f.write("\n")
+        f.flush()
+        os.fsync(f)
+        f.close()
+        fromfile = os.path.join(self.tmpdir, filename)
+        tofile = os.path.join(self.basedir, filename)
+        os.rename(fromfile, tofile)
+
     # methods to add entries to the queue
+    def add_upload_chk(self, source_filename, stash_uri_in_boxname):
+        # source_filename is absolute, and can point to things outside our
+        # workqueue.
+        lines = ["upload_chk", source_filename, stash_uri_in_boxname]
+        self._create_step(lines)
+
+    def add_upload_ssk(self, source_filename, write_capability,
+                       previous_version):
+        lines = ["upload_ssk", source_filename,
+                 b2a(write_capability.index), b2a(write_capability.key),
+                 str(previous_version)]
+        self._create_step(lines)
+
+    def add_retain_ssk(self, read_capability):
+        lines = ["retain_ssk", b2a(read_capability.index),
+                 b2a(read_capability.key)]
+        self._create_step(lines)
+
+    def add_unlink_ssk(self, write_capability):
+        lines = ["unlink_ssk", b2a(write_capability.index),
+                 b2a(write_capability.key)]
+        self._create_step(lines)
+
+    def add_retain_uri_from_box(self, boxname):
+        lines = ["retain_uri_from_box", boxname]
+        self._create_step(lines)
+
+    def add_addpath(self, boxname, path):
+        assert isinstance(path, (list, tuple))
+        lines = ["addpath", boxname]
+        lines.extend(path)
+        self._create_step(lines)
+
+    def add_unlink_uri(self, uri):
+        lines = ["unlink_uri", uri]
+        self._create_step(lines)
+
+    def delete_tempfile(self, filename):
+        lines = ["delete_tempfile", filename]
+        self._create_step(lines)
+
+    def delete_box(self, boxname):
+        lines = ["delete_box", boxname]
+        self._create_step(lines)
+
 
     # methods to perform work
 
+    def run_next_step(self):
+        """Run the next pending step.
+
+        Returns None if there is no next step to run, or a Deferred that
+        will fire when the step completes."""
+        next_step = self.get_next_step()
+        if next_step:
+            steptype, lines = self.get_next_step()
+            return self.dispatch_step(steptype, lines)
+        return None
+
     def get_next_step(self):
-        stepname = self._find_first_step()
-        stepbase = os.path.join(self.basedir, stepname)
-        f = open(os.path.join(stepbase, "type"), "r")
-        stype = f.read().strip()
+        stepnames = [n for n in os.list(self.basedir)
+                     if n.startswith("step-")]
+        stepnames.sort()
+        if not stepnames:
+            return None
+        stepname = stepnames[0]
+        f = open(os.path.join(self.basedir, stepname), "r")
+        lines = f.read().split("\n")
         f.close()
-        if stype == "upload_ssk":
-            s = UploadSSKStep()
-        # ...
-        else:
-            raise RuntimeError("unknown step type '%s'" % stype)
-        s.setup(stepname, self.basedir)
-        d = s.start()
+        assert lines[-1] == ""
+        lines.pop(-1)
+        steptype = lines[0]
+        return steptype, lines
+
+    def dispatch_step(self, steptype, lines):
+        handlername = "step_" + steptype
+        if not hasattr(self, handlername):
+            raise RuntimeError("unknown workqueue step type '%s'" % steptype)
+        handler = getattr(self, handlername)
+        d = defer.maybeDeferred(handler, *lines[1:])
+        d.addCallback(self._done, stepname)
         return d
 
+    def _done(self, res, stepname):
+        os.unlink(os.path.join(self.basedir, stepname))
+        return res
+
+    def count_pending_steps(self):
+        return len([n for n in os.list(self.basedir)
+                    if n.startswith("step-")])
+
+    def step_upload_chk(self, source_filename, index_a, write_key_a):
+        pass
+    def step_upload_ssk(self, source_filename, index_a, write_key_a, prev_ver):
+        pass
+
+    def step_addpath(self, boxname, *path):
+        data = self.read_from_box(boxname)
+        child_spec = something.unserialize(data)
+        return self.root.add_subpath(path, child_spec, self)
+
+    def step_retain_ssk(self, index_a, read_key_a):
+        pass
+    def step_unlink_ssk(self, index_a, write_key_a):
+        pass
+    def step_retain_uri_from_box(self, boxname):
+        pass
+    def step_unlink_uri(self, uri):
+        pass
+
+    def step_delete_tempfile(self, filename):
+        os.unlink(os.path.join(self.filesdir, filename))
+    def step_delete_box(self, boxname):
+        os.unlink(os.path.join(self.boxesdir, boxname))
+