def new(self, uri):
self.uri = uri
+ return self
def get_base_data(self):
return self.uri
path[:-1] must refer to a writable DIRECTORY node. 'uploadable' must
implement IUploadable. This returns a Deferred that fires (with
- 'uploadable') when the upload is complete.
+ 'uploadable') when the upload is complete. Do not use the workqueue.
"""
def upload_later(path, filename):
- """Upload a file from disk to the given path.
+ """Upload a file from disk to the given path. Use the workqueue.
"""
def delete(path):
complete.
"""
+ def add_node(path, node):
+ """Add a node to the given path. Use the workqueue.
+ """
+
# commands to manipulate subtrees
# ... detach subtree, merge subtree, etc
--- /dev/null
+
+from zope.interface import implements
+from allmydata.filetree import directory, file, redirect
+from allmydata.filetree.interfaces import INodeMaker
+
+# this list is used by NodeMaker to convert node specification strings (found
+# inside the serialized form of subtrees) into Nodes (which live in the
+# in-RAM form of subtrees).
+all_node_types = [
+ directory.LocalFileSubTreeNode,
+ directory.CHKDirectorySubTreeNode,
+ directory.SSKDirectorySubTreeNode,
+ file.CHKFileNode,
+ file.SSKFileNode,
+ redirect.LocalFileRedirectionNode,
+ redirect.QueenRedirectionNode,
+ redirect.HTTPRedirectionNode,
+ redirect.QueenOrLocalFileRedirectionNode,
+]
+
+class NodeMaker(object):
+ implements(INodeMaker)
+
+ def make_node_from_serialized(self, serialized):
+ # this turns a string into an INode, which contains information about
+ # the file or directory (like a URI), but does not contain the actual
+ # contents. An ISubTreeMaker can be used later to retrieve the
+ # contents (which means downloading the file if this is an IFileNode,
+ # or perhaps creating a new subtree from the contents)
+
+ # maybe include parent_is_mutable?
+ assert isinstance(serialized, str)
+ prefix, body = serialized.split(":", 2)
+
+ for node_class in all_node_types:
+ if prefix == node_class.prefix:
+ node = node_class()
+ node.populate_node(body, self)
+ return node
+ raise RuntimeError("unable to handle node type '%s'" % prefix)
+
)
from allmydata.upload import IUploadable
-# this list is used by NodeMaker to convert node specification strings (found
-# inside the serialized form of subtrees) into Nodes (which live in the
-# in-RAM form of subtrees).
-all_node_types = [
- directory.LocalFileSubTreeNode,
- directory.CHKDirectorySubTreeNode,
- directory.SSKDirectorySubTreeNode,
- file.CHKFileNode,
- file.SSKFileNode,
- redirect.LocalFileRedirectionNode,
- redirect.QueenRedirectionNode,
- redirect.HTTPRedirectionNode,
- redirect.QueenOrLocalFileRedirectionNode,
-]
-
-class NodeMaker(object):
- implements(INodeMaker)
-
- def make_node_from_serialized(self, serialized):
- # this turns a string into an INode, which contains information about
- # the file or directory (like a URI), but does not contain the actual
- # contents. An ISubTreeMaker can be used later to retrieve the
- # contents (which means downloading the file if this is an IFileNode,
- # or perhaps creating a new subtree from the contents)
-
- # maybe include parent_is_mutable?
- assert isinstance(serialized, str)
- prefix, body = serialized.split(":", 2)
-
- for node_class in all_node_types:
- if prefix == node_class.prefix:
- node = node_class()
- node.populate_node(body, self)
- return node
- raise RuntimeError("unable to handle node type '%s'" % prefix)
-
-
+from allmydata.filetree.nodemaker import NodeMaker
all_openable_subtree_types = [
directory.LocalFileSubTree,
def _got_closest((node, remaining_path)):
prepath_len = len(path) - len(remaining_path)
prepath = path[:prepath_len]
- assert path[prepath_len:] == remaining_path
+ assert path[prepath_len:] == remaining_path, "um, path=%s, prepath=%s, prepath_len=%d, remaining_path=%s" % (path, prepath, prepath_len, remaining_path)
return (prepath, node, remaining_path)
d.addCallback(_got_closest)
return d
d = self._get_closest_node_and_prepath(parent_path)
def _got_closest((prepath, node, remaining_path)):
# now tell it to create any necessary parent directories
+ remaining_path = remaining_path[:]
while remaining_path:
node = node.add_subdir(remaining_path.pop(0))
# 'node' is now the directory where the child wants to go
# these are user-visible
def list(self, path):
+ assert isinstance(path, list)
d = self._get_directory(path)
d.addCallback(lambda node: node.list())
return d
def download(self, path, target):
+ assert isinstance(path, list)
d = self._get_file_uri(path)
d.addCallback(lambda uri: self.downloader.download(uri, target))
return d
def upload_now(self, path, uploadable):
+ assert isinstance(path, list)
# note: the first few steps of this do not use the workqueue, but I
# think things should remain consistent anyways. If the node is shut
# down before the file has finished uploading, then we forget all
d = self._child_should_not_exist(path)
# then we upload the file
d.addCallback(lambda ignored: self.uploader.upload(uploadable))
- d.addCallback(lambda uri: self.workqueue.create_boxname(uri))
- d.addCallback(lambda boxname:
- self.workqueue.add_addpath(boxname, path))
+ def _uploaded(uri):
+ assert isinstance(uri, str)
+ new_node = file.CHKFileNode().new(uri)
+ boxname = self.workqueue.create_boxname(new_node)
+ self.workqueue.add_addpath(boxname, path)
+ self.workqueue.add_delete_box(boxname)
+ d.addCallback(_uploaded)
return d
def upload_later(self, path, filename):
+ assert isinstance(path, list)
boxname = self.workqueue.create_boxname()
self.workqueue.add_upload_chk(filename, boxname)
self.workqueue.add_addpath(boxname, path)
+ self.workqueue.add_delete_box(boxname)
def delete(self, path):
+ assert isinstance(path, list)
parent_path = path[:-1]
orphan_path = path[-1]
d = self._get_closest_node_and_prepath(parent_path)
boxname = subtree.update(self.workqueue)
if boxname:
self.workqueue.add_addpath(boxname, prepath)
+ self.workqueue.add_delete_box(boxname)
return self
d.addCallback(_got_parent)
return d
+ def add_node(self, path, node):
+ assert isinstance(path, list)
+ assert INode(node)
+ assert not IDirectoryNode.providedBy(node)
+ boxname = self.workqueue.create_boxname(node)
+ self.workqueue.add_addpath(boxname, path)
+ self.workqueue.add_delete_box(boxname)
+
def failUnlessListsAreEqual(self, list1, list2):
self.failUnlessEqual(sorted(list1), sorted(list2))
+ def failUnlessContentsAreEqual(self, c1, c2):
+ c1a = dict([(k,v.serialize_node()) for k,v in c1.items()])
+ c2a = dict([(k,v.serialize_node()) for k,v in c2.items()])
+ self.failUnlessEqual(c1a, c2a)
+
def testDirectory(self):
stm = vdrive.SubTreeMaker(None, None)
root = redirect.LocalFileRedirection().new("vdrive-root",
topdir.create_node_now())
root.update_now(None)
- wq = self.makeVirtualDrive("vdrive", root.create_node_now())
+ v = self.makeVirtualDrive("vdrive", root.create_node_now())
- d = wq.list([])
+ d = v.list([])
def _listed(contents):
self.failUnlessEqual(contents, {})
d.addCallback(_listed)
+
+ child1 = CHKFileNode().new("uri1")
+ d.addCallback(lambda res: v.add_node(["a"], child1))
+ d.addCallback(lambda res: v.workqueue.flush())
+ d.addCallback(lambda res: v.list([]))
+ def _listed2(contents):
+ self.failUnlessListsAreEqual(contents.keys(), ["a"])
+ self.failUnlessContentsAreEqual(contents, {"a": child1})
+ d.addCallback(_listed2)
+ child2 = CHKFileNode().new("uri2")
+ child3 = CHKFileNode().new("uri3")
+ d.addCallback(lambda res: v.add_node(["b","c"], child2))
+ d.addCallback(lambda res: v.add_node(["b","d"], child3))
+ d.addCallback(lambda res: v.workqueue.flush())
+ d.addCallback(lambda res: v.list([]))
+ def _listed3(contents):
+ self.failUnlessListsAreEqual(contents.keys(), ["a","b"])
+ d.addCallback(_listed3)
+ d.addCallback(lambda res: v.list(["b"]))
+ def _listed4(contents):
+ self.failUnlessListsAreEqual(contents.keys(), ["c","d"])
+ self.failUnlessContentsAreEqual(contents,
+ {"c": child2, "d": child3})
+ d.addCallback(_listed4)
+
return d
boxfile = os.path.join(wq.boxesdir, boxname)
self.failUnless(os.path.exists(boxfile))
- d = wq.run_all_steps()
+ d = wq.flush()
def _check(res):
self.failUnlessEqual(len(wq.dispatched_steps), 5)
self.failUnlessEqual(wq.dispatched_steps[0][0], "upload_chk")
from allmydata.util import bencode
from allmydata.util.idlib import b2a
from allmydata.Crypto.Cipher import AES
+from allmydata.filetree.nodemaker import NodeMaker
+from allmydata.filetree.interfaces import INode
class IWorkQueue(Interface):
"""Each filetable root is associated a work queue, which is persisted on
application is started, the step can be re-started without problems. The
placement of the 'retain' commands depends upon how long we might expect
the app to be offline.
+
+ tempfiles: the workqueue has a special directory where temporary files
+ are stored. create_tempfile() generates these files, while steps like
+ add_upload_chk() use them. The add_delete_tempfile() will delete the
+ tempfile. All tempfiles are deleted when the workqueue becomes empty,
+ since at that point none of them can still be referenced.
+
+ boxes: there is another special directory where named slots (called
+ 'boxes') hold serialized INode specifications (the strings which are
+ returned by INode.serialize_node()). Boxes are created by calling
+ create_boxname(). Boxes are filled either at the time of creation or by
+ steps like add_upload_chk(). Boxes are used by steps like add_addpath()
+ and add_retain_uri_from_box. Boxes are deleted by add_delete_box(), as
+ well as when the workqueue becomes empty.
"""
def create_tempfile(suffix=""):
path, rather it will be interpreted relative to some directory known
only by the workqueue."""
def create_boxname(contents=None):
- """Return a unique box name (as a string)."""
+ """Return a unique box name (as a string). If 'contents' are
+ provided, it must be an instance that provides INode, and the
+ serialized form of the node will be written into the box. Otherwise
+ the boxname can be used by steps like add_upload_chk to hold the
+ generated uri."""
def add_upload_chk(source_filename, stash_uri_in_boxname):
"""This step uploads a file to the mesh and obtains a content-based
def add_delete_box(boxname):
"""When executed, this step deletes the given box."""
+
+ # methods for use in unit tests
+
+ def flush():
+ """Execute all steps in the WorkQueue right away. Return a Deferred
+ that fires (with self) when the queue is empty.
+ """
+
class NotCapableError(Exception):
"""You have tried to write to a read-only node."""
def __init__(self, basedir):
assert basedir.endswith("workqueue")
self.basedir = basedir
+ self._node_maker = NodeMaker()
self.seqnum = 0
self.tmpdir = os.path.join(basedir, "tmp")
#self.trashdir = os.path.join(basedir, "trash")
f = open(os.path.join(self.filesdir, filename), "wb")
return (f, filename)
- def create_boxname(self):
- return b2a(os.urandom(10))
+ def create_boxname(self, contents=None):
+ boxname = b2a(os.urandom(10))
+ if contents is not None:
+ assert INode(contents)
+ self.write_to_box(boxname, contents.serialize_node())
+ return boxname
def write_to_box(self, boxname, data):
f = open(os.path.join(self.boxesdir, boxname), "w")
f.write(data)
if not hasattr(self, handlername):
raise RuntimeError("unknown workqueue step type '%s'" % steptype)
handler = getattr(self, handlername)
- d = defer.maybeDeferred(handler, *lines[1:])
+ d = defer.maybeDeferred(handler, *lines)
return d
def _delete_step(self, res, stepname):
d.addCallback(self.run_all_steps)
return d
return defer.succeed(None)
+ def flush(self):
+ return self.run_all_steps()
def open_tempfile(self, filename):
pass
def step_addpath(self, boxname, *path):
+ path = list(path)
data = self.read_from_box(boxname)
- child_node = unserialize(data) # TODO: unserialize ?
+ child_node = self._node_maker.make_node_from_serialized(data)
return self.vdrive.add(path, child_node)
def step_retain_ssk(self, index_a, read_key_a):