more filetree, workqueue-boxes now hold serialized Nodes, move NodeMaker out to a...
author Brian Warner <warner@lothar.com>
Sun, 21 Jan 2007 11:18:54 +0000 (04:18 -0700)
committer Brian Warner <warner@lothar.com>
Sun, 21 Jan 2007 11:18:54 +0000 (04:18 -0700)
src/allmydata/filetree/file.py
src/allmydata/filetree/interfaces.py
src/allmydata/filetree/nodemaker.py [new file with mode: 0644]
src/allmydata/filetree/vdrive.py
src/allmydata/test/test_filetree_new.py
src/allmydata/test/test_workqueue.py
src/allmydata/workqueue.py

index 6862cee802f13928d447f1213acfacc46a88d9d9..e08f2f9a58bf9edfaf440e67dde85ace901be2cc 100644 (file)
@@ -10,6 +10,7 @@ class CHKFileNode(BaseDataNode):
 
     def new(self, uri):
         self.uri = uri
+        return self
 
     def get_base_data(self):
         return self.uri
index dd7990184a0aba372405d1fc3711597af7bcd483..56079272a728a549f7b22516fbc2906f6a1bf2cf 100644 (file)
@@ -245,11 +245,11 @@ class IVirtualDrive(Interface):
 
         path[:-1] must refer to a writable DIRECTORY node. 'uploadable' must
         implement IUploadable. This returns a Deferred that fires (with
-        'uploadable') when the upload is complete.
+        'uploadable') when the upload is complete. Do not use the workqueue.
         """
 
     def upload_later(path, filename):
-        """Upload a file from disk to the given path.
+        """Upload a file from disk to the given path. Use the workqueue.
         """
 
     def delete(path):
@@ -259,6 +259,10 @@ class IVirtualDrive(Interface):
         complete.
         """
 
+    def add_node(path, node):
+        """Add a node to the given path. Use the workqueue.
+        """
+
     # commands to manipulate subtrees
 
     # ... detach subtree, merge subtree, etc
diff --git a/src/allmydata/filetree/nodemaker.py b/src/allmydata/filetree/nodemaker.py
new file mode 100644 (file)
index 0000000..d38458d
--- /dev/null
@@ -0,0 +1,41 @@
+
+from zope.interface import implements
+from allmydata.filetree import directory, file, redirect
+from allmydata.filetree.interfaces import INodeMaker
+
+# NodeMaker uses this list to turn node specification strings (found in the
+# serialized form of subtrees) into Nodes (which live in the in-RAM form of
+# subtrees); the class whose 'prefix' equals the string's prefix is used.
+all_node_types = [
+    directory.LocalFileSubTreeNode,
+    directory.CHKDirectorySubTreeNode,
+    directory.SSKDirectorySubTreeNode,
+    file.CHKFileNode,
+    file.SSKFileNode,
+    redirect.LocalFileRedirectionNode,
+    redirect.QueenRedirectionNode,
+    redirect.HTTPRedirectionNode,
+    redirect.QueenOrLocalFileRedirectionNode,
+]
+
+class NodeMaker(object):
+    """Builds INode instances from their serialized 'prefix:body' strings."""
+    implements(INodeMaker)
+
+    def make_node_from_serialized(self, serialized):
+        """Turn a serialized node string into an INode.
+
+        The node describes a file or directory (e.g. its URI) without holding
+        the contents; an ISubTreeMaker can fetch those later. Raises
+        RuntimeError if no class in all_node_types has a matching prefix.
+        """
+        # maybe include parent_is_mutable?
+        assert isinstance(serialized, str)
+        prefix, body = serialized.split(":", 1)  # body may contain ':' too
+        for node_class in all_node_types:
+            if prefix == node_class.prefix:
+                node = node_class()
+                node.populate_node(body, self)
+                return node
+        raise RuntimeError("unable to handle node type '%s'" % prefix)
+
index 63e7271f70cc2ccb22c15d8dc901afa03d56b843..8a44f3436376e23a06c806d9fa8450ab822fe9f6 100644 (file)
@@ -10,43 +10,7 @@ from allmydata.filetree.interfaces import (
     )
 from allmydata.upload import IUploadable
 
-# this list is used by NodeMaker to convert node specification strings (found
-# inside the serialized form of subtrees) into Nodes (which live in the
-# in-RAM form of subtrees).
-all_node_types = [
-    directory.LocalFileSubTreeNode,
-    directory.CHKDirectorySubTreeNode,
-    directory.SSKDirectorySubTreeNode,
-    file.CHKFileNode,
-    file.SSKFileNode,
-    redirect.LocalFileRedirectionNode,
-    redirect.QueenRedirectionNode,
-    redirect.HTTPRedirectionNode,
-    redirect.QueenOrLocalFileRedirectionNode,
-]
-
-class NodeMaker(object):
-    implements(INodeMaker)
-
-    def make_node_from_serialized(self, serialized):
-        # this turns a string into an INode, which contains information about
-        # the file or directory (like a URI), but does not contain the actual
-        # contents. An ISubTreeMaker can be used later to retrieve the
-        # contents (which means downloading the file if this is an IFileNode,
-        # or perhaps creating a new subtree from the contents)
-
-        # maybe include parent_is_mutable?
-        assert isinstance(serialized, str)
-        prefix, body = serialized.split(":", 2)
-
-        for node_class in all_node_types:
-            if prefix == node_class.prefix:
-                node = node_class()
-                node.populate_node(body, self)
-                return node
-        raise RuntimeError("unable to handle node type '%s'" % prefix)
-
-
+from allmydata.filetree.nodemaker import NodeMaker
 
 all_openable_subtree_types = [
     directory.LocalFileSubTree,
@@ -204,7 +168,7 @@ class VirtualDrive(object):
         def _got_closest((node, remaining_path)):
             prepath_len = len(path) - len(remaining_path)
             prepath = path[:prepath_len]
-            assert path[prepath_len:] == remaining_path
+            assert path[prepath_len:] == remaining_path, "um, path=%s, prepath=%s, prepath_len=%d, remaining_path=%s" % (path, prepath, prepath_len, remaining_path)
             return (prepath, node, remaining_path)
         d.addCallback(_got_closest)
         return d
@@ -226,6 +190,7 @@ class VirtualDrive(object):
         d = self._get_closest_node_and_prepath(parent_path)
         def _got_closest((prepath, node, remaining_path)):
             # now tell it to create any necessary parent directories
+            remaining_path = remaining_path[:]
             while remaining_path:
                 node = node.add_subdir(remaining_path.pop(0))
             # 'node' is now the directory where the child wants to go
@@ -248,16 +213,19 @@ class VirtualDrive(object):
     # these are user-visible
 
     def list(self, path):
+        assert isinstance(path, list)
         d = self._get_directory(path)
         d.addCallback(lambda node: node.list())
         return d
 
     def download(self, path, target):
+        assert isinstance(path, list)
         d = self._get_file_uri(path)
         d.addCallback(lambda uri: self.downloader.download(uri, target))
         return d
 
     def upload_now(self, path, uploadable):
+        assert isinstance(path, list)
         # note: the first few steps of this do not use the workqueue, but I
         # think things should remain consistent anyways. If the node is shut
         # down before the file has finished uploading, then we forget all
@@ -266,17 +234,24 @@ class VirtualDrive(object):
         d = self._child_should_not_exist(path)
         # then we upload the file
         d.addCallback(lambda ignored: self.uploader.upload(uploadable))
-        d.addCallback(lambda uri: self.workqueue.create_boxname(uri))
-        d.addCallback(lambda boxname:
-                      self.workqueue.add_addpath(boxname, path))
+        def _uploaded(uri):
+            assert isinstance(uri, str)
+            new_node = file.CHKFileNode().new(uri)
+            boxname = self.workqueue.create_boxname(new_node)
+            self.workqueue.add_addpath(boxname, path)
+            self.workqueue.add_delete_box(boxname)
+        d.addCallback(_uploaded)
         return d
 
     def upload_later(self, path, filename):
+        assert isinstance(path, list)
         boxname = self.workqueue.create_boxname()
         self.workqueue.add_upload_chk(filename, boxname)
         self.workqueue.add_addpath(boxname, path)
+        self.workqueue.add_delete_box(boxname)
 
     def delete(self, path):
+        assert isinstance(path, list)
         parent_path = path[:-1]
         orphan_path = path[-1]
         d = self._get_closest_node_and_prepath(parent_path)
@@ -288,7 +263,16 @@ class VirtualDrive(object):
             boxname = subtree.update(self.workqueue)
             if boxname:
                 self.workqueue.add_addpath(boxname, prepath)
+                self.workqueue.add_delete_box(boxname)
             return self
         d.addCallback(_got_parent)
         return d
 
+    def add_node(self, path, node):
+        assert isinstance(path, list)
+        assert INode(node)
+        assert not IDirectoryNode.providedBy(node)
+        boxname = self.workqueue.create_boxname(node)
+        self.workqueue.add_addpath(boxname, path)
+        self.workqueue.add_delete_box(boxname)
+
index 6a94e1b58d4d39f688c20b4994c72088e4b9102a..8cc3d2bbb603853e01349ddbf2036f24e5ea02ee 100644 (file)
@@ -338,6 +338,11 @@ class Stuff(unittest.TestCase):
     def failUnlessListsAreEqual(self, list1, list2):
         self.failUnlessEqual(sorted(list1), sorted(list2))
 
+    def failUnlessContentsAreEqual(self, c1, c2):
+        c1a = dict([(k,v.serialize_node()) for k,v in c1.items()])
+        c2a = dict([(k,v.serialize_node()) for k,v in c2.items()])
+        self.failUnlessEqual(c1a, c2a)
+
     def testDirectory(self):
         stm = vdrive.SubTreeMaker(None, None)
 
@@ -431,11 +436,36 @@ class Stuff(unittest.TestCase):
         root = redirect.LocalFileRedirection().new("vdrive-root",
                                                    topdir.create_node_now())
         root.update_now(None)
-        wq = self.makeVirtualDrive("vdrive", root.create_node_now())
+        v = self.makeVirtualDrive("vdrive", root.create_node_now())
 
-        d = wq.list([])
+        d = v.list([])
         def _listed(contents):
             self.failUnlessEqual(contents, {})
         d.addCallback(_listed)
+
+        child1 = CHKFileNode().new("uri1")
+        d.addCallback(lambda res: v.add_node(["a"], child1))
+        d.addCallback(lambda res: v.workqueue.flush())
+        d.addCallback(lambda res: v.list([]))
+        def _listed2(contents):
+            self.failUnlessListsAreEqual(contents.keys(), ["a"])
+            self.failUnlessContentsAreEqual(contents, {"a": child1})
+        d.addCallback(_listed2)
+        child2 = CHKFileNode().new("uri2")
+        child3 = CHKFileNode().new("uri3")
+        d.addCallback(lambda res: v.add_node(["b","c"], child2))
+        d.addCallback(lambda res: v.add_node(["b","d"], child3))
+        d.addCallback(lambda res: v.workqueue.flush())
+        d.addCallback(lambda res: v.list([]))
+        def _listed3(contents):
+            self.failUnlessListsAreEqual(contents.keys(), ["a","b"])
+        d.addCallback(_listed3)
+        d.addCallback(lambda res: v.list(["b"]))
+        def _listed4(contents):
+            self.failUnlessListsAreEqual(contents.keys(), ["c","d"])
+            self.failUnlessContentsAreEqual(contents,
+                                            {"c": child2, "d": child3})
+        d.addCallback(_listed4)
+
         return d
 
index b2b20307f68f2942e6a426414dcbd6350d10f3c5..d0c01c5198641ae20814d4ce9dd4a5f766c72d33 100644 (file)
@@ -153,7 +153,7 @@ class Items(unittest.TestCase):
         boxfile = os.path.join(wq.boxesdir, boxname)
         self.failUnless(os.path.exists(boxfile))
 
-        d = wq.run_all_steps()
+        d = wq.flush()
         def _check(res):
             self.failUnlessEqual(len(wq.dispatched_steps), 5)
             self.failUnlessEqual(wq.dispatched_steps[0][0], "upload_chk")
index c0bed206519ea274fe758e5fa7d13a9f4f9aa25f..1982fc54b81837e19d04d0c53d9ce92c7fac3960 100644 (file)
@@ -5,6 +5,8 @@ from twisted.internet import defer
 from allmydata.util import bencode
 from allmydata.util.idlib import b2a
 from allmydata.Crypto.Cipher import AES
+from allmydata.filetree.nodemaker import NodeMaker
+from allmydata.filetree.interfaces import INode
 
 class IWorkQueue(Interface):
     """Each filetable root is associated a work queue, which is persisted on
@@ -22,6 +24,20 @@ class IWorkQueue(Interface):
     application is started, the step can be re-started without problems. The
     placement of the 'retain' commands depends upon how long we might expect
     the app to be offline.
+
+    tempfiles: the workqueue has a special directory where temporary files
+    are stored. create_tempfile() generates these files, while steps like
+    add_upload_chk() use them. The add_delete_tempfile() will delete the
+    tempfile. All tempfiles are deleted when the workqueue becomes empty,
+    since at that point none of them can still be referenced.
+
+    boxes: there is another special directory where named slots (called
+    'boxes') hold serialized INode specifications (the strings which are
+    returned by INode.serialize_node()). Boxes are created by calling
+    create_boxname(). Boxes are filled either at the time of creation or by
+    steps like add_upload_chk(). Boxes are used by steps like add_addpath()
+    and add_retain_uri_from_box. Boxes are deleted by add_delete_box(), as
+    well as when the workqueue becomes empty.
     """
 
     def create_tempfile(suffix=""):
@@ -31,7 +47,11 @@ class IWorkQueue(Interface):
         path, rather it will be interpreted relative to some directory known
         only by the workqueue."""
     def create_boxname(contents=None):
-        """Return a unique box name (as a string)."""
+        """Return a unique box name (as a string). If 'contents' are
+        provided, it must be an instance that provides INode, and the
+        serialized form of the node will be written into the box. Otherwise
+        the boxname can be used by steps like add_upload_chk to hold the
+        generated uri."""
 
     def add_upload_chk(source_filename, stash_uri_in_boxname):
         """This step uploads a file to the mesh and obtains a content-based
@@ -88,6 +108,14 @@ class IWorkQueue(Interface):
     def add_delete_box(boxname):
         """When executed, this step deletes the given box."""
 
+
+    # methods for use in unit tests
+
+    def flush():
+        """Execute all steps in the WorkQueue right away. Return a Deferred
+        that fires (with self) when the queue is empty.
+        """
+
 class NotCapableError(Exception):
     """You have tried to write to a read-only node."""
 
@@ -132,6 +160,7 @@ class WorkQueue(object):
     def __init__(self, basedir):
         assert basedir.endswith("workqueue")
         self.basedir = basedir
+        self._node_maker = NodeMaker()
         self.seqnum = 0
         self.tmpdir = os.path.join(basedir, "tmp")
         #self.trashdir = os.path.join(basedir, "trash")
@@ -174,8 +203,12 @@ class WorkQueue(object):
         f = open(os.path.join(self.filesdir, filename), "wb")
         return (f, filename)
 
-    def create_boxname(self):
-        return b2a(os.urandom(10))
+    def create_boxname(self, contents=None):
+        boxname = b2a(os.urandom(10))
+        if contents is not None:
+            assert INode(contents)
+            self.write_to_box(boxname, contents.serialize_node())
+        return boxname
     def write_to_box(self, boxname, data):
         f = open(os.path.join(self.boxesdir, boxname), "w")
         f.write(data)
@@ -309,7 +342,7 @@ class WorkQueue(object):
         if not hasattr(self, handlername):
             raise RuntimeError("unknown workqueue step type '%s'" % steptype)
         handler = getattr(self, handlername)
-        d = defer.maybeDeferred(handler, *lines[1:])
+        d = defer.maybeDeferred(handler, *lines)
         return d
 
     def _delete_step(self, res, stepname):
@@ -337,6 +370,8 @@ class WorkQueue(object):
             d.addCallback(self.run_all_steps)
             return d
         return defer.succeed(None)
+    def flush(self):
+        return self.run_all_steps()
 
 
     def open_tempfile(self, filename):
@@ -353,8 +388,9 @@ class WorkQueue(object):
         pass
 
     def step_addpath(self, boxname, *path):
+        path = list(path)
         data = self.read_from_box(boxname)
-        child_node = unserialize(data) # TODO: unserialize ?
+        child_node = self._node_maker.make_node_from_serialized(data)
         return self.vdrive.add(path, child_node)
 
     def step_retain_ssk(self, index_a, read_key_a):