]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
move IWorkQueue into allmydata.interfaces, give VirtualDrive an uploader
authorBrian Warner <warner@lothar.com>
Sun, 21 Jan 2007 22:15:31 +0000 (15:15 -0700)
committerBrian Warner <warner@lothar.com>
Sun, 21 Jan 2007 22:15:31 +0000 (15:15 -0700)
src/allmydata/filetree/vdrive.py
src/allmydata/interfaces.py
src/allmydata/test/test_filetree_new.py
src/allmydata/workqueue.py

index e2c1f15ef42ee42e6079959ead09dea639e5f7a3..8dc931bc65912c87b8ec5f84d7eab460fd168028 100644 (file)
@@ -8,8 +8,8 @@ from allmydata.filetree.interfaces import (
     NoSuchDirectoryError, NoSuchChildError, PathAlreadyExistsError,
     PathDoesNotExistError,
     )
-from allmydata.upload import IUploadable
-from allmydata.interfaces import IDownloader
+from allmydata.interfaces import (IDownloader, IUploadable, IUploader,
+                                  IWorkQueue)
 
 from allmydata.filetree.nodemaker import NodeMaker
 
@@ -74,10 +74,14 @@ class SubTreeMaker(object):
 class VirtualDrive(object):
     implements(IVirtualDrive)
 
-    def __init__(self, workqueue, downloader, root_node):
+    def __init__(self, workqueue, downloader, uploader, root_node):
+        assert IWorkQueue(workqueue)
+        assert IDownloader(downloader)
+        assert IUploader(uploader)
         assert INode(root_node)
         self.workqueue = workqueue
         workqueue.set_vdrive(self)
+        workqueue.set_uploader(uploader)
         # TODO: queen?
         self.queen = None
         self.root_node = root_node
index 0e1a845693713e1f819283d3e577258972d30840..8be60721cccb64aec835e394c411cbf7a5b0ed7b 100644 (file)
@@ -223,3 +223,115 @@ class IUploader(Interface):
 
     def upload_ssk(write_capability, new_version, uploadable):
         pass # TODO
+
+
+class IWorkQueue(Interface):
+    """Each filetable root is associated a work queue, which is persisted on
+    disk and contains idempotent actions that need to be performed. After
+    each action is completed, it is removed from the queue.
+
+    The queue is broken up into several sections. First are the 'upload'
+    steps. After this are the 'add_subpath' commands. The last section has
+    the 'unlink' steps. Somewhere in here are the 'retain' steps.. maybe
+    interspersed with 'upload', maybe after 'add_subpath' and before
+    'unlink'.
+
+    The general idea is that the processing of the work queue could be
+    interrupted at any time, in the middle of a step, and the next time the
+    application is started, the step can be re-started without problems. The
+    placement of the 'retain' commands depends upon how long we might expect
+    the app to be offline.
+
+    tempfiles: the workqueue has a special directory where temporary files
+    are stored. create_tempfile() generates these files, while steps like
+    add_upload_chk() use them. The add_delete_tempfile() will delete the
+    tempfile. All tempfiles are deleted when the workqueue becomes empty,
+    since at that point none of them can still be referenced.
+
+    boxes: there is another special directory where named slots (called
+    'boxes') hold serialized INode specifications (the strings which are
+    returned by INode.serialize_node()). Boxes are created by calling
+    create_boxname(). Boxes are filled either at the time of creation or by
+    steps like add_upload_chk(). Boxes are used by steps like add_addpath()
+    and add_retain_uri_from_box. Boxes are deleted by add_delete_box(), as
+    well as when the workqueue becomes empty.
+    """
+
+    def create_tempfile(suffix=""):
+        """Return (f, filename), where 'f' is an open filehandle, and
+        'filename' is a string that can be passed to other workqueue steps to
+        refer to that same file later. NOTE: 'filename' is not an absolute
+        path, rather it will be interpreted relative to some directory known
+        only by the workqueue."""
+    def create_boxname(contents=None):
+        """Return a unique box name (as a string). If 'contents' are
+        provided, it must be an instance that provides INode, and the
+        serialized form of the node will be written into the box. Otherwise
+        the boxname can be used by steps like add_upload_chk to hold the
+        generated uri."""
+
+    def add_upload_chk(source_filename, stash_uri_in_boxname):
+        """This step uploads a file to the mesh and obtains a content-based
+        URI which can be used to later retrieve the same contents ('CHK'
+        mode). This URI includes unlink rights. It does not mark the file for
+        retention.
+
+        When the upload is complete, the resulting URI is stashed in a 'box'
+        with the specified name. This is basically a local variable. A later
+        'add_subpath' step will reference this boxname and retrieve the URI.
+        """
+
+    def add_upload_ssk(write_capability, previous_version, source_filename):
+        """This step uploads a file to the mesh in a way that replaces the
+        previous version and does not require a change to the ID referenced
+        by the parent.
+        """
+
+    def add_queen_update_handle(handle, source_filename):
+        """Arrange for a central queen to be notified that the given handle
+        has been updated with the contents of the given tempfile. This will
+        send a set_handle() message to the queen."""
+
+    def add_retain_ssk(read_capability):
+        """Arrange for the given SSK to be kept alive."""
+
+    def add_unlink_ssk(write_capability):
+        """Stop keeping the given SSK alive."""
+
+    def add_retain_uri_from_box(boxname):
+        """When executed, this step retrieves the URI from the given box and
+        marks it for retention: this adds it to a list of all URIs that this
+        system cares about, which will initiate filechecking/repair for the
+        file."""
+
+    def add_addpath(boxname, path):
+        """When executed, this step will retrieve the serialized INode from
+        the given box and call vdrive.add(path, node) .
+        """
+
+    def add_unlink_uri(uri):
+        """When executed, this step will unlink the data referenced by the
+        given URI: the unlink rights are used to tell any shareholders to
+        unlink the file (possibly deleting it), and the URI is removed from
+        the list that this system cares about, cancelling filechecking/repair
+        for the file.
+
+        All 'unlink' steps are pushed to the end of the queue.
+        """
+
+    def add_delete_tempfile(filename):
+        """This step will delete a tempfile created by create_tempfile."""
+
+    def add_delete_box(boxname):
+        """When executed, this step deletes the given box."""
+
+
+    # methods for use in unit tests
+
+    def flush():
+        """Execute all steps in the WorkQueue right away. Return a Deferred
+        that fires (with self) when the queue is empty.
+        """
+
+class NotCapableError(Exception):
+    """You have tried to write to a read-only node."""
index be01cd0529f54217280339bc9d82f60b9a349076..7393bf758ee168a6bb72df33043fdef138498407 100644 (file)
@@ -11,6 +11,9 @@ from allmydata.interfaces import IDownloader, IUploader
 from allmydata import workqueue
 from cStringIO import StringIO
 
+class FakeMesh(object):
+    implements(IDownloader, IUploader)
+
 """
 class FakeOpener(object):
     implements(IOpener)
@@ -332,15 +335,19 @@ class InPairs(unittest.TestCase):
 class StubDownloader(object):
     implements(IDownloader)
 
+class StubUploader(object):
+    implements(IUploader)
+
 class Stuff(unittest.TestCase):
 
     def makeVirtualDrive(self, basedir, root_node=None):
         wq = workqueue.WorkQueue(os.path.join(basedir, "1.workqueue"))
         dl = StubDownloader()
+        ul = StubUploader()
         if not root_node:
             root_node = directory.LocalFileSubTreeNode()
             root_node.new("rootdirtree.save")
-        v = vdrive.VirtualDrive(wq, dl, root_node)
+        v = vdrive.VirtualDrive(wq, dl, ul, root_node)
         return v
 
     def failUnlessListsAreEqual(self, list1, list2):
index 1982fc54b81837e19d04d0c53d9ce92c7fac3960..4e469b17e4c96790d3b494fda6a007b1522a93ca 100644 (file)
@@ -7,117 +7,7 @@ from allmydata.util.idlib import b2a
 from allmydata.Crypto.Cipher import AES
 from allmydata.filetree.nodemaker import NodeMaker
 from allmydata.filetree.interfaces import INode
-
-class IWorkQueue(Interface):
-    """Each filetable root is associated a work queue, which is persisted on
-    disk and contains idempotent actions that need to be performed. After
-    each action is completed, it is removed from the queue.
-
-    The queue is broken up into several sections. First are the 'upload'
-    steps. After this are the 'add_subpath' commands. The last section has
-    the 'unlink' steps. Somewhere in here are the 'retain' steps.. maybe
-    interspersed with 'upload', maybe after 'add_subpath' and before
-    'unlink'.
-
-    The general idea is that the processing of the work queue could be
-    interrupted at any time, in the middle of a step, and the next time the
-    application is started, the step can be re-started without problems. The
-    placement of the 'retain' commands depends upon how long we might expect
-    the app to be offline.
-
-    tempfiles: the workqueue has a special directory where temporary files
-    are stored. create_tempfile() generates these files, while steps like
-    add_upload_chk() use them. The add_delete_tempfile() will delete the
-    tempfile. All tempfiles are deleted when the workqueue becomes empty,
-    since at that point none of them can still be referenced.
-
-    boxes: there is another special directory where named slots (called
-    'boxes') hold serialized INode specifications (the strings which are
-    returned by INode.serialize_node()). Boxes are created by calling
-    create_boxname(). Boxes are filled either at the time of creation or by
-    steps like add_upload_chk(). Boxes are used by steps like add_addpath()
-    and add_retain_uri_from_box. Boxes are deleted by add_delete_box(), as
-    well as when the workqueue becomes empty.
-    """
-
-    def create_tempfile(suffix=""):
-        """Return (f, filename), where 'f' is an open filehandle, and
-        'filename' is a string that can be passed to other workqueue steps to
-        refer to that same file later. NOTE: 'filename' is not an absolute
-        path, rather it will be interpreted relative to some directory known
-        only by the workqueue."""
-    def create_boxname(contents=None):
-        """Return a unique box name (as a string). If 'contents' are
-        provided, it must be an instance that provides INode, and the
-        serialized form of the node will be written into the box. Otherwise
-        the boxname can be used by steps like add_upload_chk to hold the
-        generated uri."""
-
-    def add_upload_chk(source_filename, stash_uri_in_boxname):
-        """This step uploads a file to the mesh and obtains a content-based
-        URI which can be used to later retrieve the same contents ('CHK'
-        mode). This URI includes unlink rights. It does not mark the file for
-        retention.
-
-        When the upload is complete, the resulting URI is stashed in a 'box'
-        with the specified name. This is basically a local variable. A later
-        'add_subpath' step will reference this boxname and retrieve the URI.
-        """
-
-    def add_upload_ssk(write_capability, previous_version, source_filename):
-        """This step uploads a file to the mesh in a way that replaces the
-        previous version and does not require a change to the ID referenced
-        by the parent.
-        """
-
-    def add_queen_update_handle(handle, source_filename):
-        """Arrange for a central queen to be notified that the given handle
-        has been updated with the contents of the given tempfile. This will
-        send a set_handle() message to the queen."""
-
-    def add_retain_ssk(read_capability):
-        """Arrange for the given SSK to be kept alive."""
-
-    def add_unlink_ssk(write_capability):
-        """Stop keeping the given SSK alive."""
-
-    def add_retain_uri_from_box(boxname):
-        """When executed, this step retrieves the URI from the given box and
-        marks it for retention: this adds it to a list of all URIs that this
-        system cares about, which will initiate filechecking/repair for the
-        file."""
-
-    def add_addpath(boxname, path):
-        """When executed, this step will retrieve the serialized INode from
-        the given box and call vdrive.add(path, node) .
-        """
-
-    def add_unlink_uri(uri):
-        """When executed, this step will unlink the data referenced by the
-        given URI: the unlink rights are used to tell any shareholders to
-        unlink the file (possibly deleting it), and the URI is removed from
-        the list that this system cares about, cancelling filechecking/repair
-        for the file.
-
-        All 'unlink' steps are pushed to the end of the queue.
-        """
-
-    def add_delete_tempfile(filename):
-        """This step will delete a tempfile created by create_tempfile."""
-
-    def add_delete_box(boxname):
-        """When executed, this step deletes the given box."""
-
-
-    # methods for use in unit tests
-
-    def flush():
-        """Execute all steps in the WorkQueue right away. Return a Deferred
-        that fires (with self) when the queue is empty.
-        """
-
-class NotCapableError(Exception):
-    """You have tried to write to a read-only node."""
+from allmydata.interfaces import IWorkQueue, NotCapableError, IUploader
 
 
 class Step(object):
@@ -161,6 +51,8 @@ class WorkQueue(object):
         assert basedir.endswith("workqueue")
         self.basedir = basedir
         self._node_maker = NodeMaker()
+        self._uploader = None # filled in later
+        self._downloader = None # filled in later
         self.seqnum = 0
         self.tmpdir = os.path.join(basedir, "tmp")
         #self.trashdir = os.path.join(basedir, "trash")
@@ -196,6 +88,9 @@ class WorkQueue(object):
 
     def set_vdrive(self, vdrive):
         self.vdrive = vdrive
+    def set_uploader(self, uploader):
+        assert IUploader(uploader)
+        self._uploader = uploader
 
     def create_tempfile(self, suffix=""):
         randomname = b2a(os.urandom(10))