filetree: change the way addpath works, now we add workqueue steps for all involved...
authorBrian Warner <warner@lothar.com>
Mon, 22 Jan 2007 08:06:09 +0000 (01:06 -0700)
committerBrian Warner <warner@lothar.com>
Mon, 22 Jan 2007 08:06:09 +0000 (01:06 -0700)
src/allmydata/filetree/directory.py
src/allmydata/filetree/file.py
src/allmydata/filetree/interfaces.py
src/allmydata/filetree/redirect.py
src/allmydata/filetree/vdrive.py
src/allmydata/interfaces.py
src/allmydata/test/test_filetree_new.py
src/allmydata/test/test_workqueue.py
src/allmydata/workqueue.py

index 93a3f2915222e2b1455cc0149441dcdd4469c295..81287718555eb8a8373d2874f170a37c34bf2475 100644 (file)
@@ -180,6 +180,34 @@ class _DirectorySubTree(object):
                 break
         return (found_path, node, remaining_path)
 
+    def put_node_at_path(self, path, new_node):
+        assert len(path) > 0
+        child_name = path[-1]
+
+        # first step: get (or create) the parent directory
+        node = self.root
+        for subdir_name in path[:-1]:
+            # TODO: peeking at private attributes is naughty, but using
+            # node.get() and catching NoSuchChildError would be slightly
+            # ugly. Reconsider the IDirectoryNode.get() API.
+            childnode = node.children.get(subdir_name)
+            if childnode:
+                assert IDirectoryNode.providedBy(childnode)
+            else:
+                # we have to create new directories until the parent exists
+                childnode = node.add_subdir(subdir_name)
+            node = childnode
+
+        # 'node' is now pointing at the parent directory
+        if child_name in node.children:
+            # oops, there's already something there. We can replace it.
+            # TODO: How do we free the subtree that was just orphaned?
+            node.delete(child_name)
+
+        # now we can finally add the new node
+        node.add(child_name, new_node)
+
+
 class LocalFileSubTreeNode(BaseDataNode):
     prefix = "LocalFileDirectory"
 
@@ -212,6 +240,9 @@ class LocalFileSubTree(_DirectorySubTree):
         d.addCallback(self._populate_from_data, node_maker)
         return d
 
+    def mutation_modifies_parent(self):
+        return False
+
     def create_node_now(self):
         return LocalFileSubTreeNode().new(self.filename)
 
@@ -251,6 +282,10 @@ class CHKDirectorySubTree(_DirectorySubTree):
     # maybe mutable, maybe not
     node_class = CHKDirectorySubTreeNode
 
+    def new(self):
+        self.uri = None
+        return _DirectorySubTree.new(self)
+
     def set_uri(self, uri):
         self.uri = uri
 
@@ -259,8 +294,15 @@ class CHKDirectorySubTree(_DirectorySubTree):
         self.mutable = parent_is_mutable
         d = downloader.download(node.get_uri(), download.Data())
         d.addCallback(self._populate_from_data, node_maker)
+        def _populated(res):
+            self.uri = node.get_uri()
+            return self
+        d.addCallback(_populated)
         return d
 
+    def mutation_modifies_parent(self):
+        return True
+
     def create_node_now(self):
         return CHKDirectorySubTreeNode().new(self.uri)
 
@@ -286,7 +328,8 @@ class CHKDirectorySubTree(_DirectorySubTree):
         workqueue.add_delete_tempfile(filename)
         workqueue.add_retain_uri_from_box(boxname)
         workqueue.add_delete_box(boxname)
-        workqueue.add_unlink_uri(old_uri)
+        if old_uri:
+            workqueue.add_unlink_uri(old_uri)
         # TODO: think about how self.old_uri will get updated. I *think* that
         # this whole instance will get replaced, so it ought to be ok. But
         # this needs investigation.
@@ -339,6 +382,9 @@ class SSKDirectorySubTree(_DirectorySubTree):
     def set_version(self, version):
         self.version = version
 
+    def mutation_modifies_parent(self):
+        return False
+
     def create_node_now(self):
         node = SSKDirectorySubTreeNode()
         node.set_read_capability(self.read_capability)
index 8dced32ba49efbef6c04f302dcd9ab17f0183e85..25c9e05b6c8ae1be5df77c82346b08db4e9dcaef 100644 (file)
@@ -12,6 +12,9 @@ class CHKFileNode(BaseDataNode):
         self.uri = uri
         return self
 
+    def put_node_at_path(self, path, node):
+        raise RuntimeError
+
     def get_base_data(self):
         return self.uri
     def set_base_data(self, data):
@@ -27,6 +30,9 @@ class SSKFileNode(object):
     implements(INode, IFileNode)
     prefix = "SSKFile"
 
+    def put_node_at_path(self, path, node):
+        raise RuntimeError
+
     def serialize_node(self):
         data = (self.read_cap, self.write_cap)
         return "%s:%s" % (self.prefix, bencode.bencode(data))
index e39cd5c861fe715e9c9e9cc7f57bb1bab049616c..76465bb1a31d0e736965cf51e747f42c21788f31 100644 (file)
@@ -121,6 +121,13 @@ class ISubTree(Interface):
         IMutableSubTree to actually exercise these mutation rights.
         """
 
+    def mutation_modifies_parent():
+        """This returns True if any modification to this subtree will result
+        in it getting a new identity, and thus requiring its parent be
+        notified. This is True for CHKDirectorySubTree, but False for
+        SSKDirectorySubTree and all redirections.
+        """
+
     def get_node_for_path(path):
         """Ask this subtree to follow the path through its internal nodes.
 
@@ -152,6 +159,13 @@ class ISubTree(Interface):
 
         """
 
+    def put_node_at_path(path, node):
+        """Add the given node to this subtree, at 'path'.
+
+        This may create internal directory subnodes as necessary. This must
+        run synchronously, and returns None.
+        """
+
     def serialize_subtree_to_file(f):
         """Create a string which describes my structure and write it to the
         given filehandle (using only .write()). This string should be
@@ -204,6 +218,7 @@ class ISubTree(Interface):
         """
 
     def create_node_now():
+        # TODO: this is no longer just for testing.. vdrive.addpath needs it
         """FOR TESTING ONLY. Immediately create and return an INode which
         describes the current state of this subtree. This does not perform
         any upload or persistence work, and thus depends upon any internal
index 3e2f108663469f6d500f2b1ef78cefa168704b14..0a6fd8af72b13fee210b7e2b36c32c836bbd486c 100644 (file)
@@ -29,9 +29,16 @@ class _BaseRedirection(object):
         self.child_node = child_node
         return self
 
+    def mutation_modifies_parent(self):
+        return False
+
     def get_node_for_path(self, path):
         return ([], self.child_node, path)
 
+    def put_node_at_path(self, path, node):
+        assert path == []
+        self.child_node = node
+
     def serialize_subtree_to_file(self, f):
         f.write(self.child_node.serialize_node())
 
index 68b9fbfe9221ff5505ceb5ee23ba69474611b11a..a8494e158b21ca694c37cb8ee0116804c8a2d376 100644 (file)
@@ -84,6 +84,7 @@ class VirtualDrive(object):
         workqueue.set_vdrive(self)
         workqueue.set_uploader(uploader)
         self._downloader = downloader
+        self._uploader = uploader
         # TODO: queen?
         self.queen = None
         self.root_node = root_node
@@ -181,43 +182,112 @@ class VirtualDrive(object):
         d.addCallback(_got_closest)
         return d
 
-    def _get_subtree_path(self, path):
+    def get_subtrees_for_path(self, path):
         # compute a list of [(subtree1, subpath1), ...], which represents
         # which parts of 'path' traverse which subtrees. This can be used to
         # present the virtual drive to the user in a form that includes
         # redirection nodes (which do not consume path segments), or to
         # figure out which subtrees need to be updated when the identity of a
         # lower subtree (i.e. CHK) is changed.
-        pass # TODO
+
+        # TODO: it might be useful to add some items to the return value.
+        # Like if there is a node already present at that path, to return it.
+        d = self._get_root()
+        results = []
+        d.addCallback(self._get_subtrees_for_path_1, results, path)
+        return d
+
+    def _get_subtrees_for_path_1(self, subtree, results, path):
+        (found_path, node, remaining_path) = subtree.get_node_for_path(path)
+        if IDirectoryNode.providedBy(node):
+            # traversal done. We are looking at the final subtree, and the
+            # entire path (found_path + remaining_path) will live in here.
+            r = (subtree, (found_path + remaining_path))
+            results.append(r)
+            return results
+        if node.is_leaf_subtree():
+            # for this assert to fail, we found a File or something where we
+            # were expecting to find another subdirectory.
+            assert len(remaining_path) == 0
+            results.append((subtree, found_path))
+            return results
+        # otherwise we must open and recurse into a new subtree
+        results.append((subtree, found_path))
+        parent_is_mutable = subtree.is_mutable()
+        d = self.subtree_maker.make_subtree_from_node(node, parent_is_mutable)
+        def _opened(next_subtree):
+            next_subtree = ISubTree(next_subtree)
+            return self._get_subtrees_for_path_1(next_subtree, results,
+                                                 remaining_path)
+        d.addCallback(_opened)
+        return d
+
 
     # these are called by the workqueue
 
-    def add(self, path, new_node):
-        parent_path = path[:-1]
-        new_node_path = path[-1]
-        d = self._get_closest_node_and_prepath(parent_path)
-        def _got_closest((prepath, node, remaining_path)):
-            # now tell it to create any necessary parent directories
-            remaining_path = remaining_path[:]
-            while remaining_path:
-                node = node.add_subdir(remaining_path.pop(0))
-            # 'node' is now the directory where the child wants to go
-            return node, prepath
-        d.addCallback(_got_closest)
-        def _add_new_node((node, prepath)):
-            node.add(new_node_path, new_node)
-            subtree = node.get_subtree()
-            # now, tell the subtree to serialize and upload itself, using the
-            # workqueue.
-            boxname = subtree.update(self.workqueue)
-            if boxname:
-                # the parent needs to be notified, so queue a step to notify
-                # them (using 'prepath')
-                self.workqueue.add_addpath(boxname, prepath)
-            return self # TODO: what wold be the most useful?
-        d.addCallback(_add_new_node)
+    def addpath_with_node(self, path, new_node):
+        new_node_boxname = self.workqueue.create_boxname(new_node)
+        self.workqueue.add_delete_box(new_node_boxname)
+        return self.addpath(path, new_node_boxname)
+
+    def addpath(self, path, new_node_boxname):
+        # this adds a block of steps to the workqueue which, when complete,
+        # will result in the new_node existing in the virtual drive at
+        # 'path'.
+
+        # First we figure out which subtrees are involved
+        d = self.get_subtrees_for_path(path)
+
+        # then we walk through them from the bottom, arranging to modify them
+        # as necessary
+        def _got_subtrees(subtrees, new_node_boxname):
+            for (subtree, subpath) in reversed(subtrees):
+                assert subtree.is_mutable()
+                must_update = subtree.mutation_modifies_parent()
+                subtree_node = subtree.create_node_now()
+                new_subtree_boxname = None
+                if must_update:
+                    new_subtree_boxname = self.workqueue.create_boxname()
+                    self.workqueue.add_delete_box(new_subtree_boxname)
+                    self.workqueue.add_modify_subtree(subtree_node, subpath,
+                                                      new_node_boxname,
+                                                      new_subtree_boxname)
+                    # the box filled by the modify_subtree will be propagated
+                    # upwards
+                    new_node_boxname = new_subtree_boxname
+                else:
+                    # the buck stops here
+                    self.workqueue.add_modify_subtree(subtree_node, subpath,
+                                                      new_node_boxname)
+                    return
+        d.addCallback(_got_subtrees, new_node_boxname)
         return d
 
+    def modify_subtree(self, subtree_node, localpath, new_node,
+                       new_subtree_boxname=None):
+        # TODO: I'm lying here, we don't know who the parent is, so we can't
+        # really say whether they're mutable or not. But we're pretty sure
+        # that the new subtree is supposed to be mutable, because we asserted
+        # that earlier (although I suppose perhaps someone could change a
+        # QueenRedirection or an SSK file while we're offline in the middle
+        # of our workqueue..). Tell the new subtree that their parent is
+        # mutable so we can be sure it will believe that it itself is
+        # mutable.
+        parent_is_mutable = True
+        d = self.subtree_maker.make_subtree_from_node(subtree_node,
+                                                      parent_is_mutable)
+        def _got_subtree(subtree):
+            assert subtree.is_mutable()
+            subtree.put_node_at_path(localpath, new_node)
+            return subtree.update_now(self._uploader)
+        d.addCallback(_got_subtree)
+        if new_subtree_boxname:
+            d.addCallback(lambda new_subtree_node:
+                          self.workqueue.write_to_box(new_subtree_boxname,
+                                                      new_subtree_node))
+        return d
+
+
     # these are user-visible
 
     def list(self, path):
@@ -251,7 +321,7 @@ class VirtualDrive(object):
         uploadable = IUploadable(uploadable)
         d = self._child_should_not_exist(path)
         # then we upload the file
-        d.addCallback(lambda ignored: self.uploader.upload(uploadable))
+        d.addCallback(lambda ignored: self._uploader.upload(uploadable))
         def _uploaded(uri):
             assert isinstance(uri, str)
             new_node = file.CHKFileNode().new(uri)
index 04a6b3ab6e1f256084a43da83fc7f97e7598fdc2..d1cd48ea32e2f0d60fd0df1df15f400e7edf1d82 100644 (file)
@@ -308,8 +308,36 @@ class IWorkQueue(Interface):
         file."""
 
     def add_addpath(boxname, path):
-        """When executed, this step will retrieve the serialized INode from
-        the given box and call vdrive.add(path, node) .
+        """When executed, this step pulls a node specification from 'boxname'
+        and figures out which subtrees must be modified to allow that node to
+        live at the 'path' (which is an absolute path). This will probably
+        cause one or more 'add_modify_subtree' or 'add_modify_redirection'
+        steps to be added to the workqueue.
+        """
+
+    def add_modify_subtree(subtree_node, localpath, new_node_boxname,
+                           new_subtree_boxname=None):
+        """When executed, this step retrieves the subtree specified by
+        'subtree_node', pulls a node specification out of 'new_node_boxname',
+        then modifies it such that a subtree-relative 'localpath' points to
+        the new node. It then serializes the subtree in its new form, and
+        optionally puts a node that describes the new subtree in
+        'new_node_boxname'.
+
+        The idea is that 'subtree_node' will refer a CHKDirectorySubTree, and
+        'new_node_boxname' will contain the CHKFileNode that points to a
+        newly-uploaded file. When the CHKDirectorySubTree is modified, it
+        acquires a new URI, which will be stuffed (in the form of a
+        CHKDirectorySubTreeNode) into 'new_subtree_boxname'. A following step
+        would then read from 'new_subtree_boxname' and modify some other
+        subtree with the contents.
+
+        If 'subtree_node' refers to a redirection subtree like
+        LocalFileRedirection or QueenRedirection, then 'localpath' is
+        ignored, because redirection subtrees don't consume path components
+        and have no internal directory structure (they just have the one
+        redirection target). Redirection subtrees generally retain a constant
+        identity, so it is unlikely that 'new_subtree_boxname' will be used.
         """
 
     def add_unlink_uri(uri):
index 1edbd2b7c636b293aeae297ed97db9d64ae36c1d..7c6d05eed328a4e1db6a59f59d899d1cec7b14c5 100644 (file)
@@ -334,15 +334,26 @@ class InPairs(unittest.TestCase):
 
 class FakeMesh(object):
     implements(IDownloader, IUploader)
+    debug = False
 
     def __init__(self):
         self.files = {}
     def upload_filename(self, filename):
         uri = "stub-uri-%d" % len(self.files)
+        if self.debug:
+            print "FakeMesh.upload_filename(%s) -> %s" % (filename, uri)
         data = open(filename,"r").read()
         self.files[uri] = data
         return defer.succeed(uri)
+    def upload_data(self, data):
+        uri = "stub-uri-%d" % len(self.files)
+        if self.debug:
+            print "FakeMesh.upload_data(%s) -> %s" % (data, uri)
+        self.files[uri] = data
+        return defer.succeed(uri)
     def download(self, uri, target):
+        if self.debug:
+            print "FakeMesh.download(%s)" % uri
         target.open()
         target.write(self.files[uri])
         target.close()
@@ -351,9 +362,14 @@ class FakeMesh(object):
 
 class VDrive(unittest.TestCase):
 
-    def makeVirtualDrive(self, basedir, root_node=None):
+    def makeVirtualDrive(self, basedir, root_node=None, mesh=None):
         wq = workqueue.WorkQueue(os.path.join(basedir, "1.workqueue"))
-        dl = ul = FakeMesh()
+        if mesh:
+            assert IUploader.providedBy(mesh)
+            assert IDownloader.providedBy(mesh)
+            dl = ul = mesh
+        else:
+            dl = ul = FakeMesh()
         if not root_node:
             root_node = directory.LocalFileSubTreeNode()
             root_node.new("rootdirtree.save")
@@ -372,6 +388,22 @@ class VDrive(unittest.TestCase):
                                   root.create_node_now())
         return v
 
+    def makeCHKTree(self, basename):
+        # create a LocalFileRedirection pointing at a CHKDirectorySubTree.
+        # Returns a VirtualDrive instance.
+        mesh = FakeMesh()
+        topdir = directory.CHKDirectorySubTree().new()
+        d = topdir.update_now(mesh)
+        def _updated(topnode):
+            root = redirect.LocalFileRedirection()
+            root.new("%s-root" % basename, topnode)
+            return root.update_now(mesh)
+        d.addCallback(_updated)
+        d.addCallback(lambda rootnode:
+                      self.makeVirtualDrive("%s-vdrive" % basename,
+                                            rootnode, mesh))
+        return d
+
     def failUnlessListsAreEqual(self, list1, list2):
         self.failUnlessEqual(sorted(list1), sorted(list2))
 
@@ -545,3 +577,35 @@ class VDrive(unittest.TestCase):
 
         return d
 
+    def testCHKDirUpload(self):
+        DATA = "here is some data\n"
+        d = defer.maybeDeferred(self.makeCHKTree, "upload")
+        def _made(v):
+            self.v = v
+
+            filename = "upload1"
+            f = open(filename, "w")
+            f.write(DATA)
+            f.close()
+
+            rc = v.upload_later(["a","b","upload1"], filename)
+            self.failUnlessIdentical(rc, None)
+
+            return v.workqueue.flush()
+        d.addCallback(_made)
+
+        d.addCallback(lambda res: self.v.list([]))
+        d.addCallback(lambda contents:
+                      self.failUnlessListsAreEqual(contents.keys(), ["a"]))
+        d.addCallback(lambda res: self.v.list(["a"]))
+        d.addCallback(lambda contents:
+                      self.failUnlessListsAreEqual(contents.keys(), ["b"]))
+        d.addCallback(lambda res: self.v.list(["a","b"]))
+        d.addCallback(lambda contents:
+                      self.failUnlessListsAreEqual(contents.keys(),
+                                                   ["upload1"]))
+        d.addCallback(lambda res: self.v.download_as_data(["a","b","upload1"]))
+        d.addCallback(self.failUnlessEqual, DATA)
+
+        return d
+
index 872ee63671743f97811e0d24dec0d642cb77c90a..ab151a2c9b5ba899eab2f2138719965bc43695b5 100644 (file)
@@ -121,19 +121,19 @@ class Items(unittest.TestCase):
                                         ["box1"]))
         self.failUnlessEqual(steps[2], ("addpath",
                                         ["box1", "home", "warner", "foo.txt"]))
-        self.failUnlessEqual(steps[3], ("delete_box",
-                                        ["box1"]))
-        self.failUnlessEqual(steps[4],
+        self.failUnlessEqual(steps[3],
                              ("upload_chk",
                               [os.path.join(wq.filesdir, tmpfilename),
                                "box2"]))
-        self.failUnlessEqual(steps[5],
-                             ("delete_tempfile", [tmpfilename]))
-        self.failUnlessEqual(steps[6],
+        self.failUnlessEqual(steps[4],
                              ("retain_uri_from_box", ["box2"]))
-        self.failUnlessEqual(steps[7], ("delete_box", ["box2"]))
-        self.failUnlessEqual(steps[8], ("unlink_uri",
+        self.failUnlessEqual(steps[5], ("delete_box",
+                                        ["box1"]))
+        self.failUnlessEqual(steps[6], ("unlink_uri",
                                         ["olduri"]))
+        self.failUnlessEqual(steps[7],
+                             ("delete_tempfile", [tmpfilename]))
+        self.failUnlessEqual(steps[8], ("delete_box", ["box2"]))
         self.failUnlessEqual(steps[9], ("unlink_uri", ["oldchk"]))
 
     def testRun(self):
index eeff4b8d0a26d886c38893e3f4b48b8dca84a5d3..3b037c59e8f900e6effc6355b6ac4c66cc235d63 100644 (file)
@@ -48,6 +48,8 @@ class UploadSSKStep(Step):
 
 class WorkQueue(object):
     implements(IWorkQueue)
+    debug = False
+
     def __init__(self, basedir):
         assert basedir.endswith("workqueue")
         self.basedir = basedir
@@ -175,17 +177,31 @@ class WorkQueue(object):
         lines.extend(path)
         self._create_step_first(lines)
 
+    def add_modify_subtree(self, subtree_node, localpath, new_node_boxname,
+                           new_subtree_boxname=None):
+        assert isinstance(localpath, (list, tuple))
+        box1 = self.create_boxname(subtree_node)
+        self.add_delete_box(box1)
+        # TODO: it would probably be easier if steps were represented in
+        # directories, with a separate file for each argument
+        if new_subtree_boxname is None:
+            new_subtree_boxname = ""
+        lines = ["modify_subtree",
+                 box1, new_node_boxname, new_subtree_boxname]
+        lines.extend(localpath)
+        self._create_step_first(lines)
+
     def add_unlink_uri(self, uri):
         lines = ["unlink_uri", uri]
         self._create_step_last(lines)
 
     def add_delete_tempfile(self, filename):
         lines = ["delete_tempfile", filename]
-        self._create_step_first(lines)
+        self._create_step_last(lines)
 
     def add_delete_box(self, boxname):
         lines = ["delete_box", boxname]
-        self._create_step_first(lines)
+        self._create_step_last(lines)
 
 
     # methods to perform work
@@ -282,6 +298,9 @@ class WorkQueue(object):
 
 
     def step_upload_chk(self, source_filename, stash_uri_in_boxname):
+        if self.debug:
+            print "STEP_UPLOAD_CHK(%s -> %s)" % (source_filename,
+                                                 stash_uri_in_boxname)
         # we use relative filenames for tempfiles created by
         # workqueue.create_tempfile, and absolute filenames for everything
         # that comes from the vdrive. That means using os.path.abspath() on
@@ -289,6 +308,8 @@ class WorkQueue(object):
         filename = os.path.join(self.filesdir, source_filename)
         d = self._uploader.upload_filename(filename)
         def _uploaded(uri):
+            if self.debug:
+                print " -> %s" % uri
             node = CHKFileNode().new(uri)
             self.write_to_box(stash_uri_in_boxname, node)
         d.addCallback(_uploaded)
@@ -298,9 +319,21 @@ class WorkQueue(object):
         pass
 
     def step_addpath(self, boxname, *path):
+        if self.debug:
+            print "STEP_ADDPATH(%s -> %s)" % (boxname, "/".join(path))
         path = list(path)
-        child_node = self.read_from_box(boxname)
-        return self.vdrive.add(path, child_node)
+        return self.vdrive.addpath(path, boxname)
+    def step_modify_subtree(self, subtree_node_boxname, new_node_boxname,
+                            new_subtree_boxname, *localpath):
+        # the weird order of arguments is a consequence of the fact that
+        # localpath is variable-length and new_subtree_boxname is optional.
+        if not new_subtree_boxname:
+            new_subtree_boxname = None
+        subtree_node = self.read_from_box(subtree_node_boxname)
+        new_node = self.read_from_box(new_node_boxname)
+        localpath = list(localpath)
+        return self.vdrive.modify_subtree(subtree_node, localpath,
+                                          new_node, new_subtree_boxname)
 
     def step_retain_ssk(self, index_a, read_key_a):
         pass
@@ -309,11 +342,18 @@ class WorkQueue(object):
     def step_retain_uri_from_box(self, boxname):
         pass
     def step_unlink_uri(self, uri):
+        if self.debug:
+            print "STEP_UNLINK_URI(%s)" % uri
         pass
 
     def step_delete_tempfile(self, filename):
+        if self.debug:
+            print "STEP_DELETE_TEMPFILE(%s)" % filename
+        assert not filename.startswith("/")
         os.unlink(os.path.join(self.filesdir, filename))
     def step_delete_box(self, boxname):
+        if self.debug:
+            print "DELETE_BOX", boxname
         os.unlink(os.path.join(self.boxesdir, boxname))