From: Brian Warner Date: Sun, 21 Jan 2007 22:15:31 +0000 (-0700) Subject: move IWorkQueue into allmydata.interfaces, give VirtualDrive an uploader X-Git-Tag: tahoe_v0.1.0-0-UNSTABLE~335 X-Git-Url: https://git.rkrishnan.org/simplejson/components/%22file:/-?a=commitdiff_plain;h=81d093b649626a97c48df99dcd9a167899d6e732;p=tahoe-lafs%2Ftahoe-lafs.git move IWorkQueue into allmydata.interfaces, give VirtualDrive an uploader --- diff --git a/src/allmydata/filetree/vdrive.py b/src/allmydata/filetree/vdrive.py index e2c1f15e..8dc931bc 100644 --- a/src/allmydata/filetree/vdrive.py +++ b/src/allmydata/filetree/vdrive.py @@ -8,8 +8,8 @@ from allmydata.filetree.interfaces import ( NoSuchDirectoryError, NoSuchChildError, PathAlreadyExistsError, PathDoesNotExistError, ) -from allmydata.upload import IUploadable -from allmydata.interfaces import IDownloader +from allmydata.interfaces import (IDownloader, IUploadable, IUploader, + IWorkQueue) from allmydata.filetree.nodemaker import NodeMaker @@ -74,10 +74,14 @@ class SubTreeMaker(object): class VirtualDrive(object): implements(IVirtualDrive) - def __init__(self, workqueue, downloader, root_node): + def __init__(self, workqueue, downloader, uploader, root_node): + assert IWorkQueue(workqueue) + assert IDownloader(downloader) + assert IUploader(uploader) assert INode(root_node) self.workqueue = workqueue workqueue.set_vdrive(self) + workqueue.set_uploader(uploader) # TODO: queen? self.queen = None self.root_node = root_node diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py index 0e1a8456..8be60721 100644 --- a/src/allmydata/interfaces.py +++ b/src/allmydata/interfaces.py @@ -223,3 +223,115 @@ class IUploader(Interface): def upload_ssk(write_capability, new_version, uploadable): pass # TODO + + +class IWorkQueue(Interface): + """Each filetable root is associated a work queue, which is persisted on + disk and contains idempotent actions that need to be performed. After + each action is completed, it is removed from the queue. + + The queue is broken up into several sections. First are the 'upload' + steps. After this are the 'add_subpath' commands. The last section has + the 'unlink' steps. Somewhere in here are the 'retain' steps.. maybe + interspersed with 'upload', maybe after 'add_subpath' and before + 'unlink'. + + The general idea is that the processing of the work queue could be + interrupted at any time, in the middle of a step, and the next time the + application is started, the step can be re-started without problems. The + placement of the 'retain' commands depends upon how long we might expect + the app to be offline. + + tempfiles: the workqueue has a special directory where temporary files + are stored. create_tempfile() generates these files, while steps like + add_upload_chk() use them. The add_delete_tempfile() will delete the + tempfile. All tempfiles are deleted when the workqueue becomes empty, + since at that point none of them can still be referenced. + + boxes: there is another special directory where named slots (called + 'boxes') hold serialized INode specifications (the strings which are + returned by INode.serialize_node()). Boxes are created by calling + create_boxname(). Boxes are filled either at the time of creation or by + steps like add_upload_chk(). Boxes are used by steps like add_addpath() + and add_retain_uri_from_box. Boxes are deleted by add_delete_box(), as + well as when the workqueue becomes empty. + """ + + def create_tempfile(suffix=""): + """Return (f, filename), where 'f' is an open filehandle, and + 'filename' is a string that can be passed to other workqueue steps to + refer to that same file later. NOTE: 'filename' is not an absolute + path, rather it will be interpreted relative to some directory known + only by the workqueue.""" + def create_boxname(contents=None): + """Return a unique box name (as a string). If 'contents' are + provided, it must be an instance that provides INode, and the + serialized form of the node will be written into the box. Otherwise + the boxname can be used by steps like add_upload_chk to hold the + generated uri.""" + + def add_upload_chk(source_filename, stash_uri_in_boxname): + """This step uploads a file to the mesh and obtains a content-based + URI which can be used to later retrieve the same contents ('CHK' + mode). This URI includes unlink rights. It does not mark the file for + retention. + + When the upload is complete, the resulting URI is stashed in a 'box' + with the specified name. This is basically a local variable. A later + 'add_subpath' step will reference this boxname and retrieve the URI. + """ + + def add_upload_ssk(write_capability, previous_version, source_filename): + """This step uploads a file to the mesh in a way that replaces the + previous version and does not require a change to the ID referenced + by the parent. + """ + + def add_queen_update_handle(handle, source_filename): + """Arrange for a central queen to be notified that the given handle + has been updated with the contents of the given tempfile. This will + send a set_handle() message to the queen.""" + + def add_retain_ssk(read_capability): + """Arrange for the given SSK to be kept alive.""" + + def add_unlink_ssk(write_capability): + """Stop keeping the given SSK alive.""" + + def add_retain_uri_from_box(boxname): + """When executed, this step retrieves the URI from the given box and + marks it for retention: this adds it to a list of all URIs that this + system cares about, which will initiate filechecking/repair for the + file.""" + + def add_addpath(boxname, path): + """When executed, this step will retrieve the serialized INode from + the given box and call vdrive.add(path, node) . + """ + + def add_unlink_uri(uri): + """When executed, this step will unlink the data referenced by the + given URI: the unlink rights are used to tell any shareholders to + unlink the file (possibly deleting it), and the URI is removed from + the list that this system cares about, cancelling filechecking/repair + for the file. + + All 'unlink' steps are pushed to the end of the queue. + """ + + def add_delete_tempfile(filename): + """This step will delete a tempfile created by create_tempfile.""" + + def add_delete_box(boxname): + """When executed, this step deletes the given box.""" + + + # methods for use in unit tests + + def flush(): + """Execute all steps in the WorkQueue right away. Return a Deferred + that fires (with self) when the queue is empty. + """ + +class NotCapableError(Exception): + """You have tried to write to a read-only node.""" diff --git a/src/allmydata/test/test_filetree_new.py b/src/allmydata/test/test_filetree_new.py index be01cd05..7393bf75 100644 --- a/src/allmydata/test/test_filetree_new.py +++ b/src/allmydata/test/test_filetree_new.py @@ -11,6 +11,9 @@ from allmydata.interfaces import IDownloader, IUploader from allmydata import workqueue from cStringIO import StringIO +class FakeMesh(object): + implements(IDownloader, IUploader) + """ class FakeOpener(object): implements(IOpener) @@ -332,15 +335,19 @@ class InPairs(unittest.TestCase): class StubDownloader(object): implements(IDownloader) +class StubUploader(object): + implements(IUploader) + class Stuff(unittest.TestCase): def makeVirtualDrive(self, basedir, root_node=None): wq = workqueue.WorkQueue(os.path.join(basedir, "1.workqueue")) dl = StubDownloader() + ul = StubUploader() if not root_node: root_node = directory.LocalFileSubTreeNode() root_node.new("rootdirtree.save") - v = vdrive.VirtualDrive(wq, dl, root_node) + v = vdrive.VirtualDrive(wq, dl, ul, root_node) return v def failUnlessListsAreEqual(self, list1, list2): diff --git a/src/allmydata/workqueue.py b/src/allmydata/workqueue.py index 1982fc54..4e469b17 100644 --- a/src/allmydata/workqueue.py +++ b/src/allmydata/workqueue.py @@ -7,117 +7,7 @@ from allmydata.util.idlib import b2a from allmydata.Crypto.Cipher import AES from allmydata.filetree.nodemaker import NodeMaker from allmydata.filetree.interfaces import INode - -class IWorkQueue(Interface): - """Each filetable root is associated a work queue, which is persisted on - disk and contains idempotent actions that need to be performed. After - each action is completed, it is removed from the queue. - - The queue is broken up into several sections. First are the 'upload' - steps. After this are the 'add_subpath' commands. The last section has - the 'unlink' steps. Somewhere in here are the 'retain' steps.. maybe - interspersed with 'upload', maybe after 'add_subpath' and before - 'unlink'. - - The general idea is that the processing of the work queue could be - interrupted at any time, in the middle of a step, and the next time the - application is started, the step can be re-started without problems. The - placement of the 'retain' commands depends upon how long we might expect - the app to be offline. - - tempfiles: the workqueue has a special directory where temporary files - are stored. create_tempfile() generates these files, while steps like - add_upload_chk() use them. The add_delete_tempfile() will delete the - tempfile. All tempfiles are deleted when the workqueue becomes empty, - since at that point none of them can still be referenced. - - boxes: there is another special directory where named slots (called - 'boxes') hold serialized INode specifications (the strings which are - returned by INode.serialize_node()). Boxes are created by calling - create_boxname(). Boxes are filled either at the time of creation or by - steps like add_upload_chk(). Boxes are used by steps like add_addpath() - and add_retain_uri_from_box. Boxes are deleted by add_delete_box(), as - well as when the workqueue becomes empty. - """ - - def create_tempfile(suffix=""): - """Return (f, filename), where 'f' is an open filehandle, and - 'filename' is a string that can be passed to other workqueue steps to - refer to that same file later. NOTE: 'filename' is not an absolute - path, rather it will be interpreted relative to some directory known - only by the workqueue.""" - def create_boxname(contents=None): - """Return a unique box name (as a string). If 'contents' are - provided, it must be an instance that provides INode, and the - serialized form of the node will be written into the box. Otherwise - the boxname can be used by steps like add_upload_chk to hold the - generated uri.""" - - def add_upload_chk(source_filename, stash_uri_in_boxname): - """This step uploads a file to the mesh and obtains a content-based - URI which can be used to later retrieve the same contents ('CHK' - mode). This URI includes unlink rights. It does not mark the file for - retention. - - When the upload is complete, the resulting URI is stashed in a 'box' - with the specified name. This is basically a local variable. A later - 'add_subpath' step will reference this boxname and retrieve the URI. - """ - - def add_upload_ssk(write_capability, previous_version, source_filename): - """This step uploads a file to the mesh in a way that replaces the - previous version and does not require a change to the ID referenced - by the parent. - """ - - def add_queen_update_handle(handle, source_filename): - """Arrange for a central queen to be notified that the given handle - has been updated with the contents of the given tempfile. This will - send a set_handle() message to the queen.""" - - def add_retain_ssk(read_capability): - """Arrange for the given SSK to be kept alive.""" - - def add_unlink_ssk(write_capability): - """Stop keeping the given SSK alive.""" - - def add_retain_uri_from_box(boxname): - """When executed, this step retrieves the URI from the given box and - marks it for retention: this adds it to a list of all URIs that this - system cares about, which will initiate filechecking/repair for the - file.""" - - def add_addpath(boxname, path): - """When executed, this step will retrieve the serialized INode from - the given box and call vdrive.add(path, node) . - """ - - def add_unlink_uri(uri): - """When executed, this step will unlink the data referenced by the - given URI: the unlink rights are used to tell any shareholders to - unlink the file (possibly deleting it), and the URI is removed from - the list that this system cares about, cancelling filechecking/repair - for the file. - - All 'unlink' steps are pushed to the end of the queue. - """ - - def add_delete_tempfile(filename): - """This step will delete a tempfile created by create_tempfile.""" - - def add_delete_box(boxname): - """When executed, this step deletes the given box.""" - - - # methods for use in unit tests - - def flush(): - """Execute all steps in the WorkQueue right away. Return a Deferred - that fires (with self) when the queue is empty. - """ - -class NotCapableError(Exception): - """You have tried to write to a read-only node.""" +from allmydata.interfaces import IWorkQueue, NotCapableError, IUploader class Step(object): @@ -161,6 +51,8 @@ class WorkQueue(object): assert basedir.endswith("workqueue") self.basedir = basedir self._node_maker = NodeMaker() + self._uploader = None # filled in later + self._downloader = None # filled in later self.seqnum = 0 self.tmpdir = os.path.join(basedir, "tmp") #self.trashdir = os.path.join(basedir, "trash") @@ -196,6 +88,9 @@ class WorkQueue(object): def set_vdrive(self, vdrive): self.vdrive = vdrive + def set_uploader(self, uploader): + assert IUploader(uploader) + self._uploader = uploader def create_tempfile(self, suffix=""): randomname = b2a(os.urandom(10))