From: Brian Warner Date: Mon, 22 Jan 2007 08:06:09 +0000 (-0700) Subject: filetree: change the way addpath works, now we add workqueue steps for all involved... X-Git-Tag: tahoe_v0.1.0-0-UNSTABLE~332 X-Git-Url: https://git.rkrishnan.org/zeppelin?a=commitdiff_plain;h=22731125f32bda77dd347b052ad0e9f24ed4900f;p=tahoe-lafs%2Ftahoe-lafs.git filetree: change the way addpath works, now we add workqueue steps for all involved subtrees at about the same time, rather than letting one step add the next when it runs. Finally add a (passing) test for uploading files to CHK-based directories --- diff --git a/src/allmydata/filetree/directory.py b/src/allmydata/filetree/directory.py index 93a3f291..81287718 100644 --- a/src/allmydata/filetree/directory.py +++ b/src/allmydata/filetree/directory.py @@ -180,6 +180,34 @@ class _DirectorySubTree(object): break return (found_path, node, remaining_path) + def put_node_at_path(self, path, new_node): + assert len(path) > 0 + child_name = path[-1] + + # first step: get (or create) the parent directory + node = self.root + for subdir_name in path[:-1]: + # TODO: peeking at private attributes is naughty, but using + # node.get() and catching NoSuchChildError would be slightly + # ugly. Reconsider the IDirectoryNode.get() API. + childnode = node.children.get(subdir_name) + if childnode: + assert IDirectoryNode.providedBy(childnode) + else: + # we have to create new directories until the parent exists + childnode = node.add_subdir(subdir_name) + node = childnode + + # 'node' is now pointing at the parent directory + if child_name in node.children: + # oops, there's already something there. We can replace it. + # TODO: How do we free the subtree that was just orphaned? + node.delete(child_name) + + # now we can finally add the new node + node.add(child_name, new_node) + + class LocalFileSubTreeNode(BaseDataNode): prefix = "LocalFileDirectory" @@ -212,6 +240,9 @@ class LocalFileSubTree(_DirectorySubTree): d.addCallback(self._populate_from_data, node_maker) return d + def mutation_modifies_parent(self): + return False + def create_node_now(self): return LocalFileSubTreeNode().new(self.filename) @@ -251,6 +282,10 @@ class CHKDirectorySubTree(_DirectorySubTree): # maybe mutable, maybe not node_class = CHKDirectorySubTreeNode + def new(self): + self.uri = None + return _DirectorySubTree.new(self) + def set_uri(self, uri): self.uri = uri @@ -259,8 +294,15 @@ class CHKDirectorySubTree(_DirectorySubTree): self.mutable = parent_is_mutable d = downloader.download(node.get_uri(), download.Data()) d.addCallback(self._populate_from_data, node_maker) + def _populated(res): + self.uri = node.get_uri() + return self + d.addCallback(_populated) return d + def mutation_modifies_parent(self): + return True + def create_node_now(self): return CHKDirectorySubTreeNode().new(self.uri) @@ -286,7 +328,8 @@ class CHKDirectorySubTree(_DirectorySubTree): workqueue.add_delete_tempfile(filename) workqueue.add_retain_uri_from_box(boxname) workqueue.add_delete_box(boxname) - workqueue.add_unlink_uri(old_uri) + if old_uri: + workqueue.add_unlink_uri(old_uri) # TODO: think about how self.old_uri will get updated. I *think* that # this whole instance will get replaced, so it ought to be ok. But # this needs investigation. @@ -339,6 +382,9 @@ class SSKDirectorySubTree(_DirectorySubTree): def set_version(self, version): self.version = version + def mutation_modifies_parent(self): + return False + def create_node_now(self): node = SSKDirectorySubTreeNode() node.set_read_capability(self.read_capability) diff --git a/src/allmydata/filetree/file.py b/src/allmydata/filetree/file.py index 8dced32b..25c9e05b 100644 --- a/src/allmydata/filetree/file.py +++ b/src/allmydata/filetree/file.py @@ -12,6 +12,9 @@ class CHKFileNode(BaseDataNode): self.uri = uri return self + def put_node_at_path(self, path, node): + raise RuntimeError + def get_base_data(self): return self.uri def set_base_data(self, data): @@ -27,6 +30,9 @@ class SSKFileNode(object): implements(INode, IFileNode) prefix = "SSKFile" + def put_node_at_path(self, path, node): + raise RuntimeError + def serialize_node(self): data = (self.read_cap, self.write_cap) return "%s:%s" % (self.prefix, bencode.bencode(data)) diff --git a/src/allmydata/filetree/interfaces.py b/src/allmydata/filetree/interfaces.py index e39cd5c8..76465bb1 100644 --- a/src/allmydata/filetree/interfaces.py +++ b/src/allmydata/filetree/interfaces.py @@ -121,6 +121,13 @@ class ISubTree(Interface): IMutableSubTree to actually exercise these mutation rights. """ + def mutation_modifies_parent(): + """This returns True if any modification to this subtree will result + in it getting a new identity, and thus requiring its parent be + notified. This is True for CHKDirectorySubTree, but False for + SSKDirectorySubTree and all redirections. + """ + def get_node_for_path(path): """Ask this subtree to follow the path through its internal nodes. @@ -152,6 +159,13 @@ class ISubTree(Interface): """ + def put_node_at_path(path, node): + """Add the given node to this subtree, at 'path'. + + This may create internal directory subnodes as necessary. This must + run synchronously, and returns None. + """ + def serialize_subtree_to_file(f): """Create a string which describes my structure and write it to the given filehandle (using only .write()). This string should be @@ -204,6 +218,7 @@ class ISubTree(Interface): """ def create_node_now(): + # TODO: this is no longer just for testing.. vdrive.addpath needs it """FOR TESTING ONLY. Immediately create and return an INode which describes the current state of this subtree. This does not perform any upload or persistence work, and thus depends upon any internal diff --git a/src/allmydata/filetree/redirect.py b/src/allmydata/filetree/redirect.py index 3e2f1086..0a6fd8af 100644 --- a/src/allmydata/filetree/redirect.py +++ b/src/allmydata/filetree/redirect.py @@ -29,9 +29,16 @@ class _BaseRedirection(object): self.child_node = child_node return self + def mutation_modifies_parent(self): + return False + def get_node_for_path(self, path): return ([], self.child_node, path) + def put_node_at_path(self, path, node): + assert path == [] + self.child_node = node + def serialize_subtree_to_file(self, f): f.write(self.child_node.serialize_node()) diff --git a/src/allmydata/filetree/vdrive.py b/src/allmydata/filetree/vdrive.py index 68b9fbfe..a8494e15 100644 --- a/src/allmydata/filetree/vdrive.py +++ b/src/allmydata/filetree/vdrive.py @@ -84,6 +84,7 @@ class VirtualDrive(object): workqueue.set_vdrive(self) workqueue.set_uploader(uploader) self._downloader = downloader + self._uploader = uploader # TODO: queen? self.queen = None self.root_node = root_node @@ -181,43 +182,112 @@ class VirtualDrive(object): d.addCallback(_got_closest) return d - def _get_subtree_path(self, path): + def get_subtrees_for_path(self, path): # compute a list of [(subtree1, subpath1), ...], which represents # which parts of 'path' traverse which subtrees. This can be used to # present the virtual drive to the user in a form that includes # redirection nodes (which do not consume path segments), or to # figure out which subtrees need to be updated when the identity of a # lower subtree (i.e. CHK) is changed. - pass # TODO + + # TODO: it might be useful to add some items to the return value. + # Like if there is a node already present at that path, to return it. + d = self._get_root() + results = [] + d.addCallback(self._get_subtrees_for_path_1, results, path) + return d + + def _get_subtrees_for_path_1(self, subtree, results, path): + (found_path, node, remaining_path) = subtree.get_node_for_path(path) + if IDirectoryNode.providedBy(node): + # traversal done. We are looking at the final subtree, and the + # entire path (found_path + remaining_path) will live in here. + r = (subtree, (found_path + remaining_path)) + results.append(r) + return results + if node.is_leaf_subtree(): + # for this assert to fail, we found a File or something where we + # were expecting to find another subdirectory. + assert len(remaining_path) == 0 + results.append((subtree, found_path)) + return results + # otherwise we must open and recurse into a new subtree + results.append((subtree, found_path)) + parent_is_mutable = subtree.is_mutable() + d = self.subtree_maker.make_subtree_from_node(node, parent_is_mutable) + def _opened(next_subtree): + next_subtree = ISubTree(next_subtree) + return self._get_subtrees_for_path_1(next_subtree, results, + remaining_path) + d.addCallback(_opened) + return d + # these are called by the workqueue - def add(self, path, new_node): - parent_path = path[:-1] - new_node_path = path[-1] - d = self._get_closest_node_and_prepath(parent_path) - def _got_closest((prepath, node, remaining_path)): - # now tell it to create any necessary parent directories - remaining_path = remaining_path[:] - while remaining_path: - node = node.add_subdir(remaining_path.pop(0)) - # 'node' is now the directory where the child wants to go - return node, prepath - d.addCallback(_got_closest) - def _add_new_node((node, prepath)): - node.add(new_node_path, new_node) - subtree = node.get_subtree() - # now, tell the subtree to serialize and upload itself, using the - # workqueue. - boxname = subtree.update(self.workqueue) - if boxname: - # the parent needs to be notified, so queue a step to notify - # them (using 'prepath') - self.workqueue.add_addpath(boxname, prepath) - return self # TODO: what wold be the most useful? - d.addCallback(_add_new_node) + def addpath_with_node(self, path, new_node): + new_node_boxname = self.workqueue.create_boxname(new_node) + self.workqueue.add_delete_box(new_node_boxname) + return self.addpath(path, new_node_boxname) + + def addpath(self, path, new_node_boxname): + # this adds a block of steps to the workqueue which, when complete, + # will result in the new_node existing in the virtual drive at + # 'path'. + + # First we figure out which subtrees are involved + d = self.get_subtrees_for_path(path) + + # then we walk through them from the bottom, arranging to modify them + # as necessary + def _got_subtrees(subtrees, new_node_boxname): + for (subtree, subpath) in reversed(subtrees): + assert subtree.is_mutable() + must_update = subtree.mutation_modifies_parent() + subtree_node = subtree.create_node_now() + new_subtree_boxname = None + if must_update: + new_subtree_boxname = self.workqueue.create_boxname() + self.workqueue.add_delete_box(new_subtree_boxname) + self.workqueue.add_modify_subtree(subtree_node, subpath, + new_node_boxname, + new_subtree_boxname) + # the box filled by the modify_subtree will be propagated + # upwards + new_node_boxname = new_subtree_boxname + else: + # the buck stops here + self.workqueue.add_modify_subtree(subtree_node, subpath, + new_node_boxname) + return + d.addCallback(_got_subtrees, new_node_boxname) return d + def modify_subtree(self, subtree_node, localpath, new_node, + new_subtree_boxname=None): + # TODO: I'm lying here, we don't know who the parent is, so we can't + # really say whether they're mutable or not. But we're pretty sure + # that the new subtree is supposed to be mutable, because we asserted + # that earlier (although I suppose perhaps someone could change a + # QueenRedirection or an SSK file while we're offline in the middle + # of our workqueue..). Tell the new subtree that their parent is + # mutable so we can be sure it will believe that it itself is + # mutable. + parent_is_mutable = True + d = self.subtree_maker.make_subtree_from_node(subtree_node, + parent_is_mutable) + def _got_subtree(subtree): + assert subtree.is_mutable() + subtree.put_node_at_path(localpath, new_node) + return subtree.update_now(self._uploader) + d.addCallback(_got_subtree) + if new_subtree_boxname: + d.addCallback(lambda new_subtree_node: + self.workqueue.write_to_box(new_subtree_boxname, + new_subtree_node)) + return d + + # these are user-visible def list(self, path): @@ -251,7 +321,7 @@ class VirtualDrive(object): uploadable = IUploadable(uploadable) d = self._child_should_not_exist(path) # then we upload the file - d.addCallback(lambda ignored: self.uploader.upload(uploadable)) + d.addCallback(lambda ignored: self._uploader.upload(uploadable)) def _uploaded(uri): assert isinstance(uri, str) new_node = file.CHKFileNode().new(uri) diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py index 04a6b3ab..d1cd48ea 100644 --- a/src/allmydata/interfaces.py +++ b/src/allmydata/interfaces.py @@ -308,8 +308,36 @@ class IWorkQueue(Interface): file.""" def add_addpath(boxname, path): - """When executed, this step will retrieve the serialized INode from - the given box and call vdrive.add(path, node) . + """When executed, this step pulls a node specification from 'boxname' + and figures out which subtrees must be modified to allow that node to + live at the 'path' (which is an absolute path). This will probably + cause one or more 'add_modify_subtree' or 'add_modify_redirection' + steps to be added to the workqueue. + """ + + def add_modify_subtree(subtree_node, localpath, new_node_boxname, + new_subtree_boxname=None): + """When executed, this step retrieves the subtree specified by + 'subtree_node', pulls a node specification out of 'new_node_boxname', + then modifies it such that a subtree-relative 'localpath' points to + the new node. It then serializes the subtree in its new form, and + optionally puts a node that describes the new subtree in + 'new_node_boxname'. + + The idea is that 'subtree_node' will refer a CHKDirectorySubTree, and + 'new_node_boxname' will contain the CHKFileNode that points to a + newly-uploaded file. When the CHKDirectorySubTree is modified, it + acquires a new URI, which will be stuffed (in the form of a + CHKDirectorySubTreeNode) into 'new_subtree_boxname'. A following step + would then read from 'new_subtree_boxname' and modify some other + subtree with the contents. + + If 'subtree_node' refers to a redirection subtree like + LocalFileRedirection or QueenRedirection, then 'localpath' is + ignored, because redirection subtrees don't consume path components + and have no internal directory structure (they just have the one + redirection target). Redirection subtrees generally retain a constant + identity, so it is unlikely that 'new_subtree_boxname' will be used. """ def add_unlink_uri(uri): diff --git a/src/allmydata/test/test_filetree_new.py b/src/allmydata/test/test_filetree_new.py index 1edbd2b7..7c6d05ee 100644 --- a/src/allmydata/test/test_filetree_new.py +++ b/src/allmydata/test/test_filetree_new.py @@ -334,15 +334,26 @@ class InPairs(unittest.TestCase): class FakeMesh(object): implements(IDownloader, IUploader) + debug = False def __init__(self): self.files = {} def upload_filename(self, filename): uri = "stub-uri-%d" % len(self.files) + if self.debug: + print "FakeMesh.upload_filename(%s) -> %s" % (filename, uri) data = open(filename,"r").read() self.files[uri] = data return defer.succeed(uri) + def upload_data(self, data): + uri = "stub-uri-%d" % len(self.files) + if self.debug: + print "FakeMesh.upload_data(%s) -> %s" % (data, uri) + self.files[uri] = data + return defer.succeed(uri) def download(self, uri, target): + if self.debug: + print "FakeMesh.download(%s)" % uri target.open() target.write(self.files[uri]) target.close() @@ -351,9 +362,14 @@ class FakeMesh(object): class VDrive(unittest.TestCase): - def makeVirtualDrive(self, basedir, root_node=None): + def makeVirtualDrive(self, basedir, root_node=None, mesh=None): wq = workqueue.WorkQueue(os.path.join(basedir, "1.workqueue")) - dl = ul = FakeMesh() + if mesh: + assert IUploader.providedBy(mesh) + assert IDownloader.providedBy(mesh) + dl = ul = mesh + else: + dl = ul = FakeMesh() if not root_node: root_node = directory.LocalFileSubTreeNode() root_node.new("rootdirtree.save") @@ -372,6 +388,22 @@ class VDrive(unittest.TestCase): root.create_node_now()) return v + def makeCHKTree(self, basename): + # create a LocalFileRedirection pointing at a CHKDirectorySubTree. + # Returns a VirtualDrive instance. + mesh = FakeMesh() + topdir = directory.CHKDirectorySubTree().new() + d = topdir.update_now(mesh) + def _updated(topnode): + root = redirect.LocalFileRedirection() + root.new("%s-root" % basename, topnode) + return root.update_now(mesh) + d.addCallback(_updated) + d.addCallback(lambda rootnode: + self.makeVirtualDrive("%s-vdrive" % basename, + rootnode, mesh)) + return d + def failUnlessListsAreEqual(self, list1, list2): self.failUnlessEqual(sorted(list1), sorted(list2)) @@ -545,3 +577,35 @@ class VDrive(unittest.TestCase): return d + def testCHKDirUpload(self): + DATA = "here is some data\n" + d = defer.maybeDeferred(self.makeCHKTree, "upload") + def _made(v): + self.v = v + + filename = "upload1" + f = open(filename, "w") + f.write(DATA) + f.close() + + rc = v.upload_later(["a","b","upload1"], filename) + self.failUnlessIdentical(rc, None) + + return v.workqueue.flush() + d.addCallback(_made) + + d.addCallback(lambda res: self.v.list([])) + d.addCallback(lambda contents: + self.failUnlessListsAreEqual(contents.keys(), ["a"])) + d.addCallback(lambda res: self.v.list(["a"])) + d.addCallback(lambda contents: + self.failUnlessListsAreEqual(contents.keys(), ["b"])) + d.addCallback(lambda res: self.v.list(["a","b"])) + d.addCallback(lambda contents: + self.failUnlessListsAreEqual(contents.keys(), + ["upload1"])) + d.addCallback(lambda res: self.v.download_as_data(["a","b","upload1"])) + d.addCallback(self.failUnlessEqual, DATA) + + return d + diff --git a/src/allmydata/test/test_workqueue.py b/src/allmydata/test/test_workqueue.py index 872ee636..ab151a2c 100644 --- a/src/allmydata/test/test_workqueue.py +++ b/src/allmydata/test/test_workqueue.py @@ -121,19 +121,19 @@ class Items(unittest.TestCase): ["box1"])) self.failUnlessEqual(steps[2], ("addpath", ["box1", "home", "warner", "foo.txt"])) - self.failUnlessEqual(steps[3], ("delete_box", - ["box1"])) - self.failUnlessEqual(steps[4], + self.failUnlessEqual(steps[3], ("upload_chk", [os.path.join(wq.filesdir, tmpfilename), "box2"])) - self.failUnlessEqual(steps[5], - ("delete_tempfile", [tmpfilename])) - self.failUnlessEqual(steps[6], + self.failUnlessEqual(steps[4], ("retain_uri_from_box", ["box2"])) - self.failUnlessEqual(steps[7], ("delete_box", ["box2"])) - self.failUnlessEqual(steps[8], ("unlink_uri", + self.failUnlessEqual(steps[5], ("delete_box", + ["box1"])) + self.failUnlessEqual(steps[6], ("unlink_uri", ["olduri"])) + self.failUnlessEqual(steps[7], + ("delete_tempfile", [tmpfilename])) + self.failUnlessEqual(steps[8], ("delete_box", ["box2"])) self.failUnlessEqual(steps[9], ("unlink_uri", ["oldchk"])) def testRun(self): diff --git a/src/allmydata/workqueue.py b/src/allmydata/workqueue.py index eeff4b8d..3b037c59 100644 --- a/src/allmydata/workqueue.py +++ b/src/allmydata/workqueue.py @@ -48,6 +48,8 @@ class UploadSSKStep(Step): class WorkQueue(object): implements(IWorkQueue) + debug = False + def __init__(self, basedir): assert basedir.endswith("workqueue") self.basedir = basedir @@ -175,17 +177,31 @@ class WorkQueue(object): lines.extend(path) self._create_step_first(lines) + def add_modify_subtree(self, subtree_node, localpath, new_node_boxname, + new_subtree_boxname=None): + assert isinstance(localpath, (list, tuple)) + box1 = self.create_boxname(subtree_node) + self.add_delete_box(box1) + # TODO: it would probably be easier if steps were represented in + # directories, with a separate file for each argument + if new_subtree_boxname is None: + new_subtree_boxname = "" + lines = ["modify_subtree", + box1, new_node_boxname, new_subtree_boxname] + lines.extend(localpath) + self._create_step_first(lines) + def add_unlink_uri(self, uri): lines = ["unlink_uri", uri] self._create_step_last(lines) def add_delete_tempfile(self, filename): lines = ["delete_tempfile", filename] - self._create_step_first(lines) + self._create_step_last(lines) def add_delete_box(self, boxname): lines = ["delete_box", boxname] - self._create_step_first(lines) + self._create_step_last(lines) # methods to perform work @@ -282,6 +298,9 @@ class WorkQueue(object): def step_upload_chk(self, source_filename, stash_uri_in_boxname): + if self.debug: + print "STEP_UPLOAD_CHK(%s -> %s)" % (source_filename, + stash_uri_in_boxname) # we use relative filenames for tempfiles created by # workqueue.create_tempfile, and absolute filenames for everything # that comes from the vdrive. That means using os.path.abspath() on @@ -289,6 +308,8 @@ class WorkQueue(object): filename = os.path.join(self.filesdir, source_filename) d = self._uploader.upload_filename(filename) def _uploaded(uri): + if self.debug: + print " -> %s" % uri node = CHKFileNode().new(uri) self.write_to_box(stash_uri_in_boxname, node) d.addCallback(_uploaded) @@ -298,9 +319,21 @@ class WorkQueue(object): pass def step_addpath(self, boxname, *path): + if self.debug: + print "STEP_ADDPATH(%s -> %s)" % (boxname, "/".join(path)) path = list(path) - child_node = self.read_from_box(boxname) - return self.vdrive.add(path, child_node) + return self.vdrive.addpath(path, boxname) + def step_modify_subtree(self, subtree_node_boxname, new_node_boxname, + new_subtree_boxname, *localpath): + # the weird order of arguments is a consequence of the fact that + # localpath is variable-length and new_subtree_boxname is optional. + if not new_subtree_boxname: + new_subtree_boxname = None + subtree_node = self.read_from_box(subtree_node_boxname) + new_node = self.read_from_box(new_node_boxname) + localpath = list(localpath) + return self.vdrive.modify_subtree(subtree_node, localpath, + new_node, new_subtree_boxname) def step_retain_ssk(self, index_a, read_key_a): pass @@ -309,11 +342,18 @@ class WorkQueue(object): def step_retain_uri_from_box(self, boxname): pass def step_unlink_uri(self, uri): + if self.debug: + print "STEP_UNLINK_URI(%s)" % uri pass def step_delete_tempfile(self, filename): + if self.debug: + print "STEP_DELETE_TEMPFILE(%s)" % filename + assert not filename.startswith("/") os.unlink(os.path.join(self.filesdir, filename)) def step_delete_box(self, boxname): + if self.debug: + print "DELETE_BOX", boxname os.unlink(os.path.join(self.boxesdir, boxname))