NoSuchDirectoryError, NoSuchChildError, PathAlreadyExistsError,
PathDoesNotExistError,
)
-from allmydata.upload import IUploadable
-from allmydata.interfaces import IDownloader
+from allmydata.interfaces import (IDownloader, IUploadable, IUploader,
+ IWorkQueue)
from allmydata.filetree.nodemaker import NodeMaker
class VirtualDrive(object):
implements(IVirtualDrive)
- def __init__(self, workqueue, downloader, root_node):
+ def __init__(self, workqueue, downloader, uploader, root_node):
+ assert IWorkQueue(workqueue)
+ assert IDownloader(downloader)
+ assert IUploader(uploader)
assert INode(root_node)
self.workqueue = workqueue
workqueue.set_vdrive(self)
+ workqueue.set_uploader(uploader)
# TODO: queen?
self.queen = None
self.root_node = root_node
def upload_ssk(write_capability, new_version, uploadable):
pass # TODO
+
+
+class IWorkQueue(Interface):
+ """Each filetable root is associated a work queue, which is persisted on
+ disk and contains idempotent actions that need to be performed. After
+ each action is completed, it is removed from the queue.
+
+ The queue is broken up into several sections. First are the 'upload'
+ steps. After this are the 'add_subpath' commands. The last section has
+ the 'unlink' steps. Somewhere in here are the 'retain' steps.. maybe
+ interspersed with 'upload', maybe after 'add_subpath' and before
+ 'unlink'.
+
+ The general idea is that the processing of the work queue could be
+ interrupted at any time, in the middle of a step, and the next time the
+ application is started, the step can be re-started without problems. The
+ placement of the 'retain' commands depends upon how long we might expect
+ the app to be offline.
+
+ tempfiles: the workqueue has a special directory where temporary files
+ are stored. create_tempfile() generates these files, while steps like
+ add_upload_chk() use them. The add_delete_tempfile() will delete the
+ tempfile. All tempfiles are deleted when the workqueue becomes empty,
+ since at that point none of them can still be referenced.
+
+ boxes: there is another special directory where named slots (called
+ 'boxes') hold serialized INode specifications (the strings which are
+ returned by INode.serialize_node()). Boxes are created by calling
+ create_boxname(). Boxes are filled either at the time of creation or by
+ steps like add_upload_chk(). Boxes are used by steps like add_addpath()
+ and add_retain_uri_from_box. Boxes are deleted by add_delete_box(), as
+ well as when the workqueue becomes empty.
+ """
+
+ def create_tempfile(suffix=""):
+ """Return (f, filename), where 'f' is an open filehandle, and
+ 'filename' is a string that can be passed to other workqueue steps to
+ refer to that same file later. NOTE: 'filename' is not an absolute
+ path, rather it will be interpreted relative to some directory known
+ only by the workqueue."""
+ def create_boxname(contents=None):
+ """Return a unique box name (as a string). If 'contents' are
+ provided, it must be an instance that provides INode, and the
+ serialized form of the node will be written into the box. Otherwise
+ the boxname can be used by steps like add_upload_chk to hold the
+ generated uri."""
+
+ def add_upload_chk(source_filename, stash_uri_in_boxname):
+ """This step uploads a file to the mesh and obtains a content-based
+ URI which can be used to later retrieve the same contents ('CHK'
+ mode). This URI includes unlink rights. It does not mark the file for
+ retention.
+
+ When the upload is complete, the resulting URI is stashed in a 'box'
+ with the specified name. This is basically a local variable. A later
+ 'add_subpath' step will reference this boxname and retrieve the URI.
+ """
+
+ def add_upload_ssk(write_capability, previous_version, source_filename):
+ """This step uploads a file to the mesh in a way that replaces the
+ previous version and does not require a change to the ID referenced
+ by the parent.
+ """
+
+ def add_queen_update_handle(handle, source_filename):
+ """Arrange for a central queen to be notified that the given handle
+ has been updated with the contents of the given tempfile. This will
+ send a set_handle() message to the queen."""
+
+ def add_retain_ssk(read_capability):
+ """Arrange for the given SSK to be kept alive."""
+
+ def add_unlink_ssk(write_capability):
+ """Stop keeping the given SSK alive."""
+
+ def add_retain_uri_from_box(boxname):
+ """When executed, this step retrieves the URI from the given box and
+ marks it for retention: this adds it to a list of all URIs that this
+ system cares about, which will initiate filechecking/repair for the
+ file."""
+
+ def add_addpath(boxname, path):
+ """When executed, this step will retrieve the serialized INode from
+ the given box and call vdrive.add(path, node) .
+ """
+
+ def add_unlink_uri(uri):
+ """When executed, this step will unlink the data referenced by the
+ given URI: the unlink rights are used to tell any shareholders to
+ unlink the file (possibly deleting it), and the URI is removed from
+ the list that this system cares about, cancelling filechecking/repair
+ for the file.
+
+ All 'unlink' steps are pushed to the end of the queue.
+ """
+
+ def add_delete_tempfile(filename):
+ """This step will delete a tempfile created by create_tempfile."""
+
+ def add_delete_box(boxname):
+ """When executed, this step deletes the given box."""
+
+
+ # methods for use in unit tests
+
+ def flush():
+ """Execute all steps in the WorkQueue right away. Return a Deferred
+ that fires (with self) when the queue is empty.
+ """
+
+class NotCapableError(Exception):
+ """You have tried to write to a read-only node."""
from allmydata import workqueue
from cStringIO import StringIO
+class FakeMesh(object):
+ implements(IDownloader, IUploader)
+
"""
class FakeOpener(object):
implements(IOpener)
class StubDownloader(object):
implements(IDownloader)
+class StubUploader(object):
+ implements(IUploader)
+
class Stuff(unittest.TestCase):
def makeVirtualDrive(self, basedir, root_node=None):
wq = workqueue.WorkQueue(os.path.join(basedir, "1.workqueue"))
dl = StubDownloader()
+ ul = StubUploader()
if not root_node:
root_node = directory.LocalFileSubTreeNode()
root_node.new("rootdirtree.save")
- v = vdrive.VirtualDrive(wq, dl, root_node)
+ v = vdrive.VirtualDrive(wq, dl, ul, root_node)
return v
def failUnlessListsAreEqual(self, list1, list2):
from allmydata.Crypto.Cipher import AES
from allmydata.filetree.nodemaker import NodeMaker
from allmydata.filetree.interfaces import INode
-
-class IWorkQueue(Interface):
- """Each filetable root is associated a work queue, which is persisted on
- disk and contains idempotent actions that need to be performed. After
- each action is completed, it is removed from the queue.
-
- The queue is broken up into several sections. First are the 'upload'
- steps. After this are the 'add_subpath' commands. The last section has
- the 'unlink' steps. Somewhere in here are the 'retain' steps.. maybe
- interspersed with 'upload', maybe after 'add_subpath' and before
- 'unlink'.
-
- The general idea is that the processing of the work queue could be
- interrupted at any time, in the middle of a step, and the next time the
- application is started, the step can be re-started without problems. The
- placement of the 'retain' commands depends upon how long we might expect
- the app to be offline.
-
- tempfiles: the workqueue has a special directory where temporary files
- are stored. create_tempfile() generates these files, while steps like
- add_upload_chk() use them. The add_delete_tempfile() will delete the
- tempfile. All tempfiles are deleted when the workqueue becomes empty,
- since at that point none of them can still be referenced.
-
- boxes: there is another special directory where named slots (called
- 'boxes') hold serialized INode specifications (the strings which are
- returned by INode.serialize_node()). Boxes are created by calling
- create_boxname(). Boxes are filled either at the time of creation or by
- steps like add_upload_chk(). Boxes are used by steps like add_addpath()
- and add_retain_uri_from_box. Boxes are deleted by add_delete_box(), as
- well as when the workqueue becomes empty.
- """
-
- def create_tempfile(suffix=""):
- """Return (f, filename), where 'f' is an open filehandle, and
- 'filename' is a string that can be passed to other workqueue steps to
- refer to that same file later. NOTE: 'filename' is not an absolute
- path, rather it will be interpreted relative to some directory known
- only by the workqueue."""
- def create_boxname(contents=None):
- """Return a unique box name (as a string). If 'contents' are
- provided, it must be an instance that provides INode, and the
- serialized form of the node will be written into the box. Otherwise
- the boxname can be used by steps like add_upload_chk to hold the
- generated uri."""
-
- def add_upload_chk(source_filename, stash_uri_in_boxname):
- """This step uploads a file to the mesh and obtains a content-based
- URI which can be used to later retrieve the same contents ('CHK'
- mode). This URI includes unlink rights. It does not mark the file for
- retention.
-
- When the upload is complete, the resulting URI is stashed in a 'box'
- with the specified name. This is basically a local variable. A later
- 'add_subpath' step will reference this boxname and retrieve the URI.
- """
-
- def add_upload_ssk(write_capability, previous_version, source_filename):
- """This step uploads a file to the mesh in a way that replaces the
- previous version and does not require a change to the ID referenced
- by the parent.
- """
-
- def add_queen_update_handle(handle, source_filename):
- """Arrange for a central queen to be notified that the given handle
- has been updated with the contents of the given tempfile. This will
- send a set_handle() message to the queen."""
-
- def add_retain_ssk(read_capability):
- """Arrange for the given SSK to be kept alive."""
-
- def add_unlink_ssk(write_capability):
- """Stop keeping the given SSK alive."""
-
- def add_retain_uri_from_box(boxname):
- """When executed, this step retrieves the URI from the given box and
- marks it for retention: this adds it to a list of all URIs that this
- system cares about, which will initiate filechecking/repair for the
- file."""
-
- def add_addpath(boxname, path):
- """When executed, this step will retrieve the serialized INode from
- the given box and call vdrive.add(path, node) .
- """
-
- def add_unlink_uri(uri):
- """When executed, this step will unlink the data referenced by the
- given URI: the unlink rights are used to tell any shareholders to
- unlink the file (possibly deleting it), and the URI is removed from
- the list that this system cares about, cancelling filechecking/repair
- for the file.
-
- All 'unlink' steps are pushed to the end of the queue.
- """
-
- def add_delete_tempfile(filename):
- """This step will delete a tempfile created by create_tempfile."""
-
- def add_delete_box(boxname):
- """When executed, this step deletes the given box."""
-
-
- # methods for use in unit tests
-
- def flush():
- """Execute all steps in the WorkQueue right away. Return a Deferred
- that fires (with self) when the queue is empty.
- """
-
-class NotCapableError(Exception):
- """You have tried to write to a read-only node."""
+from allmydata.interfaces import IWorkQueue, NotCapableError, IUploader
class Step(object):
assert basedir.endswith("workqueue")
self.basedir = basedir
self._node_maker = NodeMaker()
+ self._uploader = None # filled in later
+ self._downloader = None # filled in later
self.seqnum = 0
self.tmpdir = os.path.join(basedir, "tmp")
#self.trashdir = os.path.join(basedir, "trash")
def set_vdrive(self, vdrive):
self.vdrive = vdrive
+ def set_uploader(self, uploader):
+ assert IUploader(uploader)
+ self._uploader = uploader
def create_tempfile(self, suffix=""):
randomname = b2a(os.urandom(10))