From 70f5c13e265ddf5dfd3a3eadb15e52d7453f4e6f Mon Sep 17 00:00:00 2001 From: Brian Warner <warner@lothar.com> Date: Sun, 24 Dec 2006 12:39:24 -0700 Subject: [PATCH] checkpointing new filetable work.. tests don't pass yet --- src/allmydata/filetable.py | 31 - src/allmydata/filetable_new.py | 728 +++++++++++++++++++++++ src/allmydata/test/test_filetable_new.py | 122 ++++ src/allmydata/workqueue.py | 231 +++++++ 4 files changed, 1081 insertions(+), 31 deletions(-) create mode 100644 src/allmydata/filetable_new.py create mode 100644 src/allmydata/test/test_filetable_new.py create mode 100644 src/allmydata/workqueue.py diff --git a/src/allmydata/filetable.py b/src/allmydata/filetable.py index 39462e45..bb96b02a 100644 --- a/src/allmydata/filetable.py +++ b/src/allmydata/filetable.py @@ -107,34 +107,3 @@ class GlobalVirtualDrive(service.MultiService): def get_root(self): return self._root - - - -# interesting feature ideas: -# pubsub for MutableDirectoryNode: get rapid notification of changes -# caused by someone else -# -# bind a local physical directory to the MutableDirectoryNode contents: -# each time the vdrive changes, update the local drive to match, and -# vice versa. - -class Node: - pass - -class MutableFileNode(Node): - """I hold an SSK identifier for a mutable file. My 'contents' are - defined to be the most recent version of the SSK's payload that can - be found. This SSK identifier must be dereferenced to get the - contents.""" - pass -class ImmutableFileNode(Node): - """I hold a CHK identifier for an immutable file. I may have some - metadata as well: ctime, mtime, content-type, and filesize.""" - pass -class ImmutableDirectoryNode(Node): - """I hold a list of child nodes.""" - pass -class MutableDirectoryNode2(Node): - """I hold an SSK identifier for a mutable directory. When - dereferenced, I will have a list of child nodes.""" - pass diff --git a/src/allmydata/filetable_new.py b/src/allmydata/filetable_new.py new file mode 100644 index 00000000..0ab130fb --- /dev/null +++ b/src/allmydata/filetable_new.py @@ -0,0 +1,728 @@ +#! /usr/bin/python + +from zope.interface import Interface, implements +from twisted.internet import defer +from allmydata.util import bencode + +# interesting feature ideas: +# pubsub for MutableDirectoryNode: get rapid notification of changes +# caused by someone else +# +# bind a local physical directory to the MutableDirectoryNode contents: +# each time the vdrive changes, update the local drive to match, and +# vice versa. + +class INode(Interface): + """This is some sort of retrievable node.""" + pass + +class IFileNode(Interface): + """This is a file which can be retrieved.""" + pass + +class IDirectoryNode(Interface): + """This is a directory which can be listed.""" + def list(): + """Return a list of names which are children of this node.""" + + +class ISubTree(Interface): + """A subtree is a collection of Nodes: files, directories, other trees. + + A subtree represents a set of connected directories and files that all + share the same access control: any given person can read or write + anything in this tree as a group, and it is not possible to give access + to some pieces of this tree and not to others. Read-only access to + individual files can be granted independently, of course, but through an + unnamed URI, not as a subdirectory. + + Each internal directory is represented by a separate Node. This might be + a DirectoryNode, or it might be a FileNode. + """ + + def get(path, opener): + """Return a Deferred that fires with the node at the given path, or + None if there is no such node. This will traverse and create subtrees + as necessary.""" + + def add(path, child, opener, work_queue): + """Add 'child' (which must implement INode) to the tree at 'path' + (which must be a list of pathname components). This will schedule all + the work necessary to cause the child to be added reliably.""" + + def find_lowest_containing_subtree_for_path(path, opener): + # not for external use. This is used internally by add(). + """Find the subtree which contains the target path, opening new + subtrees if necessary. Return a Deferred that fires with (subtree, + prepath, postpath), where prepath is the list of path components that + got to the subtree, and postpath is the list of remaining path + components (indicating a subpath within the resulting subtree). This + will traverse and even create subtrees as necessary.""" + + + def is_mutable(): + """This returns True if we have the ability to modify this subtree. + If this returns True, this reference may be adapted to + IMutableSubTree to actually exercise these mutation rights. + """ + + def get_node_for_path(path): + """Ask this subtree to follow the path through its internal nodes. If + the path terminates within this subtree, return (True, node), where + 'node' implements INode (and also IMutableNode if this subtree + is_mutable). If the path takes us beyond this subtree, return (False, + next_subtree_spec, subpath), where 'next_subtree_spec' is a string + that can be passed to an Opener to create a new subtree, and + 'subpath' is the subset of 'path' that can be passed to this new + subtree. If the path cannot be found within the subtree (and it is + not in the domain of some child subtree), return None. + """ + + def get_or_create_node_for_path(path): + """Like get_node_for_path, but instead of returning None, the subtree + will create internal nodes as necessary. Therefore it always returns + either (True, node), or (False, next_subtree_spec, prepath, postpath). + """ + +class IMutableSubTree(Interface): + def mutation_affects_parent(): + """This returns True for CHK nodes where you must inform the parent + of the new URI each time you change the child subtree. It returns + False for SSK nodes (or other nodes which have a pointer stored in + some mutable form). + """ + + def add_subpath(subpath, child_spec, work_queue): + """Ask this subtree to add the given child to an internal node at the + given subpath. The subpath must not exit the subtree through another + subtree (specifically get_subtree_for_path(subpath) must either + return None or (True,node), and in the latter case, this subtree will + create new internal nodes as necessary). + + The subtree will probably serialize itself to a file and add steps to + the work queue to accomplish its goals. + + This returns a Deferred (the value of which is ignored) when + everything has been added to the work queue. + """ + + def serialize_to_file(): + """Write a bencoded data structure to the given filehandle that can + be used to reproduce the contents of this subtree.""" + +class ISubTreeSpecification(Interface): + def serialize(): + """Return a tuple that describes this subtree. This tuple can be + passed to IOpener.open() to reconstitute the subtree.""" + +class IOpener(Interface): + def open(subtree_specification, parent_is_mutable): + """I can take an ISubTreeSpecification-providing specification of a + subtree and return a Deferred which fires with an instance that + provides ISubTree (and maybe even IMutableSubTree). I probably do + this by performing network IO: reading a file from the mesh, or from + local disk, or asking some central-service node for the current + value.""" + + +class CHKFile(object): + implements(INode, IFileNode) + def __init__(self, uri): + self.uri = uri + def get_uri(self): + return self.uri + +class MutableSSKFile(object): + implements(INode, IFileNode) + def __init__(self, read_cap, write_cap): + self.read_cap = read_cap + self.write_cap = write_cap + def get_read_capability(self): + return self.read_cap + def get_write_capability(self): + return self.write_cap + +class ImmutableSSKFile(object): + implements(INode, IFileNode) + def __init__(self, read_cap): + self.read_cap = read_cap + def get_read_capability(self): + return self.read_cap + + +class SubTreeNode: + implements(INode, IDirectoryNode) + + def __init__(self, tree): + self.enclosing_tree = tree + # node_children maps child name to another SubTreeNode instance. This + # is only for internal directory nodes. All Files and external links + # are listed in child_specifications instead. + self.node_children = {} + # child_specifications maps child name to a string which describes + # how to obtain the actual child. For example, if "foo.jpg" in this + # node represents a FILE with a uri of "fooURI", then + # self.child_specifications["foo.jpg"] = "(FILE,fooURI") + self.child_specifications = {} + + def list(self): + return sorted(self.node_children.keys() + + self.child_specifications.keys()) + + def serialize(self): + # note: this is a one-pass recursive serialization that will result + # in the whole file table being held in memory. This is only + # appropriate for directories with fewer than, say, 10k nodes. If we + # support larger directories, we should turn this into some kind of + # generator instead, and write the serialized data directly to a + # tempfile. + data = ["DIRECTORY"] + for name in sorted(self.node_children.keys()): + data.append(name) + data.append(self.node_children[name].serialize()) + for name in sorted(self.child_specifications.keys()): + data.append(name) + data.append(self.child_specifications[name].serialize()) + return data + + def unserialize(self, data): + assert data[0] == "DIRECTORY" + assert len(data) % 2 == 1 + for i in range(1, len(data), 2): + name = data[i] + child_data = data[i+1] + assert isinstance(child_data, list) + child_type = child_data[0] + if child_type == "DIRECTORY": + child = SubTreeNode(self.enclosing_tree) + child.unserialize(child_data) + self.node_children[name] = child + elif child_type == "LINK": + self.child_specifications[name] = child_data[1] + else: + raise RuntimeError("unknown serialized-node type '%s'" % + child_type) + +class _SubTreeMixin(object): + + def get(self, path, opener): + """Return a Deferred that fires with the node at the given path, or + None if there is no such node. This will traverse and even create + subtrees as necessary.""" + d = self.get_node_for_path(path) + def _done(res): + if res == None: + # traversal done, unable to find the node + return None + if res[0] == True: + # found the node + node = res[1] + assert INode.providedBy(node) + return node + # otherwise, we must open and recurse into a new subtree + next_subtree_spec = res[1] + subpath = res[2] + d1 = opener.open(next_subtree_spec, self.is_mutable()) + def _opened(next_subtree): + assert ISubTree.providedBy(next_subtree) + return next_subtree.get(subpath, opener) + d1.addCallback(_opened) + return d1 + d.addCallback(_done) + return d + + def find_lowest_containing_subtree_for_path(self, path, opener): + """Find the subtree which contains the target path, opening new + subtrees if necessary. Return a Deferred that fires with (subtree, + prepath, postpath), where prepath is the list of path components that + got to the subtree, and postpath is the list of remaining path + components (indicating a subpath within the resulting subtree). This + will traverse and even create subtrees as necessary.""" + d = self.get_or_create_node_for_path(path) + def _done(res): + if res[0] == True: + node = res[1] + # found the node in our own tree. The whole path we were + # given was used internally, and is therefore the postpath + return (self, [], path) + # otherwise, we must open and recurse into a new subtree + ignored, next_subtree_spec, prepath, postpath = res + d1 = opener.open(next_subtree_spec, self.is_mutable()) + def _opened(next_subtree): + assert ISubTree.providedBy(next_subtree) + f = next_subtree.find_lowest_containing_subtree_for_path + return f(postpath, opener) + d1.addCallback(_opened) + def _found(res2): + subtree, prepath2, postpath2 = res2 + return (subtree, prepath + prepath2, postpath2) + d1.addCallback(_found) + return d1 + d.addCallback(_done) + return d + + +class _MutableSubTreeMixin(object): + + def add(self, path, child, opener, work_queue): + d = self.find_lowest_containing_subtree_for_path(path, opener) + def _found(res): + subtree, prepath, postpath = res + assert IMutableSubTree.providedBy(subtree) + # this add_path will cause some steps to be added, as well as the + # internal node to be modified + d1 = subtree.add_subpath(postpath, child, work_queue) + if subtree.mutation_affects_parent(): + def _added(boxname): + work_queue.add_addpath(boxname, prepath) + d1.addCallback(_added) + return d1 + d.addCallback(_found) + return d + + + +class _DirectorySubTree(_SubTreeMixin): + """I represent a set of connected directories that all share the same + access control: any given person can read or write anything in this tree + as a group, and it is not possible to give access to some pieces of this + tree and not to others. Read-only access to individual files can be + granted independently, of course, but through an unnamed URI, not as a + subdirectory. + + Each internal directory is represented by a separate Node. + + This is an abstract base class. Individual subclasses will implement + various forms of serialization, persistence, and mutability. + + """ + implements(ISubTree) + + def new(self): + self.root = SubTreeNode(self) + + def unserialize(self, serialized_data): + """Populate all nodes from serialized_data, previously created by + calling my serialize() method. 'serialized_data' is a series of + nested lists (s-expressions), probably recorded in bencoded form.""" + self.root = SubTreeNode(self) + self.root.unserialize(serialized_data) + return self + + def serialize(self): + """Return a series of nested lists which describe my structure + in a form that can be bencoded.""" + return self.root.serialize() + + def is_mutable(self): + return IMutableSubTree.providedBy(self) + + def get_node_for_path(self, path): + # this is restricted to traversing our own subtree. + subpath = path + node = self.root + while subpath: + name = subpath.pop(0) + if name in node.node_children: + node = node.node_children[name] + assert isinstance(node, SubTreeNode) + continue + if name in node.child_specifications: + # the path takes us out of this SubTree and into another + next_subtree_spec = node.child_specifications[name] + result = (False, next_subtree_spec, subpath) + return defer.succeed(result) + return defer.succeed(None) + # we've run out of path components, so we must be at the terminus + result = (True, node) + return defer.succeed(result) + + def get_or_create_node_for_path(self, path): + # this is restricted to traversing our own subtree, but will create + # internal directory nodes as necessary + prepath = [] + postpath = path[:] + node = self.root + while postpath: + name = postpath.pop(0) + prepath.append(name) + if name in node.node_children: + node = node.node_children[name] + assert isinstance(node, SubTreeNode) + continue + if name in node.child_specifications: + # the path takes us out of this SubTree and into another + next_subtree_spec = node.child_specifications[name] + result = (False, next_subtree_spec, prepath, postpath) + return defer.succeed(result) + # need to create a new node + new_node = SubTreeNode(self) + node.node_children[name] = new_node + node = new_node + continue + # we've run out of path components, so we must be at the terminus + result = (True, node) + return defer.succeed(result) + +class ImmutableDirectorySubTree(_DirectorySubTree): + pass + +class _MutableDirectorySubTree(_DirectorySubTree, _MutableSubTreeMixin): + implements(IMutableSubTree) + + def add_subpath(self, subpath, child, work_queue): + prepath = subpath[:-1] + name = subpath[-1] + d = self.get_node_for_path(prepath) + def _found(results): + assert results is not None + assert results[0] == True + node = results[1] + # modify the in-RAM copy + node.child_specifications[name] = child + # now serialize and upload ourselves + boxname = self.upload_my_serialized_form(work_queue) + # our caller will perform the addpath, if necessary + return boxname + d.addCallback(_found) + return d + + def serialize_to_file(self, f): + f.write(bencode.bencode(self.serialize())) + +class MutableCHKDirectorySubTree(_MutableDirectorySubTree): + + def mutation_affects_parent(self): + return True + + def set_uri(self, uri): + self.old_uri = uri + + def upload_my_serialized_form(self, work_queue): + # this is the CHK form + f, filename = work_queue.create_tempfile() + self.serialize_to_file(f) + f.close() + boxname = work_queue.create_boxname() + work_queue.add_upload_chk(filename, boxname) + work_queue.add_delete_tempfile(filename) + work_queue.add_retain_uri_from_box(boxname) + work_queue.add_delete_box(boxname) + work_queue.add_unlink_uri(self.old_uri) + # TODO: think about how self.old_uri will get updated. I *think* that + # this whole instance will get replaced, so it ought to be ok. But + # this needs investigation. + return boxname + +class MutableSSKDirectorySubTree(_MutableDirectorySubTree): + + def new(self): + _MutableDirectorySubTree.new(self) + self.version = 0 + + def mutation_affects_parent(self): + return False + + def set_version(self, version): + self.version = version + + def upload_my_serialized_form(self, work_queue): + # this is the SSK form + f, filename = work_queue.create_tempfile() + self.serialize_to_file(f) + f.close() + work_queue.add_upload_ssk(filename, self.get_write_capability(), + self.version) + self.version = self.version + 1 + work_queue.add_delete_tempfile(filename) + work_queue.add_retain_ssk(self.get_read_capability()) + + + +class CHKFileSpecification(object): + implements(ISubTreeSpecification) + stype = "CHK-File" + def set_uri(self, uri): + self.uri = uri + def serialize(self): + return (self.stype, self.uri) + def unserialize(self, data): + assert data[0] == self.stype + self.uri = data[1] + +class ImmutableSSKFileSpecification(object): + implements(ISubTreeSpecification) + stype = "SSK-Readonly-File" + def set_read_capability(self, read_cap): + self.read_cap = read_cap + def get_read_capability(self): + return self.read_cap + def serialize(self): + return (self.stype, self.read_cap) + def unserialize(self, data): + assert data[0] == self.stype + self.read_cap = data[1] + +class MutableSSKFileSpecification(ImmutableSSKFileSpecification): + implements(ISubTreeSpecification) + stype = "SSK-ReadWrite-File" + def set_write_capability(self, write_cap): + self.write_cap = write_cap + def get_write_capability(self): + return self.write_cap + def serialize(self): + return (self.stype, self.read_cap, self.write_cap) + def unserialize(self, data): + assert data[0] == self.stype + self.read_cap = data[1] + self.write_cap = data[2] + +class CHKDirectorySpecification(object): + implements(ISubTreeSpecification) + stype = "CHK-Directory" + def set_uri(self, uri): + self.uri = uri + def serialize(self): + return (self.stype, self.uri) + def unserialize(self, data): + assert data[0] == self.stype + self.uri = data[1] + +class ImmutableSSKDirectorySpecification(object): + implements(ISubTreeSpecification) + stype = "SSK-Readonly-Directory" + def set_read_capability(self, read_cap): + self.read_cap = read_cap + def get_read_capability(self): + return self.read_cap + def serialize(self): + return (self.stype, self.read_cap) + def unserialize(self, data): + assert data[0] == self.stype + self.read_cap = data[1] + +class MutableSSKDirectorySpecification(ImmutableSSKDirectorySpecification): + implements(ISubTreeSpecification) + stype = "SSK-ReadWrite-Directory" + def set_write_capability(self, write_cap): + self.write_cap = write_cap + def get_write_capability(self): + return self.write_cap + def serialize(self): + return (self.stype, self.read_cap, self.write_cap) + def unserialize(self, data): + assert data[0] == self.stype + self.read_cap = data[1] + self.write_cap = data[2] + +class LocalFileRedirection(object): + implements(ISubTreeSpecification) + stype = "LocalFile" + def set_filename(self, filename): + self.filename = filename + def get_filename(self): + return self.filename + def serialize(self): + return (self.stype, self.filename) + +class QueenRedirection(object): + implements(ISubTreeSpecification) + stype = "QueenRedirection" + def set_handle(self, handle): + self.handle = handle + def get_handle(self): + return self.handle + def serialize(self): + return (self.stype, self.handle) + +class HTTPRedirection(object): + implements(ISubTreeSpecification) + stype = "HTTPRedirection" + def set_url(self, url): + self.url = url + def get_url(self): + return self.url + def serialize(self): + return (self.stype, self.url) + +class QueenOrLocalFileRedirection(object): + implements(ISubTreeSpecification) + stype = "QueenOrLocalFile" + def set_filename(self, filename): + self.filename = filename + def get_filename(self): + return self.filename + def set_handle(self, handle): + self.handle = handle + def get_handle(self): + return self.handle + def serialize(self): + return (self.stype, self.handle, self.filename) + +def unserialize_subtree_specification(serialized_spec): + assert isinstance(serialized_spec, tuple) + for stype in [CHKDirectorySpecification, + ImmutableSSKDirectorySpecification, + MutableSSKDirectorySpecification, + LocalFileRedirection, + QueenRedirection, + HTTPRedirection, + QueenOrLocalFileRedirection, + ]: + if tuple[0] == stype: + spec = stype() + spec.unserialize(serialized_spec) + return spec + raise RuntimeError("unable to unserialize subtree specification '%s'" % + (serialized_spec,)) + + + + +class Opener(object): + implements(IOpener) + def __init__(self, queen): + self._queen = queen + self._cache = {} + + def open(self, subtree_specification, parent_is_mutable): + spec = ISubTreeSpecification(subtree_specification) + + # is it in cache? + if spec in self._cache: + return defer.succeed(self._cache[spec]) + + # is it a file? + if isinstance(spec, CHKFileSpecification): + return self._get_chk_file(spec) + if isinstance(spec, (MutableSSKFileSpecification, + ImmutableSSKFileSpecification)): + return self._get_ssk_file(spec) + + # is it a directory? + if isinstance(spec, CHKDirectorySpecification): + return self._get_chk_dir(spec, parent_is_mutable) + if isinstance(spec, (ImmutableSSKDirectorySpecification, + MutableSSKDirectorySpecification)): + return self._get_ssk_dir(spec) + + # is it a redirection to a file or directory? + if isinstance(spec, LocalFileRedirection): + return self._get_local_redir(spec) + if isinstance(spec, QueenRedirection): + return self._get_queen_redir(spec) + if isinstance(spec, HTTPRedirection): + return self._get_http_redir(spec) + if isinstance(spec, QueenOrLocalFileRedirection): + return self._get_queen_or_local_redir(spec) + + # none of the above + raise RuntimeError("I do not know how to open '%s'" % (spec,)) + + def _add_to_cache(self, subtree, spec): + self._cache[spec] = subtree + # TODO: remove things from the cache eventually + return subtree + + def _get_chk_file(self, spec): + subtree = CHKFile(spec.get_uri()) + return defer.succeed(subtree) + + def _get_ssk_file(self, spec): + if isinstance(spec, MutableSSKFileSpecification): + subtree = MutableSSKFile(spec.get_read_capability(), + spec.get_write_capability()) + else: + assert isinstance(spec, ImmutableSSKFileSpecification) + subtree = ImmutableSSKFile(spec.get_read_cap()) + return defer.succeed(subtree) + + def _get_chk_dir(self, spec, parent_is_mutable): + uri = spec.get_uri() + if parent_is_mutable: + subtree = MutableCHKDirectorySubTree() + subtree.set_uri(uri) + else: + subtree = ImmutableDirectorySubTree() + d = self.downloader.get_chk(uri) + d.addCallback(subtree.unserialize) + d.addCallback(self._add_to_cache, spec) + return d + + def _get_ssk_dir(self, spec): + mutable = isinstance(spec, ImmutableSSKDirectorySpecification) + if mutable: + subtree = ImmutableDirectorySubTree() + else: + assert isinstance(spec, MutableSSKDirectorySpecification) + subtree = MutableSSKDirectorySubTree() + subtree.set_write_capability(spec.get_write_capability()) + read_cap = spec.get_read_capability() + subtree.set_read_capability(read_cap) + d = self.downloader.get_ssk_latest(read_cap) + def _set_version(res): + version, data = res + if mutable: + subtree.set_version(version) + return data + d.addCallback(_set_version) + d.addCallback(subtree.unserialize) + d.addCallback(self._add_to_cache, spec) + return d + + def _get_local_redir(self, spec): + # there is a local file which contains a bencoded serialized + # subtree specification. + filename = spec.get_filename() + # TODO: will this enable outsiders to cause us to read from + # arbitrary files? Think about this. + f = open(filename, "rb") + data = bencode.bdecode(f.read()) + f.close() + # note: we don't cache the contents of the file. TODO: consider + # doing this based upon mtime. It is important that we be able to + # notice if the file has been changed. + new_spec = unserialize_subtree_specification(data) + return self.open(new_spec, True) + + def _get_queen_redir(self, spec): + # this specifies a handle for which the Queen maintains a + # serialized subtree specification. + handle = spec.get_handle() + d = self._queen.callRemote("lookup_handle", handle) + d.addCallback(unserialize_subtree_specification) + d.addCallback(self.open, True) + return d + + def _get_http_redir(self, spec): + # this specifies a URL at which there is a bencoded serialized + # subtree specification. + url = spec.get_url() + from twisted.web import client + d = client.getPage(url) + d.addCallback(bencode.bdecode) + d.addCallback(unserialize_subtree_specification) + d.addCallback(self.open, False) + return d + + def _get_queen_or_local_redir(self, spec): + # there is a local file which contains a bencoded serialized + # subtree specification. The queen also has a copy. Whomever has + # the higher version number wins. + filename = spec.get_filename() + f = open(filename, "rb") + local_version, local_data = bencode.bdecode(f.read()) + f.close() + handle = spec.get_handle() + # TODO: pubsub so we can cache the queen's results + d = self._queen.callRemote("lookup_handle", handle) + def _got_queen(response): + queen_version, queen_data = response + if queen_version > local_version: + return queen_data + return local_data + d.addCallback(_got_queen) + d.addCallback(unserialize_subtree_specification) + d.addCallback(self.open, True) + return d + diff --git a/src/allmydata/test/test_filetable_new.py b/src/allmydata/test/test_filetable_new.py new file mode 100644 index 00000000..b9dd230d --- /dev/null +++ b/src/allmydata/test/test_filetable_new.py @@ -0,0 +1,122 @@ + +import os +from zope.interface import implements +from twisted.trial import unittest +from allmydata import filetable_new as ft +from allmydata import workqueue +from cStringIO import StringIO + +class FakeOpener(object): + implements(ft.IOpener) + +class FakeWorkQueue(object): + implements(workqueue.IWorkQueue) + def create_tempfile(self): + return (StringIO(), "dummy_filename") + def create_boxname(self): + return "dummy_boxname" + def add_upload_chk(self, source_filename, stash_uri_in_boxname): + pass + def add_upload_ssk(self, source_filename, write_capability, + previous_version): + pass + def add_retain_ssk(self, read_capability): + pass + def add_unlink_ssk(self, write_capability): + pass + def add_retain_uri_from_box(self, boxname): + pass + def add_addpath(self, boxname, path): + pass + def add_unlink_uri(self, uri): + pass + def add_delete_tempfile(self, filename): + pass + def add_delete_box(self, boxname): + pass + + +class OneSubTree(unittest.TestCase): + def test_create_empty_immutable(self): + st = ft.ImmutableDirectorySubTree() + st.new() + self.failIf(st.is_mutable()) + d = st.get([], FakeOpener()) + def _got_root(root): + self.failUnless(ft.IDirectoryNode.providedBy(root)) + self.failUnlessEqual(root.list(), []) + d.addCallback(_got_root) + return d + + def test_immutable_1(self): + st = ft.ImmutableDirectorySubTree() + st.new() + # now populate it (by modifying the internal data structures) with + # some internal directories + one = ft.SubTreeNode(st) + two = ft.SubTreeNode(st) + three = ft.SubTreeNode(st) + st.root.node_children["one"] = one + st.root.node_children["two"] = two + two.node_children["three"] = three + + # now examine it + self.failIf(st.is_mutable()) + o = FakeOpener() + d = st.get([], o) + def _got_root(root): + self.failUnless(ft.IDirectoryNode.providedBy(root)) + self.failUnlessEqual(root.list(), ["one", "two"]) + d.addCallback(_got_root) + d.addCallback(lambda res: st.get(["one"], o)) + def _got_one(_one): + self.failUnlessIdentical(one, _one) + self.failUnless(ft.IDirectoryNode.providedBy(_one)) + self.failUnlessEqual(_one.list(), []) + d.addCallback(_got_one) + d.addCallback(lambda res: st.get(["two"], o)) + def _got_two(_two): + self.failUnlessIdentical(two, _two) + self.failUnless(ft.IDirectoryNode.providedBy(_two)) + self.failUnlessEqual(_two.list(), ["three"]) + d.addCallback(_got_two) + d.addCallback(lambda res: st.get(["two", "three"], o)) + def _got_three(_three): + self.failUnlessIdentical(three, _three) + self.failUnless(ft.IDirectoryNode.providedBy(_three)) + self.failUnlessEqual(_three.list(), []) + d.addCallback(_got_three) + d.addCallback(lambda res: st.get(["missing"], o)) + d.addCallback(self.failUnlessEqual, None) + return d + + def test_mutable_1(self): + o = FakeOpener() + wq = FakeWorkQueue() + st = ft.MutableCHKDirectorySubTree() + st.new() + st.set_uri(None) + self.failUnless(st.is_mutable()) + d = st.get([], o) + def _got_root(root): + self.failUnless(ft.IDirectoryNode.providedBy(root)) + self.failUnlessEqual(root.list(), []) + d.addCallback(_got_root) + file_three = ft.CHKFileSpecification() + file_three.set_uri("file_three_uri") + d.addCallback(lambda res: st.add(["one", "two", "three"], file_three, + o, wq)) + d.addCallback(lambda res: st.get(["one"], o)) + def _got_one(one): + self.failUnless(ft.IDirectoryNode.providedBy(one)) + self.failUnlessEqual(one.list(), ["two"]) + d.addCallback(_got_one) + d.addCallback(lambda res: st.get(["one", "two"], o)) + def _got_two(two): + self.failUnless(ft.IDirectoryNode.providedBy(two)) + self.failUnlessEqual(two.list(), ["three"]) + self.failUnlessIdentical(two.child_specifications["three"], + file_three) + d.addCallback(_got_two) + return d + diff --git a/src/allmydata/workqueue.py b/src/allmydata/workqueue.py new file mode 100644 index 00000000..81d9d490 --- /dev/null +++ b/src/allmydata/workqueue.py @@ -0,0 +1,231 @@ + +import os, shutil +from zope.interface import Interface, implements +from allmydata.util import bencode + +class IWorkQueue(Interface): + """Each filetable root is associated a work queue, which is persisted on + disk and contains idempotent actions that need to be performed. After + each action is completed, it is removed from the queue. + + The queue is broken up into several sections. First are the 'upload' + steps. After this are the 'add_subpath' commands. The last section has + the 'unlink' steps. Somewhere in here are the 'retain' steps.. maybe + interspersed with 'upload', maybe after 'add_subpath' and before + 'unlink'. + + The general idea is that the processing of the work queue could be + interrupted at any time, in the middle of a step, and the next time the + application is started, the step can be re-started without problems. The + placement of the 'retain' commands depends upon how long we might expect + the app to be offline. + """ + + def create_tempfile(): + """Return (f, filename).""" + def create_boxname(): + """Return a unique box name (as a string).""" + + def add_upload_chk(source_filename, stash_uri_in_boxname): + """This step uploads a file to the mesh and obtains a content-based + URI which can be used to later retrieve the same contents ('CHK' + mode). This URI includes unlink rights. It does not mark the file for + retention. + + When the upload is complete, the resulting URI is stashed in a 'box' + with the specified name. This is basically a local variable. A later + 'add_subpath' step will reference this boxname and retrieve the URI. + """ + + def add_upload_ssk(source_filename, write_capability, previous_version): + """This step uploads a file to the mesh in a way that replaces the + previous version and does not require a change to the ID referenced + by the parent. + """ + + def add_retain_ssk(read_capability): + """Arrange for the given SSK to be kept alive.""" + + def add_unlink_ssk(write_capability): + """Stop keeping the given SSK alive.""" + + def add_retain_uri_from_box(boxname): + """When executed, this step retrieves the URI from the given box and + marks it for retention: this adds it to a list of all URIs that this + system cares about, which will initiate filechecking/repair for the + file.""" + + def add_addpath(boxname, path): + """When executed, this step will retrieve the URI from the given box + and call root.add(path, URIishthingyTODO, etc). + """ + + def add_unlink_uri(uri): + """When executed, this step will unlink the data referenced by the + given URI: the unlink rights are used to tell any shareholders to + unlink the file (possibly deleting it), and the URI is removed from + the list that this system cares about, cancelling filechecking/repair + for the file. + + All 'unlink' steps are pushed to the end of the queue. + """ + + def add_delete_tempfile(filename): + """This step will delete a tempfile created by create_tempfile.""" + + def add_delete_box(boxname): + """When executed, this step deletes the given box.""" + + +class Step(object): + def setup(self, stepname, basedir): + self.basedir = basedir + self.stepname = stepname + self.stepbase = os.path.join(self.basedir, self.stepname) + + def remove(self, _ignored=None): + trashdir = os.path.join(self.basedir, "trash", self.stepname) + os.rename(self.stepbase, trashdir) + shutil.rmtree(trashdir) + +class UploadSSKStep(Step): + def start(self): + f = open(os.path.join(self.stepbase, "source_filename"), "r") + source_filename = f.read() + f.close() + f = open(os.path.join(self.stepbase, "write_capability"), "r") + write_cap = bencode.bdecode(f.read()) + f.close() + f = open(os.path.join(self.stepbase, "previous_version"), "r") + previous_version = bencode.bdecode(f.read()) + f.close() + + n = SSKNode() + n.set_version(previous_version) + n.set_write_capability(write_cap) + f = open(source_filename, "rb") + data = f.read() + f.close() + published_data = n.write_new_version(data) + d = self.push_ssk(n.ssk_index, n.vresion, published_data) + d.addCallback(self.remove) + return d + + +class WorkQueue(object): + implements(IWorkQueue) + def __init__(self, basedir): + self.basedir = basedir + # methods to add entries to the queue + + # methods to perform work + + def get_next_step(self): + stepname = self._find_first_step() + stepbase = os.path.join(self.basedir, stepname) + f = open(os.path.join(stepbase, "type"), "r") + stype = f.read().strip() + f.close() + if stype == "upload_ssk": + s = UploadSSKStep() + # ... + else: + raise RuntimeError("unknown step type '%s'" % stype) + s.setup(stepname, self.basedir) + d = s.start() + return d + + + + +AES_KEY_LENGTH = 16 +def make_aes_key(): + return os.urandom(16) +def make_rsa_key(): + raise NotImplementedError + +class MutableSSKTracker(object): + """I represent a mutable file, indexed by an SSK. + """ + + def create(self): + # if you create the node this way, you will have both read and write + # capabilities + self.priv_key, self.pub_key = make_rsa_key() + self.ssk_index = sha(self.pub_key.serialized()) + self.write_key = make_aes_key() + self.read_key = sha(self.write_key)[:AES_KEY_LENGTH] + self.version = 0 + + def set_version(self, version): + self.version = version + + def set_read_capability(self, read_cap): + (self.ssk_index, self.read_key) = read_cap + + def set_write_capability(self, write_cap): + # TODO: add some assertions here, if someone calls both + # set_read_capability and set_write_capability, make sure the keys + # match + (self.ssk_index, self.write_key) = write_cap + self.read_key = sha(self.write_key)[:AES_KEY_LENGTH] + + def extract_readwrite_from_published(self, published_data, write_key): + self.write_key = write_key + self.read_key = sha(self.write_key)[:AES_KEY_LENGTH] + self._extract(published_data) + self.priv_key = aes_decrypt(write_key, self.encrypted_privkey) + assert self.priv_key.is_this_your_pub_key(self.pub_key) + + def extract_readonly_from_published(self, published_data, read_key): + self.write_key = None + self.read_key = read_key + self._extract(published_data) + self.priv_key = None + + def _extract(self, published_data): + (signed_data, serialized_pub_key, sig) = unserialize(published_data) + self.pub_key = unserialize(serialized_pub_key) + self.pub_key.check_signature(sig, signed_data) + (encrypted_privkey, encrypted_data, version) = unserialize(signed_data) + self.data = aes_decrypt(self.read_key, encrypted_data) + self.encrypted_privkey = encrypted_privkey + + def get_read_capability(self): + return (self.ssk_index, self.read_key) + + def get_write_capability(self): + if not self.write_key: + raise NotCapableError("This SSKNode is read-only") + return (self.ssk_index, self.write_key) + + def write_new_version(self, data): + if not self.write_key: + raise NotCapableError("This SSKNode is read-only") + encrypted_privkey = aes_encrypt(self.write_key, + self.priv_key.serialized()) + encrypted_data = aes_encrypt(self.read_key, data) + self.version += 1 + signed_data = serialize((encrypted_privkey, + encrypted_data, + self.version)) + sig = self.priv_key.sign(signed_data) + serialized_pub_key = self.pub_key.serialized() + published_data = serialize((signed_data, serialized_pub_key, sig)) + return published_data + +def make_new_SSK_node(): + n = SSKNode() + n.create() + return n + +def extract_readwrite_SSK_node(published_data, write_key): + n = SSKNode() + n.extract_readwrite_SSK_node(published_data, write_key) + return n + +def extract_readonly_SSK_node(published_data, read_key): + n = SSKNode() + n.extract_readonly_from_published(published_data, read_key) + return n + -- 2.45.2