import os, shutil
from zope.interface import Interface, implements
from allmydata.util import bencode
+from allmydata.util.idlib import b2a
class IWorkQueue(Interface):
"""Each filetable root is associated a work queue, which is persisted on
the app to be offline.
"""
- def create_tempfile():
+ def create_tempfile(suffix=""):
"""Return (f, filename)."""
def create_boxname():
"""Return a unique box name (as a string)."""
class WorkQueue(object):
implements(IWorkQueue)
def __init__(self, basedir):
+ assert basedir.endswith("workqueue")
self.basedir = basedir
+ self.seqnum = 0
+ self.tmpdir = os.path.join(basedir, "tmp")
+ #self.trashdir = os.path.join(basedir, "trash")
+ self.filesdir = os.path.join(basedir, "files")
+ self.boxesdir = os.path.join(basedir, "boxes")
+ if os.path.exists(self.tmpdir):
+ shutil.rmtree(self.tmpdir)
+ os.makedirs(self.tmpdir)
+ #if os.path.exists(self.trashdir):
+ # shutil.rmtree(self.trashdir)
+ #os.makedirs(self.trashdir)
+ if not os.path.exists(self.filesdir):
+ # filesdir is *not* cleared
+ os.makedirs(self.filesdir)
+ if not os.path.exists(self.boxesdir):
+ # likewise, boxesdir is not cleared
+ os.makedirs(self.boxesdir)
+ # all Steps are recorded in separate files in our basedir. All such
+ # files are named with the pattern 'step-END-NNN', where END is
+ # either 'first' or 'last'. These steps are to be executed in
+ # alphabetical order, with all 'step-first-NNN' steps running before
+ # any 'step-last-NNN'.
+ for n in os.list(self.basedir):
+ if n.startswith("step-first-"):
+ sn = int(n[len("step-first-"):])
+ self.seqnum = max(self.seqnum, sn)
+ elif n.startswith("step-last-"):
+ sn = int(n[len("step-last-"):])
+ self.seqnum = max(self.seqnum, sn)
+ # each of these files contains one string per line, and the first
+ # line specifies what kind of step it is
+ assert seqnum < 1000 # TODO: don't let this grow unboundedly
+
+ def create_tempfile(self, suffix=""):
+ randomname = b2a(os.random(10))
+ filename = randomname + suffix
+ f = open(os.path.join(self.filesdir, filename), "wb")
+ return (f, filename)
+
+ def create_boxname(self):
+ return b2a(os.random(10))
+ def write_to_box(self, boxname, data):
+ f = open(os.path.join(self.boxesdir, boxname), "w")
+ f.write(data)
+ f.flush()
+ os.fsync(f)
+ f.close()
+ def read_from_box(self, boxname):
+ f = open(os.path.join(self.boxesdir, boxname), "r")
+ data = f.read()
+ f.close()
+ return data
+
+ def _create_step(self, end, lines):
+ assert end in ("first", "last")
+ filename = "step-%s-%d" % (end, self.seqnum)
+ self.seqnum += 1
+ f = open(os.path.join(self.tmpdir, filename), "w")
+ for line in lines:
+ assert "\n" not in line, line
+ f.write(line)
+ f.write("\n")
+ f.flush()
+ os.fsync(f)
+ f.close()
+ fromfile = os.path.join(self.tmpdir, filename)
+ tofile = os.path.join(self.basedir, filename)
+ os.rename(fromfile, tofile)
+
# methods to add entries to the queue
+ def add_upload_chk(self, source_filename, stash_uri_in_boxname):
+ # source_filename is absolute, and can point to things outside our
+ # workqueue.
+ lines = ["upload_chk", source_filename, stash_uri_in_boxname]
+ self._create_step(lines)
+
+ def add_upload_ssk(self, source_filename, write_capability,
+ previous_version):
+ lines = ["upload_ssk", source_filename,
+ b2a(write_capability.index), b2a(write_capability.key),
+ str(previous_version)]
+ self._create_step(lines)
+
+ def add_retain_ssk(self, read_capability):
+ lines = ["retain_ssk", b2a(read_capability.index),
+ b2a(read_capability.key)]
+ self._create_step(lines)
+
+ def add_unlink_ssk(self, write_capability):
+ lines = ["unlink_ssk", b2a(write_capability.index),
+ b2a(write_capability.key)]
+ self._create_step(lines)
+
+ def add_retain_uri_from_box(self, boxname):
+ lines = ["retain_uri_from_box", boxname]
+ self._create_step(lines)
+
+ def add_addpath(self, boxname, path):
+ assert isinstance(path, (list, tuple))
+ lines = ["addpath", boxname]
+ lines.extend(path)
+ self._create_step(lines)
+
+ def add_unlink_uri(self, uri):
+ lines = ["unlink_uri", uri]
+ self._create_step(lines)
+
+ def delete_tempfile(self, filename):
+ lines = ["delete_tempfile", filename]
+ self._create_step(lines)
+
+ def delete_box(self, boxname):
+ lines = ["delete_box", boxname]
+ self._create_step(lines)
+
# methods to perform work
+ def run_next_step(self):
+ """Run the next pending step.
+
+ Returns None if there is no next step to run, or a Deferred that
+ will fire when the step completes."""
+ next_step = self.get_next_step()
+ if next_step:
+ steptype, lines = self.get_next_step()
+ return self.dispatch_step(steptype, lines)
+ return None
+
def get_next_step(self):
- stepname = self._find_first_step()
- stepbase = os.path.join(self.basedir, stepname)
- f = open(os.path.join(stepbase, "type"), "r")
- stype = f.read().strip()
+ stepnames = [n for n in os.list(self.basedir)
+ if n.startswith("step-")]
+ stepnames.sort()
+ if not stepnames:
+ return None
+ stepname = stepnames[0]
+ f = open(os.path.join(self.basedir, stepname), "r")
+ lines = f.read().split("\n")
f.close()
- if stype == "upload_ssk":
- s = UploadSSKStep()
- # ...
- else:
- raise RuntimeError("unknown step type '%s'" % stype)
- s.setup(stepname, self.basedir)
- d = s.start()
+ assert lines[-1] == ""
+ lines.pop(-1)
+ steptype = lines[0]
+ return steptype, lines
+
+ def dispatch_step(self, steptype, lines):
+ handlername = "step_" + steptype
+ if not hasattr(self, handlername):
+ raise RuntimeError("unknown workqueue step type '%s'" % steptype)
+ handler = getattr(self, handlername)
+ d = defer.maybeDeferred(handler, *lines[1:])
+ d.addCallback(self._done, stepname)
return d
+ def _done(self, res, stepname):
+ os.unlink(os.path.join(self.basedir, stepname))
+ return res
+
+ def count_pending_steps(self):
+ return len([n for n in os.list(self.basedir)
+ if n.startswith("step-")])
+
+ def step_upload_chk(self, source_filename, index_a, write_key_a):
+ pass
+ def step_upload_ssk(self, source_filename, index_a, write_key_a, prev_ver):
+ pass
+
+ def step_addpath(self, boxname, *path):
+ data = self.read_from_box(boxname)
+ child_spec = something.unserialize(data)
+ return self.root.add_subpath(path, child_spec, self)
+
+ def step_retain_ssk(self, index_a, read_key_a):
+ pass
+ def step_unlink_ssk(self, index_a, write_key_a):
+ pass
+ def step_retain_uri_from_box(self, boxname):
+ pass
+ def step_unlink_uri(self, uri):
+ pass
+
+ def step_delete_tempfile(self, filename):
+ os.unlink(os.path.join(self.filesdir, filename))
+ def step_delete_box(self, boxname):
+ os.unlink(os.path.join(self.boxesdir, boxname))
+