From: Brian Warner Date: Fri, 19 Jan 2007 09:23:03 +0000 (-0700) Subject: snapshot filetree work: it's getting close X-Git-Tag: tahoe_v0.1.0-0-UNSTABLE~351 X-Git-Url: https://git.rkrishnan.org/components/provisioning?a=commitdiff_plain;h=ff6b09d973cbb5ab8b927bf4cf5726864b32e5e0;p=tahoe-lafs%2Ftahoe-lafs.git snapshot filetree work: it's getting close --- diff --git a/src/allmydata/filetree/directory.py b/src/allmydata/filetree/directory.py index a52c9ed9..6e469e78 100644 --- a/src/allmydata/filetree/directory.py +++ b/src/allmydata/filetree/directory.py @@ -21,27 +21,67 @@ class SubTreeNode: def __init__(self, tree): self.enclosing_tree = tree - # node_children maps child name to another SubTreeNode instance. This - # is only for internal directory nodes. All Files and external links - # are listed in child_specifications instead. - self.node_children = {} - # child_specifications maps child name to a string which describes - # how to obtain the actual child. For example, if "foo.jpg" in this - # node represents a FILE with a uri of "fooURI", then - # self.child_specifications["foo.jpg"] = "(FILE,fooURI") + # subdirectory_node_children maps child name to another SubTreeNode + # instance. This is only for internal directory nodes. All other + # nodes are listed in child_specifications instead. + self.subdirectory_node_children = {} + # child_specifications maps child name to a specification tuple which + # describes how to obtain the actual child. For example, if "foo.jpg" + # in this node represents a CHK-encoded FILE with a uri of "fooURI", + # then self.child_specifications["foo.jpg"] = ("CHKFILE","fooURI") self.child_specifications = {} + def is_directory(self): + return True + def list(self): - return sorted(self.node_children.keys() + + return sorted(self.subdirectory_node_children.keys() + self.child_specifications.keys()) - def serialize(self): + def get(self, childname): + if childname in self.subdirectory_node_children: + return self.subdirectory_node_children[childname] + elif childname in self.child_specifications: + return to_node(self.child_specifications[childname]) + else: + raise NoSuchChildError("no child named '%s'" % (childname,)) + + def get_subtree(self): + return self.enclosing_tree + + def delete(self, childname): + assert self.enclosing_tree.is_mutable() + if childname in self.subdirectory_node_children: + del self.subdirectory_node_children[childname] + elif childname in self.child_specifications: + del to_node(self.child_specifications[childname]) + else: + raise NoSuchChildError("no child named '%s'" % (childname,)) + + def add_subdir(self, childname): + assert childname not in self.subdirectory_node_children + assert childname not in self.child_specifications + newnode = SubTreeNode(self.enclosing_tree) + self.subdirectory_node_children[childname] = newnode + return newnode + + def add(self, childname, node): + assert childname not in self.subdirectory_node_children + assert childname not in self.child_specifications + spec = to_spec(node) + self.child_specifications[childname] = spec + return self + + def serialize_to_sexprs(self): # note: this is a one-pass recursive serialization that will result # in the whole file table being held in memory. This is only # appropriate for directories with fewer than, say, 10k nodes. If we # support larger directories, we should turn this into some kind of # generator instead, and write the serialized data directly to a # tempfile. + # + # ["DIRECTORY", name1, child1, name2, child2..] + data = ["DIRECTORY"] for name in sorted(self.node_children.keys()): data.append(name) @@ -51,7 +91,7 @@ class SubTreeNode: data.append(self.child_specifications[name].serialize()) return data - def unserialize(self, data): + def populate_from_sexprs(self, data): assert data[0] == "DIRECTORY" assert len(data) % 2 == 1 for i in range(1, len(data), 2): @@ -61,97 +101,14 @@ class SubTreeNode: child_type = child_data[0] if child_type == "DIRECTORY": child = SubTreeNode(self.enclosing_tree) - child.unserialize(child_data) + child.populate_from_sexprs(child_data) self.node_children[name] = child else: self.child_specifications[name] = child_data -class _SubTreeMixin(object): - - def get(self, path, opener): - """Return a Deferred that fires with the node at the given path, or - None if there is no such node. This will traverse and even create - subtrees as necessary.""" - d = self.get_node_for_path(path) - def _done(res): - if res == None: - # traversal done, unable to find the node - return None - if res[0] == True: - # found the node - node = res[1] - assert INode.providedBy(node) - return node - # otherwise, we must open and recurse into a new subtree - next_subtree_spec = res[1] - subpath = res[2] - d1 = opener.open(next_subtree_spec, self.is_mutable()) - def _opened(next_subtree): - assert ISubTree.providedBy(next_subtree) - return next_subtree.get(subpath, opener) - d1.addCallback(_opened) - return d1 - d.addCallback(_done) - return d - - def find_lowest_containing_subtree_for_path(self, path, opener): - """Find the subtree which contains the target path, opening new - subtrees if necessary. Return a Deferred that fires with (subtree, - prepath, postpath), where prepath is the list of path components that - got to the subtree, and postpath is the list of remaining path - components (indicating a subpath within the resulting subtree). This - will traverse and even create subtrees as necessary.""" - d = self.get_or_create_node_for_path(path) - def _done(res): - if res[0] == True: - node = res[1] - # found the node in our own tree. The whole path we were - # given was used internally, and is therefore the postpath - return (self, [], path) - # otherwise, we must open and recurse into a new subtree - ignored, next_subtree_spec, prepath, postpath = res - d1 = opener.open(next_subtree_spec, self.is_mutable()) - def _opened(next_subtree): - assert ISubTree.providedBy(next_subtree) - f = next_subtree.find_lowest_containing_subtree_for_path - return f(postpath, opener) - d1.addCallback(_opened) - def _found(res2): - subtree, prepath2, postpath2 = res2 - return (subtree, prepath + prepath2, postpath2) - d1.addCallback(_found) - return d1 - d.addCallback(_done) - return d - - -class _MutableSubTreeMixin(object): - - def add(self, path, child, opener, work_queue): - assert len(path) > 0 - d = self.find_lowest_containing_subtree_for_path(path[:-1], opener) - def _found(res): - subtree, prepath, postpath = res - assert IMutableSubTree.providedBy(subtree) - # postpath is from the top of the subtree to the directory where - # this child should be added. add_subpath wants the path from the - # top of the subtree to the child itself, so we need to append - # the child's name here. - addpath = postpath + [path[-1]] - # this add_path will cause some steps to be added, as well as the - # internal node to be modified - d1 = subtree.add_subpath(addpath, child, work_queue) - if subtree.mutation_affects_parent(): - def _added(boxname): - work_queue.add_addpath(boxname, prepath) - d1.addCallback(_added) - return d1 - d.addCallback(_found) - return d - -class _DirectorySubTree(_SubTreeMixin): +class _DirectorySubTree(object): """I represent a set of connected directories that all share the same access control: any given person can read or write anything in this tree as a group, and it is not possible to give access to some pieces of this @@ -167,99 +124,60 @@ class _DirectorySubTree(_SubTreeMixin): """ implements(ISubTree) + def new(self): self.root = SubTreeNode(self) + self.mutable = True # sure, why not - def unserialize(self, serialized_data): - """Populate all nodes from serialized_data, previously created by - calling my serialize() method. 'serialized_data' is a series of - nested lists (s-expressions), probably recorded in bencoded form.""" - self.root = SubTreeNode(self) - self.root.unserialize(serialized_data) + def populate_from_specification(self, spec, parent_is_mutable, downloader): + return self.populate_from_node(to_node(spec), + parent_is_mutable, downloader) + + def populate_from_data(self, data): + self.root = SubTreeNode() + self.root.populate_from_sexprs(bencode.bdecode(data)) return self def serialize(self): """Return a series of nested lists which describe my structure in a form that can be bencoded.""" - return self.root.serialize() + return self.root.serialize_to_sexprs() + + def serialize_to_file(self, f): + f.write(bencode.bencode(self.serialize())) def is_mutable(self): - return IMutableSubTree.providedBy(self) + return self.mutable def get_node_for_path(self, path): - # this is restricted to traversing our own subtree. - subpath = path + # this is restricted to traversing our own subtree. Returns + # (found_path, node, remaining_path) + found_path = [] + remaining_path = path[:] node = self.root - while subpath: - name = subpath.pop(0) + while remaining_path: + name = remaining_path[0] if name in node.node_children: node = node.node_children[name] assert isinstance(node, SubTreeNode) + found_path.append(name) + remaining_path.pop(0) continue if name in node.child_specifications: - # the path takes us out of this SubTree and into another + # the path takes us out of this subtree and into another next_subtree_spec = node.child_specifications[name] - result = (False, next_subtree_spec, subpath) - return defer.succeed(result) - return defer.succeed(None) - # we've run out of path components, so we must be at the terminus - result = (True, node) - return defer.succeed(result) - - def get_or_create_node_for_path(self, path): - # this is restricted to traversing our own subtree, but will create - # internal directory nodes as necessary - prepath = [] - postpath = path[:] - node = self.root - while postpath: - name = postpath.pop(0) - prepath.append(name) - if name in node.node_children: - node = node.node_children[name] - assert isinstance(node, SubTreeNode) - continue - if name in node.child_specifications: - # the path takes us out of this SubTree and into another - next_subtree_spec = node.child_specifications[name] - result = (False, next_subtree_spec, prepath, postpath) - return defer.succeed(result) - # need to create a new node - new_node = SubTreeNode(self) - node.node_children[name] = new_node - node = new_node - continue - # we've run out of path components, so we must be at the terminus - result = (True, node) - return defer.succeed(result) - -class ImmutableDirectorySubTree(_DirectorySubTree): - pass - -class _MutableDirectorySubTree(_DirectorySubTree, _MutableSubTreeMixin): - implements(IMutableSubTree) - - def add_subpath(self, subpath, child, work_queue): - prepath = subpath[:-1] - name = subpath[-1] - d = self.get_node_for_path(prepath) - def _found(results): - assert results is not None - assert results[0] == True - node = results[1] - # modify the in-RAM copy - node.child_specifications[name] = child - # now serialize and upload ourselves - boxname = self.upload_my_serialized_form(work_queue) - # our caller will perform the addpath, if necessary - return boxname - d.addCallback(_found) - return d - - def serialize_to_file(self, f): - f.write(bencode.bencode(self.serialize())) - -class MutableCHKDirectorySubTree(_MutableDirectorySubTree): + node = to_node(next_subtree_spec) + found_path.append(name) + remaining_path.pop(0) + break + # The node *would* be in this subtree if it existed, but it + # doesn't. Leave found_path and remaining_path alone, and node + # points at the last parent node that was on the path. + break + return (found_path, node, remaining_path) + +class CHKDirectorySubTree(_DirectorySubTree): + # maybe mutable, maybe not def mutation_affects_parent(self): return True @@ -267,7 +185,14 @@ class MutableCHKDirectorySubTree(_MutableDirectorySubTree): def set_uri(self, uri): self.old_uri = uri - def upload_my_serialized_form(self, work_queue): + def populate_from_node(self, node, parent_is_mutable, downloader): + node = ICHKDirectoryNode(node) + self.mutable = parent_is_mutable + d = downloader.download(node.get_uri(), download.Data()) + d.addCallback(self.populate_from_data) + return d + + def update(self, prepath, work_queue): # this is the CHK form f, filename = work_queue.create_tempfile(".chkdir") self.serialize_to_file(f) @@ -277,21 +202,32 @@ class MutableCHKDirectorySubTree(_MutableDirectorySubTree): work_queue.add_delete_tempfile(filename) work_queue.add_retain_uri_from_box(boxname) work_queue.add_delete_box(boxname) + work_queue.add_addpath(boxname, prepath) work_queue.add_unlink_uri(self.old_uri) # TODO: think about how self.old_uri will get updated. I *think* that # this whole instance will get replaced, so it ought to be ok. But # this needs investigation. return boxname -class MutableSSKDirectorySubTree(_MutableDirectorySubTree): +class SSKDirectorySubTree(_DirectorySubTree): def new(self): - _MutableDirectorySubTree.new(self) + _DirectorySubTree.new(self) self.version = 0 + # TODO: populate def mutation_affects_parent(self): return False + def populate_from_node(self, node, parent_is_mutable, downloader): + node = ISSKDirectoryNode(node) + self.read_capability = node.get_read_capability() + self.write_capability = node.get_write_capability() + self.mutable = bool(self.write_capability) + d = downloader.download_ssk(self.read_capability, download.Data()) + d.addCallback(self.populate_from_data) + return d + def set_version(self, version): self.version = version @@ -300,9 +236,9 @@ class MutableSSKDirectorySubTree(_MutableDirectorySubTree): f, filename = work_queue.create_tempfile(".sskdir") self.serialize_to_file(f) f.close() - work_queue.add_upload_ssk(filename, self.get_write_capability(), + work_queue.add_upload_ssk(filename, self.write_capability, self.version) self.version = self.version + 1 work_queue.add_delete_tempfile(filename) - work_queue.add_retain_ssk(self.get_read_capability()) + work_queue.add_retain_ssk(self.read_capability) diff --git a/src/allmydata/filetree/interfaces.py b/src/allmydata/filetree/interfaces.py index 8934a7f3..2051916c 100644 --- a/src/allmydata/filetree/interfaces.py +++ b/src/allmydata/filetree/interfaces.py @@ -2,16 +2,39 @@ from zope.interface import Interface class INode(Interface): - """This is some sort of retrievable node.""" + """This is some sort of retrievable node. All objects which implement + other I*Node interfaces also implement this one.""" + def is_directory(): + """Return True if this node is an internal directory node.""" class IFileNode(Interface): """This is a file which can be retrieved.""" + def get_uri(): + """Return the URI of the target file. This URI can be passed + to an IDownloader to retrieve the data.""" class IDirectoryNode(Interface): """This is a directory which can be listed.""" + # these calls do not modify the subtree def list(): - """Return a list of names which are children of this node.""" - + """Return a dictionary mapping each childname to a node. These nodes + implement various I*Node interfaces depending upon what they can do.""" + def get(childname): + """Return a child node. Raises NoSuchChildError if there is no + child of that name.""" + def get_subtree(): + """Return the ISubTree which contains this node.""" + + # the following calls modify the subtree. After calling them, you must + # tell the enclosing subtree to serialize and upload itself. They can + # only be called if this directory node is associated with a mutable + # subtree. + def delete(childname): + """Delete any child referenced by this name.""" + def add_subdir(childname): + """Create a new directory node, and return it.""" + def add(childname, node): + """Add a new node to this path. Returns self.""" class ISubTree(Interface): """A subtree is a collection of Nodes: files, directories, other trees. @@ -27,25 +50,27 @@ class ISubTree(Interface): a DirectoryNode, or it might be a FileNode. """ - def get(path, opener): - """Return a Deferred that fires with the node at the given path, or - None if there is no such node. This will traverse and create subtrees - as necessary.""" + def populate_from_specification(spec, parent_is_mutable, downloader): + """Given a specification tuple, arrange to populate this subtree by + pulling data from some source (possibly the mesh, or the queen, or an + HTTP server, or the local filesystem). Return a Deferred that will + fire (with self) when this subtree is ready for use (specifically + when it is ready for get() and add() calls). + """ - def add(path, child, opener, work_queue): - """Add 'child' (which must implement INode) to the tree at 'path' - (which must be a list of pathname components). This will schedule all - the work necessary to cause the child to be added reliably.""" + def populate_from_node(node, parent_is_mutable, downloader): + """Like populate_from_specification.""" - def find_lowest_containing_subtree_for_path(path, opener): - # not for external use. This is used internally by add(). - """Find the subtree which contains the target path, opening new - subtrees if necessary. Return a Deferred that fires with (subtree, - prepath, postpath), where prepath is the list of path components that - got to the subtree, and postpath is the list of remaining path - components (indicating a subpath within the resulting subtree). This - will traverse and even create subtrees as necessary.""" + def populate_from_data(data): + """Used internally by populate_from_specification. This is called + with a sequence of bytes that describes the contents of the subtree, + probably a bencoded tuple or s-expression. Returns self. + """ + def unserialize(serialized_data): + """Populate all nodes from serialized_data, previously created by + calling my serialize() method. 'serialized_data' is a series of + nested lists (s-expressions), probably recorded in bencoded form.""" def is_mutable(): """This returns True if we have the ability to modify this subtree. @@ -54,63 +79,85 @@ class ISubTree(Interface): """ def get_node_for_path(path): - """Ask this subtree to follow the path through its internal nodes. If - the path terminates within this subtree, return (True, node), where - 'node' implements INode (and also IMutableNode if this subtree - is_mutable). If the path takes us beyond this subtree, return (False, - next_subtree_spec, subpath), where 'next_subtree_spec' is a string - that can be passed to an Opener to create a new subtree, and - 'subpath' is the subset of 'path' that can be passed to this new - subtree. If the path cannot be found within the subtree (and it is - not in the domain of some child subtree), return None. + """Ask this subtree to follow the path through its internal nodes. + + Returns a tuple of (found_path, node, remaining_path). This method + operations synchronously, and does not return a Deferred. + + (found_path=path, found_node, []) + If the path terminates within this subtree, found_path=path and + remaining_path=[], and the node will be an internal IDirectoryNode. + + (found_path, last_node, remaining_path) + If the path does not terminate within this subtree but neither does + it exit this subtree, the last internal IDirectoryNode that *was* on + the path will be returned in 'node'. The path components that led to + this node will be in found_path, and the remaining components will be + in remaining_path. If you want to create the target node, loop over + remaining_path as follows:: + + while remaining_path: + node = node.add_subdir(remaining_path.pop(0)) + + (found_path, exit_node, remaining_path) + If the path leaves this subtree, 'node' will be a different kind of + INode (probably one that points at a child directory of some sort), + found_path will be the components that led to this point, and + remaining_path will be the remaining components. If you still wish to + locate the target, use 'node' to open a new subtree, then provide + 'remaining_path' to the new subtree's get_node_for_path() method. + """ - def get_or_create_node_for_path(path): - """Like get_node_for_path, but instead of returning None, the subtree - will create internal nodes as necessary. Therefore it always returns - either (True, node), or (False, next_subtree_spec, prepath, postpath). + def update(prepath, workqueue): + """Perform and schedule whatever work is necessary to record this + subtree to persistent storage and update the parent at 'prepath' + with a new child specification. + + For directory subtrees, this will cause the subtree to serialize + itself to a file, then add instructions to the workqueue to first + upload this file to the mesh, then add the file's URI to the parent's + subtree. The second instruction will possibly cause recursion, until + some subtree is updated which does not require notifying the parent. """ def serialize(): """Return a series of nested lists which describe my structure in a form that can be bencoded.""" - def unserialize(serialized_data): - """Populate all nodes from serialized_data, previously created by - calling my serialize() method. 'serialized_data' is a series of - nested lists (s-expressions), probably recorded in bencoded form.""" - - -class IMutableSubTree(Interface): - def mutation_affects_parent(): - """This returns True for CHK nodes where you must inform the parent - of the new URI each time you change the child subtree. It returns - False for SSK nodes (or other nodes which have a pointer stored in - some mutable form). - """ - def add_subpath(subpath, child_spec, work_queue): - """Ask this subtree to add the given child to an internal node at the - given subpath. The subpath must not exit the subtree through another - subtree (specifically get_subtree_for_path(subpath) must either - return None or (True,node), and in the latter case, this subtree will - create new internal nodes as necessary). - - The subtree will probably serialize itself to a file and add steps to - the work queue to accomplish its goals. - - This returns a Deferred (the value of which is ignored) when - everything has been added to the work queue. - """ - - def serialize_to_file(f): - """Write a bencoded data structure to the given filehandle that can - be used to reproduce the contents of this subtree.""" - -class ISubTreeSpecification(Interface): - def serialize(): - """Return a tuple that describes this subtree. This tuple can be - passed to IOpener.open() to reconstitute the subtree.""" +#class IMutableSubTree(Interface): +# def mutation_affects_parent(): +# """This returns True for CHK nodes where you must inform the parent +# of the new URI each time you change the child subtree. It returns +# False for SSK nodes (or other nodes which have a pointer stored in +# some mutable form). +# """ +# +# def add_subpath(subpath, child_spec, work_queue): +# """Ask this subtree to add the given child to an internal node at the +# given subpath. The subpath must not exit the subtree through another +# subtree (specifically get_subtree_for_path(subpath) must either +# return None or (True,node), and in the latter case, this subtree will +# create new internal nodes as necessary). +# +# The subtree will probably serialize itself to a file and add steps to +# the work queue to accomplish its goals. +# +# This returns a Deferred (the value of which is ignored) when +# everything has been added to the work queue. +# """ +# +# def serialize_to_file(f): +# """Write a bencoded data structure to the given filehandle that can +# be used to reproduce the contents of this subtree.""" +# +#class ISubTreeSpecification(Interface): +# def serialize(): +# """Return a tuple that describes this subtree. This tuple can be +# passed to IOpener.open() to reconstitute the subtree. It can also be +# bencoded and stuffed in a series of persistent bytes somewhere on the +# mesh or in a file.""" class IOpener(Interface): def open(subtree_specification, parent_is_mutable): @@ -121,3 +168,51 @@ class IOpener(Interface): local disk, or asking some central-service node for the current value.""" + +class IVirtualDrive(Interface): + + # commands to manipulate files + + def list(path): + """List the contents of the directory at the given path. + + 'path' is a list of strings (empty to refer to the root directory) + and must refer to a DIRECTORY node. This method returns a Deferred + that fires with a dictionary that maps strings to filetypes. The + strings are useful as path name components. The filetypes are + Interfaces: either IDirectoryNode if path+[childname] can be used in + a 'list' method, or IFileNode if path+[childname] can be used in a + 'download' method. + """ + + def download(path, target): + """Download the file at the given path to 'target'. + + 'path' must refer to a FILE. 'target' must implement IDownloadTarget. + This returns a Deferred that fires (with 'target') when the download + is complete. + """ + + def upload_now(path, uploadable): + """Upload a file to the given path. The path must not already exist. + + path[:-1] must refer to a writable DIRECTORY node. 'uploadable' must + implement IUploadable. This returns a Deferred that fires (with + 'uploadable') when the upload is complete. + """ + + def upload_later(path, filename): + """Upload a file from disk to the given path. + """ + + def delete(path): + """Delete the file or directory at the given path. + + Returns a Deferred that fires (with self) when the delete is + complete. + """ + + # commands to manipulate subtrees + + # ... detach subtree, merge subtree, etc + diff --git a/src/allmydata/filetree/opener.py b/src/allmydata/filetree/opener.py index cccb39da..ce6dd39e 100644 --- a/src/allmydata/filetree/opener.py +++ b/src/allmydata/filetree/opener.py @@ -1,35 +1,38 @@ from zope.interface import implements from twisted.internet import defer -from allmydata.util import bencode -from allmydata.filetree import interfaces, directory -from allmydata.filetree import specification as fspec -from allmydata.filetree.file import CHKFile, MutableSSKFile, ImmutableSSKFile - -def unserialize_subtree_specification(serialized_spec): - assert isinstance(serialized_spec, tuple) - for stype in [fspec.CHKDirectorySpecification, - fspec.ImmutableSSKDirectorySpecification, - fspec.MutableSSKDirectorySpecification, - fspec.LocalFileRedirection, - fspec.QueenRedirection, - fspec.HTTPRedirection, - fspec.QueenOrLocalFileRedirection, - ]: - if tuple[0] == stype: - spec = stype() - spec.unserialize(serialized_spec) - return spec - raise RuntimeError("unable to unserialize subtree specification '%s'" % - (serialized_spec,)) - +from allmydata.filetree import interfaces, directory, redirect +#from allmydata.filetree.file import CHKFile, MutableSSKFile, ImmutableSSKFile +#from allmydata.filetree.specification import unserialize_subtree_specification + +all_openable_subtree_types = [ + directory.CHKDirectorySubTree, + directory.SSKDirectorySubTree, + redirect.LocalFileRedirection, + redirect.QueenRedirection, + redirect.HTTPRedirection, + redirect.QueenOrLocalFileRedirection, + ] class Opener(object): implements(interfaces.IOpener) - def __init__(self, queen): + def __init__(self, queen, downloader): self._queen = queen + self._downloader = downloader self._cache = {} + def _create(self, spec, parent_is_mutable): + assert isinstance(spec, tuple) + for subtree_class in all_openable_subtree_types: + if spec[0] == subtree_class.stype: + subtree = subtree_class() + d = subtree.populate_from_specification(spec, + parent_is_mutable, + self._downloader) + return d + raise RuntimeError("unable to handle subtree specification '%s'" + % (spec,)) + def open(self, subtree_specification, parent_is_mutable): spec = interfaces.ISubTreeSpecification(subtree_specification) @@ -37,38 +40,16 @@ class Opener(object): if spec in self._cache: return defer.succeed(self._cache[spec]) - # is it a file? - if isinstance(spec, fspec.CHKFileSpecification): - return self._get_chk_file(spec) - if isinstance(spec, (fspec.MutableSSKFileSpecification, - fspec.ImmutableSSKFileSpecification)): - return self._get_ssk_file(spec) - - # is it a directory? - if isinstance(spec, fspec.CHKDirectorySpecification): - return self._get_chk_dir(spec, parent_is_mutable) - if isinstance(spec, (fspec.ImmutableSSKDirectorySpecification, - fspec.MutableSSKDirectorySpecification)): - return self._get_ssk_dir(spec) - - # is it a redirection to a file or directory? - if isinstance(spec, fspec.LocalFileRedirection): - return self._get_local_redir(spec) - if isinstance(spec, fspec.QueenRedirection): - return self._get_queen_redir(spec) - if isinstance(spec, fspec.HTTPRedirection): - return self._get_http_redir(spec) - if isinstance(spec, fspec.QueenOrLocalFileRedirection): - return self._get_queen_or_local_redir(spec) - - # none of the above - raise RuntimeError("I do not know how to open '%s'" % (spec,)) + d = defer.maybeDeferred(self._create, spec, parent_is_mutable) + d.addCallback(self._add_to_cache, spec) + return d def _add_to_cache(self, subtree, spec): self._cache[spec] = subtree # TODO: remove things from the cache eventually return subtree +""" def _get_chk_file(self, spec): subtree = CHKFile(spec.get_uri()) return defer.succeed(subtree) @@ -82,93 +63,4 @@ class Opener(object): subtree = ImmutableSSKFile(spec.get_read_cap()) return defer.succeed(subtree) - def _get_chk_dir(self, spec, parent_is_mutable): - uri = spec.get_uri() - if parent_is_mutable: - subtree = directory.MutableCHKDirectorySubTree() - subtree.set_uri(uri) - else: - subtree = directory.ImmutableDirectorySubTree() - d = self.downloader.get_chk(uri) - d.addCallback(subtree.unserialize) - d.addCallback(self._add_to_cache, spec) - return d - - def _get_ssk_dir(self, spec): - mutable = isinstance(spec, fspec.ImmutableSSKDirectorySpecification) - if mutable: - subtree = directory.ImmutableDirectorySubTree() - else: - assert isinstance(spec, fspec.MutableSSKDirectorySpecification) - subtree = directory.MutableSSKDirectorySubTree() - subtree.set_write_capability(spec.get_write_capability()) - read_cap = spec.get_read_capability() - subtree.set_read_capability(read_cap) - d = self.downloader.get_ssk_latest(read_cap) - def _set_version(res): - version, data = res - if mutable: - subtree.set_version(version) - return data - d.addCallback(_set_version) - d.addCallback(subtree.unserialize) - d.addCallback(self._add_to_cache, spec) - return d - - def _get_local_redir(self, spec): - # there is a local file which contains a bencoded serialized - # subtree specification. - filename = spec.get_filename() - # TODO: will this enable outsiders to cause us to read from - # arbitrary files? Think about this. - f = open(filename, "rb") - data = bencode.bdecode(f.read()) - f.close() - # note: we don't cache the contents of the file. TODO: consider - # doing this based upon mtime. It is important that we be able to - # notice if the file has been changed. - new_spec = unserialize_subtree_specification(data) - return self.open(new_spec, True) - - def _get_queen_redir(self, spec): - # this specifies a handle for which the Queen maintains a - # serialized subtree specification. - handle = spec.get_handle() - d = self._queen.callRemote("lookup_handle", handle) - d.addCallback(unserialize_subtree_specification) - d.addCallback(self.open, True) - return d - - def _get_http_redir(self, spec): - # this specifies a URL at which there is a bencoded serialized - # subtree specification. - url = spec.get_url() - from twisted.web import client - d = client.getPage(url) - d.addCallback(bencode.bdecode) - d.addCallback(unserialize_subtree_specification) - d.addCallback(self.open, False) - return d - - def _get_queen_or_local_redir(self, spec): - # there is a local file which contains a bencoded serialized - # subtree specification. The queen also has a copy. Whomever has - # the higher version number wins. - filename = spec.get_filename() - f = open(filename, "rb") - local_version, local_data = bencode.bdecode(f.read()) - f.close() - handle = spec.get_handle() - # TODO: pubsub so we can cache the queen's results - d = self._queen.callRemote("lookup_handle", handle) - def _got_queen(response): - queen_version, queen_data = response - if queen_version > local_version: - return queen_data - return local_data - d.addCallback(_got_queen) - d.addCallback(unserialize_subtree_specification) - d.addCallback(self.open, True) - return d - - +""" diff --git a/src/allmydata/filetree/redirect.py b/src/allmydata/filetree/redirect.py new file mode 100644 index 00000000..1c4fe245 --- /dev/null +++ b/src/allmydata/filetree/redirect.py @@ -0,0 +1,98 @@ + +from allmydata.util import bencode + +class LocalFileRedirection(object): + stype = "LocalFileRedirection" + + def populate_from_specification(self, spec, parent_is_mutable, downloader): + # return a Deferred that fires (with self) when this node is ready + # for use + + (stype, filename) = spec + assert stype == self.stype + #filename = spec.get_filename() + # there is a local file which contains a bencoded serialized + # subtree specification. + + # TODO: will this enable outsiders to cause us to read from + # arbitrary files? Think about this. + f = open(filename, "rb") + data = f.read() + f.close() + # note: we don't cache the contents of the file. TODO: consider + # doing this based upon mtime. It is important that we be able to + # notice if the file has been changed. + + return self.populate_from_data(data) + + def populate_from_data(self, data): + # data is a subtree specification for our one child + self.child_spec = bencode.bdecode(data) + return self + +class QueenRedirection(object): + stype = "QueenRedirection" + + def populate_from_specification(self, spec, parent_is_mutable, downloader): + # this specifies a handle for which the Queen maintains a + # serialized subtree specification. + (stype, handle) = spec + + # TODO: queen? + d = self._queen.callRemote("lookup_handle", handle) + d.addCallback(self.populate_from_data) + return d + + def populate_from_data(self, data): + self.child_spec = bencode.bdecode(data) + return self + +class QueenOrLocalFileRedirection(object): + stype = "QueenOrLocalFileRedirection" + + def populate_from_specification(self, spec, parent_is_mutable, downloader): + # there is a local file which contains a bencoded serialized + # subtree specification. The queen also has a copy. Whomever has + # the higher version number wins. + (stype, filename, handle) = spec + + f = open(filename, "rb") + #local_version, local_data = bencode.bdecode(f.read()) + local_version_and_data = f.read() + f.close() + + # TODO: queen? + # TODO: pubsub so we can cache the queen's results + d = self._queen.callRemote("lookup_handle", handle) + d.addCallback(self._choose_winner, local_version_and_data) + return d + + def _choose_winner(self, queen_version_and_data, local_version_and_data): + queen_version, queen_data = bencode.bdecode(queen_version_and_data) + local_version, local_data = bencode.bdecode(local_version_and_data) + if queen_version > local_version: + data = queen_data + else: + data = local_data + return self.populate_from_data(data) + + def populate_from_data(self, data): + # NOTE: two layers of bencoding here, TODO + self.child_spec = bencode.bdecode(data) + return self + +class HTTPRedirection(object): + stype = "HTTPRedirection" + + def populate_from_specification(self, spec, parent_is_mutable, downloader): + # this specifies a URL at which there is a bencoded serialized + # subtree specification. + (stype, url) = spec + from twisted.web import client + d = client.getPage(url) + d.addCallback(self.populate_from_data) + return d + + def populate_from_data(self, data): + self.child_spec = bencode.bdecode(data) + return self diff --git a/src/allmydata/filetree/specification.py b/src/allmydata/filetree/specification.py index d9d2a7e9..a09f9b3e 100644 --- a/src/allmydata/filetree/specification.py +++ b/src/allmydata/filetree/specification.py @@ -1,4 +1,5 @@ +""" from zope.interface import implements from allmydata.filetree.interfaces import ISubTreeSpecification @@ -40,87 +41,24 @@ class MutableSSKFileSpecification(ImmutableSSKFileSpecification): self.read_cap = data[1] self.write_cap = data[2] -class CHKDirectorySpecification(object): - implements(ISubTreeSpecification) - stype = "CHK-Directory" - def set_uri(self, uri): - self.uri = uri - def serialize(self): - return (self.stype, self.uri) - def unserialize(self, data): - assert data[0] == self.stype - self.uri = data[1] - -class ImmutableSSKDirectorySpecification(object): - implements(ISubTreeSpecification) - stype = "SSK-Readonly-Directory" - def set_read_capability(self, read_cap): - self.read_cap = read_cap - def get_read_capability(self): - return self.read_cap - def serialize(self): - return (self.stype, self.read_cap) - def unserialize(self, data): - assert data[0] == self.stype - self.read_cap = data[1] - -class MutableSSKDirectorySpecification(ImmutableSSKDirectorySpecification): - implements(ISubTreeSpecification) - stype = "SSK-ReadWrite-Directory" - def set_write_capability(self, write_cap): - self.write_cap = write_cap - def get_write_capability(self): - return self.write_cap - def serialize(self): - return (self.stype, self.read_cap, self.write_cap) - def unserialize(self, data): - assert data[0] == self.stype - self.read_cap = data[1] - self.write_cap = data[2] -class LocalFileRedirection(object): - implements(ISubTreeSpecification) - stype = "LocalFile" - def set_filename(self, filename): - self.filename = filename - def get_filename(self): - return self.filename - def serialize(self): - return (self.stype, self.filename) - -class QueenRedirection(object): - implements(ISubTreeSpecification) - stype = "QueenRedirection" - def set_handle(self, handle): - self.handle = handle - def get_handle(self): - return self.handle - def serialize(self): - return (self.stype, self.handle) - -class HTTPRedirection(object): - implements(ISubTreeSpecification) - stype = "HTTPRedirection" - def set_url(self, url): - self.url = url - def get_url(self): - return self.url - def serialize(self): - return (self.stype, self.url) - -class QueenOrLocalFileRedirection(object): - implements(ISubTreeSpecification) - stype = "QueenOrLocalFile" - def set_filename(self, filename): - self.filename = filename - def get_filename(self): - return self.filename - def set_handle(self, handle): - self.handle = handle - def get_handle(self): - return self.handle - def serialize(self): - return (self.stype, self.handle, self.filename) +def unserialize_subtree_specification(serialized_spec): + assert isinstance(serialized_spec, tuple) + for stype in [CHKDirectorySpecification, + ImmutableSSKDirectorySpecification, + MutableSSKDirectorySpecification, + LocalFileRedirection, + QueenRedirection, + HTTPRedirection, + QueenOrLocalFileRedirection, + ]: + if tuple[0] == stype: + spec = stype() + spec.unserialize(serialized_spec) + return spec + raise RuntimeError("unable to unserialize subtree specification '%s'" % + (serialized_spec,)) +""" diff --git a/src/allmydata/filetree/vdrive.py b/src/allmydata/filetree/vdrive.py new file mode 100644 index 00000000..1ada7cd8 --- /dev/null +++ b/src/allmydata/filetree/vdrive.py @@ -0,0 +1,176 @@ + +from allmydata.filetree import interfaces, opener + +class VirtualDrive(object): + implements(interfaces.IVirtualDrive) + + def __init__(self, workqueue, downloader, root_specification): + self.workqueue = workqueue + workqueue.set_vdrive(self) + # TODO: queen? + self.opener = Opener(queen, downloader) + self.root_specification = root_specification + + # these methods are used to walk through our subtrees + + def _get_root(self): + return self.opener.open(self.root_specification, False) + + def _get_node(self, path): + d = self._get_closest_node(path) + def _got_node((node, remaining_path)): + if remaining_path: + return None + return node + d.addCallback(_got_node) + return d + + def _get_closest_node(self, path): + """Find the closest directory node parent for the desired path. + Return a Deferred that fires with (node, remaining_path). + """ + d = self._get_root() + d.addCallback(self._get_closest_node_1, path) + return d + + def _get_closest_node_1(self, subtree, path): + d = subtree.get_node_for_path(path) + d.addCallback(self._get_closest_node_2, subtree.is_mutable()) + return d + + def _get_closest_node_2(self, res, parent_is_mutable): + (found_path, node, remaining_path) = res + if node.is_directory(): + # traversal done + return (node, remaining_path) + # otherwise, we must open and recurse into a new subtree + d = self.opener.open(node, parent_is_mutable) + def _opened(next_subtree): + next_subtree = ISubTree(next_subtree) + return self._get_closest_node_1(next_subtree, remaining_path) + d.addCallback(_opened) + return d + + def _get_directory(self, path): + """Return a Deferred that fires with the IDirectoryNode at the given + path, or raise NoSuchDirectoryError if there is no such node. This + will traverse subtrees as necessary.""" + d = self._get_node(path) + def _got_directory(node): + if not node: + raise NoSuchDirectoryError + assert interfaces.IDirectoryNode(node) + return node + d.addCallback(_got_directory) + return d + + def _get_file(self, path): + """Return a Deferred that files with an IFileNode at the given path, + or raises a NoSuchDirectoryError or NoSuchChildError, or some other + error if the path refers to something other than a file.""" + d = self._get_node(path) + def _got_node(node): + if not node: + raise NoSuchChildError + return IFileNode(node) + d.addCallback(_got_node) + return d + + def _get_file_uri(self, path): + d = self._get_file(path) + d.addCallback(lambda filenode: filenode.get_uri()) + return d + + def _child_should_not_exist(self, path): + d = self._get_node(path) + def _got_node(node): + if node is not None: + raise PathAlreadyExistsError + d.addCallback(_got_node) + return d + + def _child_should_exist(self, path): + d = self._get_node(path) + def _got_node(node): + if node is None: + raise PathDoesNotExistError + d.addCallback(_got_node) + return d + + def _get_closest_node_and_prepath(self, path): + d = self._get_closest_node(path) + def _got_closest((node, remaining_path)): + prepath_len = len(path) - len(remaining_path) + prepath = path[:prepath_len] + assert path[prepath_len:] == remaining_path + return (prepath, node, remaining_path) + d.addCallback(_got_closest) + return d + + # these are called by the workqueue + + def add(self, path, new_node): + parent_path = path[:-1] + new_node_path = path[-1] + d = self._get_closest_node_and_prepath(parent_path) + def _got_closest((prepath, node, remaining_path)): + # now tell it to create any necessary parent directories + while remaining_path: + node = node.add_subdir(remaining_path.pop(0)) + # 'node' is now the directory where the child wants to go + return node, prepath + d.addCallback(_got_closest) + def _add_new_node((node, prepath)): + node.add(new_node_path, new_node) + subtree = node.get_subtree() + # now, tell the subtree to serialize and upload itself, using the + # workqueue. The subtree will also queue a step to notify its + # parent (using 'prepath'), if necessary. + return subtree.update(prepath, self.workqueue) + d.addCallback(_add_new_node) + return d + + # these are user-visible + + def list(self, path): + d = self._get_directory(path) + d.addCallback(lambda node: node.list()) + return d + + def download(self, path, target): + d = self._get_file_uri(path) + d.addCallback(lambda uri: self.downloader.download(uri, target)) + return d + + def upload_now(self, path, uploadable): + # note: the first few steps of this do not use the workqueue, but I + # think things should remain consistent anyways. If the node is shut + # down before the file has finished uploading, then we forget all + # abou the file. + uploadable = IUploadable(uploadable) + d = self._child_should_not_exist(path) + # then we upload the file + d.addCallback(lambda ignored: self.uploader.upload(uploadable)) + d.addCallback(lambda uri: self.workqueue.create_boxname(uri)) + d.addCallback(lambda boxname: + self.workqueue.add_addpath(boxname, path)) + return d + + def upload_later(self, path, filename): + boxname = self.workqueue.create_boxname() + self.workqueue.add_upload_chk(filename, boxname) + self.workqueue.add_addpath(boxname, path) + + def delete(self, path): + parent_path = path[:-1] + orphan_path = path[-1] + d = self._get_closest_node_and_prepath(parent_path) + def _got_parent((prepath, node, remaining_path)): + assert not remaining_path + node.delete(orphan_path) + # now serialize and upload + subtree = node.get_subtree() + return subtree.update(prepath, self.workqueue) + d.addCallback(_got_parent) + return d + diff --git a/src/allmydata/test/test_filetree_new.py b/src/allmydata/test/test_filetree_new.py index a7434376..65bfee3b 100644 --- a/src/allmydata/test/test_filetree_new.py +++ b/src/allmydata/test/test_filetree_new.py @@ -5,7 +5,7 @@ from twisted.internet import defer from allmydata.filetree.interfaces import IOpener, IDirectoryNode from allmydata.filetree.directory import (ImmutableDirectorySubTree, SubTreeNode, - MutableCHKDirectorySubTree) + CHKDirectorySubTree) from allmydata.filetree.specification import (CHKFileSpecification, CHKDirectorySpecification) from allmydata import workqueue @@ -303,3 +303,9 @@ class MultipleSubTrees(unittest.TestCase): return d + +del OneSubTree +del MultipleSubTrees + +class Redirect(unittest.TestCase): + pass diff --git a/src/allmydata/upload.py b/src/allmydata/upload.py index 29938e09..9607fc93 100644 --- a/src/allmydata/upload.py +++ b/src/allmydata/upload.py @@ -279,6 +279,11 @@ class FileHandle: # the originator of the filehandle reserves the right to close it pass +class IUploader(Interface): + def upload(uploadable): + """Upload the file. 'uploadable' must impement IUploadable. This + returns a Deferred which fires with the URI of the file.""" + class Uploader(service.MultiService): """I am a service that allows file uploading. """ @@ -296,7 +301,7 @@ class Uploader(service.MultiService): return hasher.digest() def upload(self, f): - # this returns (verifierid, encoding_params) + # this returns the URI assert self.parent assert self.running f = IUploadable(f) diff --git a/src/allmydata/workqueue.py b/src/allmydata/workqueue.py index e1c06b1c..14e83dfc 100644 --- a/src/allmydata/workqueue.py +++ b/src/allmydata/workqueue.py @@ -26,7 +26,7 @@ class IWorkQueue(Interface): def create_tempfile(suffix=""): """Return (f, filename).""" - def create_boxname(): + def create_boxname(contents=None): """Return a unique box name (as a string).""" def add_upload_chk(source_filename, stash_uri_in_boxname): @@ -156,6 +156,9 @@ class WorkQueue(object): # line specifies what kind of step it is assert self.seqnum < 1000 # TODO: don't let this grow unboundedly + def set_vdrive(self, vdrive): + self.vdrive = vdrive + def create_tempfile(self, suffix=""): randomname = b2a(os.urandom(10)) filename = randomname + suffix @@ -342,8 +345,8 @@ class WorkQueue(object): def step_addpath(self, boxname, *path): data = self.read_from_box(boxname) - child_spec = unserialize(data) - return self.root.add_subpath(path, child_spec, self) + child_node = unserialize(data) # TODO: unserialize ? + return self.vdrive.add(path, node) def step_retain_ssk(self, index_a, read_key_a): pass