From 7b8c524d7c69725e6ef4ac63bda3e7892ee6dfea Mon Sep 17 00:00:00 2001 From: Brian Warner Date: Sun, 21 Jan 2007 04:18:54 -0700 Subject: [PATCH] more filetree, workqueue-boxes now hold serialized Nodes, move NodeMaker out to a separate module --- src/allmydata/filetree/file.py | 1 + src/allmydata/filetree/interfaces.py | 8 ++- src/allmydata/filetree/nodemaker.py | 41 +++++++++++++++ src/allmydata/filetree/vdrive.py | 66 ++++++++++--------------- src/allmydata/test/test_filetree_new.py | 34 ++++++++++++- src/allmydata/test/test_workqueue.py | 2 +- src/allmydata/workqueue.py | 46 +++++++++++++++-- 7 files changed, 147 insertions(+), 51 deletions(-) create mode 100644 src/allmydata/filetree/nodemaker.py diff --git a/src/allmydata/filetree/file.py b/src/allmydata/filetree/file.py index 6862cee8..e08f2f9a 100644 --- a/src/allmydata/filetree/file.py +++ b/src/allmydata/filetree/file.py @@ -10,6 +10,7 @@ class CHKFileNode(BaseDataNode): def new(self, uri): self.uri = uri + return self def get_base_data(self): return self.uri diff --git a/src/allmydata/filetree/interfaces.py b/src/allmydata/filetree/interfaces.py index dd799018..56079272 100644 --- a/src/allmydata/filetree/interfaces.py +++ b/src/allmydata/filetree/interfaces.py @@ -245,11 +245,11 @@ class IVirtualDrive(Interface): path[:-1] must refer to a writable DIRECTORY node. 'uploadable' must implement IUploadable. This returns a Deferred that fires (with - 'uploadable') when the upload is complete. + 'uploadable') when the upload is complete. Do not use the workqueue. """ def upload_later(path, filename): - """Upload a file from disk to the given path. + """Upload a file from disk to the given path. Use the workqueue. """ def delete(path): @@ -259,6 +259,10 @@ class IVirtualDrive(Interface): complete. """ + def add_node(path, node): + """Add a node to the given path. Use the workqueue. + """ + # commands to manipulate subtrees # ... detach subtree, merge subtree, etc diff --git a/src/allmydata/filetree/nodemaker.py b/src/allmydata/filetree/nodemaker.py new file mode 100644 index 00000000..d38458d8 --- /dev/null +++ b/src/allmydata/filetree/nodemaker.py @@ -0,0 +1,41 @@ + +from zope.interface import implements +from allmydata.filetree import directory, file, redirect +from allmydata.filetree.interfaces import INodeMaker + +# this list is used by NodeMaker to convert node specification strings (found +# inside the serialized form of subtrees) into Nodes (which live in the +# in-RAM form of subtrees). +all_node_types = [ + directory.LocalFileSubTreeNode, + directory.CHKDirectorySubTreeNode, + directory.SSKDirectorySubTreeNode, + file.CHKFileNode, + file.SSKFileNode, + redirect.LocalFileRedirectionNode, + redirect.QueenRedirectionNode, + redirect.HTTPRedirectionNode, + redirect.QueenOrLocalFileRedirectionNode, +] + +class NodeMaker(object): + implements(INodeMaker) + + def make_node_from_serialized(self, serialized): + # this turns a string into an INode, which contains information about + # the file or directory (like a URI), but does not contain the actual + # contents. An ISubTreeMaker can be used later to retrieve the + # contents (which means downloading the file if this is an IFileNode, + # or perhaps creating a new subtree from the contents) + + # maybe include parent_is_mutable? + assert isinstance(serialized, str) + prefix, body = serialized.split(":", 2) + + for node_class in all_node_types: + if prefix == node_class.prefix: + node = node_class() + node.populate_node(body, self) + return node + raise RuntimeError("unable to handle node type '%s'" % prefix) + diff --git a/src/allmydata/filetree/vdrive.py b/src/allmydata/filetree/vdrive.py index 63e7271f..8a44f343 100644 --- a/src/allmydata/filetree/vdrive.py +++ b/src/allmydata/filetree/vdrive.py @@ -10,43 +10,7 @@ from allmydata.filetree.interfaces import ( ) from allmydata.upload import IUploadable -# this list is used by NodeMaker to convert node specification strings (found -# inside the serialized form of subtrees) into Nodes (which live in the -# in-RAM form of subtrees). -all_node_types = [ - directory.LocalFileSubTreeNode, - directory.CHKDirectorySubTreeNode, - directory.SSKDirectorySubTreeNode, - file.CHKFileNode, - file.SSKFileNode, - redirect.LocalFileRedirectionNode, - redirect.QueenRedirectionNode, - redirect.HTTPRedirectionNode, - redirect.QueenOrLocalFileRedirectionNode, -] - -class NodeMaker(object): - implements(INodeMaker) - - def make_node_from_serialized(self, serialized): - # this turns a string into an INode, which contains information about - # the file or directory (like a URI), but does not contain the actual - # contents. An ISubTreeMaker can be used later to retrieve the - # contents (which means downloading the file if this is an IFileNode, - # or perhaps creating a new subtree from the contents) - - # maybe include parent_is_mutable? - assert isinstance(serialized, str) - prefix, body = serialized.split(":", 2) - - for node_class in all_node_types: - if prefix == node_class.prefix: - node = node_class() - node.populate_node(body, self) - return node - raise RuntimeError("unable to handle node type '%s'" % prefix) - - +from allmydata.filetree.nodemaker import NodeMaker all_openable_subtree_types = [ directory.LocalFileSubTree, @@ -204,7 +168,7 @@ class VirtualDrive(object): def _got_closest((node, remaining_path)): prepath_len = len(path) - len(remaining_path) prepath = path[:prepath_len] - assert path[prepath_len:] == remaining_path + assert path[prepath_len:] == remaining_path, "um, path=%s, prepath=%s, prepath_len=%d, remaining_path=%s" % (path, prepath, prepath_len, remaining_path) return (prepath, node, remaining_path) d.addCallback(_got_closest) return d @@ -226,6 +190,7 @@ class VirtualDrive(object): d = self._get_closest_node_and_prepath(parent_path) def _got_closest((prepath, node, remaining_path)): # now tell it to create any necessary parent directories + remaining_path = remaining_path[:] while remaining_path: node = node.add_subdir(remaining_path.pop(0)) # 'node' is now the directory where the child wants to go @@ -248,16 +213,19 @@ class VirtualDrive(object): # these are user-visible def list(self, path): + assert isinstance(path, list) d = self._get_directory(path) d.addCallback(lambda node: node.list()) return d def download(self, path, target): + assert isinstance(path, list) d = self._get_file_uri(path) d.addCallback(lambda uri: self.downloader.download(uri, target)) return d def upload_now(self, path, uploadable): + assert isinstance(path, list) # note: the first few steps of this do not use the workqueue, but I # think things should remain consistent anyways. If the node is shut # down before the file has finished uploading, then we forget all @@ -266,17 +234,24 @@ class VirtualDrive(object): d = self._child_should_not_exist(path) # then we upload the file d.addCallback(lambda ignored: self.uploader.upload(uploadable)) - d.addCallback(lambda uri: self.workqueue.create_boxname(uri)) - d.addCallback(lambda boxname: - self.workqueue.add_addpath(boxname, path)) + def _uploaded(uri): + assert isinstance(uri, str) + new_node = file.CHKFileNode().new(uri) + boxname = self.workqueue.create_boxname(new_node) + self.workqueue.add_addpath(boxname, path) + self.workqueue.add_delete_box(boxname) + d.addCallback(_uploaded) return d def upload_later(self, path, filename): + assert isinstance(path, list) boxname = self.workqueue.create_boxname() self.workqueue.add_upload_chk(filename, boxname) self.workqueue.add_addpath(boxname, path) + self.workqueue.add_delete_box(boxname) def delete(self, path): + assert isinstance(path, list) parent_path = path[:-1] orphan_path = path[-1] d = self._get_closest_node_and_prepath(parent_path) @@ -288,7 +263,16 @@ class VirtualDrive(object): boxname = subtree.update(self.workqueue) if boxname: self.workqueue.add_addpath(boxname, prepath) + self.workqueue.add_delete_box(boxname) return self d.addCallback(_got_parent) return d + def add_node(self, path, node): + assert isinstance(path, list) + assert INode(node) + assert not IDirectoryNode.providedBy(node) + boxname = self.workqueue.create_boxname(node) + self.workqueue.add_addpath(boxname, path) + self.workqueue.add_delete_box(boxname) + diff --git a/src/allmydata/test/test_filetree_new.py b/src/allmydata/test/test_filetree_new.py index 6a94e1b5..8cc3d2bb 100644 --- a/src/allmydata/test/test_filetree_new.py +++ b/src/allmydata/test/test_filetree_new.py @@ -338,6 +338,11 @@ class Stuff(unittest.TestCase): def failUnlessListsAreEqual(self, list1, list2): self.failUnlessEqual(sorted(list1), sorted(list2)) + def failUnlessContentsAreEqual(self, c1, c2): + c1a = dict([(k,v.serialize_node()) for k,v in c1.items()]) + c2a = dict([(k,v.serialize_node()) for k,v in c2.items()]) + self.failUnlessEqual(c1a, c2a) + def testDirectory(self): stm = vdrive.SubTreeMaker(None, None) @@ -431,11 +436,36 @@ class Stuff(unittest.TestCase): root = redirect.LocalFileRedirection().new("vdrive-root", topdir.create_node_now()) root.update_now(None) - wq = self.makeVirtualDrive("vdrive", root.create_node_now()) + v = self.makeVirtualDrive("vdrive", root.create_node_now()) - d = wq.list([]) + d = v.list([]) def _listed(contents): self.failUnlessEqual(contents, {}) d.addCallback(_listed) + + child1 = CHKFileNode().new("uri1") + d.addCallback(lambda res: v.add_node(["a"], child1)) + d.addCallback(lambda res: v.workqueue.flush()) + d.addCallback(lambda res: v.list([])) + def _listed2(contents): + self.failUnlessListsAreEqual(contents.keys(), ["a"]) + self.failUnlessContentsAreEqual(contents, {"a": child1}) + d.addCallback(_listed2) + child2 = CHKFileNode().new("uri2") + child3 = CHKFileNode().new("uri3") + d.addCallback(lambda res: v.add_node(["b","c"], child2)) + d.addCallback(lambda res: v.add_node(["b","d"], child3)) + d.addCallback(lambda res: v.workqueue.flush()) + d.addCallback(lambda res: v.list([])) + def _listed3(contents): + self.failUnlessListsAreEqual(contents.keys(), ["a","b"]) + d.addCallback(_listed3) + d.addCallback(lambda res: v.list(["b"])) + def _listed4(contents): + self.failUnlessListsAreEqual(contents.keys(), ["c","d"]) + self.failUnlessContentsAreEqual(contents, + {"c": child2, "d": child3}) + d.addCallback(_listed4) + return d diff --git a/src/allmydata/test/test_workqueue.py b/src/allmydata/test/test_workqueue.py index b2b20307..d0c01c51 100644 --- a/src/allmydata/test/test_workqueue.py +++ b/src/allmydata/test/test_workqueue.py @@ -153,7 +153,7 @@ class Items(unittest.TestCase): boxfile = os.path.join(wq.boxesdir, boxname) self.failUnless(os.path.exists(boxfile)) - d = wq.run_all_steps() + d = wq.flush() def _check(res): self.failUnlessEqual(len(wq.dispatched_steps), 5) self.failUnlessEqual(wq.dispatched_steps[0][0], "upload_chk") diff --git a/src/allmydata/workqueue.py b/src/allmydata/workqueue.py index c0bed206..1982fc54 100644 --- a/src/allmydata/workqueue.py +++ b/src/allmydata/workqueue.py @@ -5,6 +5,8 @@ from twisted.internet import defer from allmydata.util import bencode from allmydata.util.idlib import b2a from allmydata.Crypto.Cipher import AES +from allmydata.filetree.nodemaker import NodeMaker +from allmydata.filetree.interfaces import INode class IWorkQueue(Interface): """Each filetable root is associated a work queue, which is persisted on @@ -22,6 +24,20 @@ class IWorkQueue(Interface): application is started, the step can be re-started without problems. The placement of the 'retain' commands depends upon how long we might expect the app to be offline. + + tempfiles: the workqueue has a special directory where temporary files + are stored. create_tempfile() generates these files, while steps like + add_upload_chk() use them. The add_delete_tempfile() will delete the + tempfile. All tempfiles are deleted when the workqueue becomes empty, + since at that point none of them can still be referenced. + + boxes: there is another special directory where named slots (called + 'boxes') hold serialized INode specifications (the strings which are + returned by INode.serialize_node()). Boxes are created by calling + create_boxname(). Boxes are filled either at the time of creation or by + steps like add_upload_chk(). Boxes are used by steps like add_addpath() + and add_retain_uri_from_box. Boxes are deleted by add_delete_box(), as + well as when the workqueue becomes empty. """ def create_tempfile(suffix=""): @@ -31,7 +47,11 @@ class IWorkQueue(Interface): path, rather it will be interpreted relative to some directory known only by the workqueue.""" def create_boxname(contents=None): - """Return a unique box name (as a string).""" + """Return a unique box name (as a string). If 'contents' are + provided, it must be an instance that provides INode, and the + serialized form of the node will be written into the box. Otherwise + the boxname can be used by steps like add_upload_chk to hold the + generated uri.""" def add_upload_chk(source_filename, stash_uri_in_boxname): """This step uploads a file to the mesh and obtains a content-based @@ -88,6 +108,14 @@ class IWorkQueue(Interface): def add_delete_box(boxname): """When executed, this step deletes the given box.""" + + # methods for use in unit tests + + def flush(): + """Execute all steps in the WorkQueue right away. Return a Deferred + that fires (with self) when the queue is empty. + """ + class NotCapableError(Exception): """You have tried to write to a read-only node.""" @@ -132,6 +160,7 @@ class WorkQueue(object): def __init__(self, basedir): assert basedir.endswith("workqueue") self.basedir = basedir + self._node_maker = NodeMaker() self.seqnum = 0 self.tmpdir = os.path.join(basedir, "tmp") #self.trashdir = os.path.join(basedir, "trash") @@ -174,8 +203,12 @@ class WorkQueue(object): f = open(os.path.join(self.filesdir, filename), "wb") return (f, filename) - def create_boxname(self): - return b2a(os.urandom(10)) + def create_boxname(self, contents=None): + boxname = b2a(os.urandom(10)) + if contents is not None: + assert INode(contents) + self.write_to_box(boxname, contents.serialize_node()) + return boxname def write_to_box(self, boxname, data): f = open(os.path.join(self.boxesdir, boxname), "w") f.write(data) @@ -309,7 +342,7 @@ class WorkQueue(object): if not hasattr(self, handlername): raise RuntimeError("unknown workqueue step type '%s'" % steptype) handler = getattr(self, handlername) - d = defer.maybeDeferred(handler, *lines[1:]) + d = defer.maybeDeferred(handler, *lines) return d def _delete_step(self, res, stepname): @@ -337,6 +370,8 @@ class WorkQueue(object): d.addCallback(self.run_all_steps) return d return defer.succeed(None) + def flush(self): + return self.run_all_steps() def open_tempfile(self, filename): @@ -353,8 +388,9 @@ class WorkQueue(object): pass def step_addpath(self, boxname, *path): + path = list(path) data = self.read_from_box(boxname) - child_node = unserialize(data) # TODO: unserialize ? + child_node = self._node_maker.make_node_from_serialized(data) return self.vdrive.add(path, child_node) def step_retain_ssk(self, index_a, read_key_a): -- 2.45.2