From: Brian Warner Date: Wed, 27 Jun 2007 00:25:23 +0000 (-0700) Subject: remove unused/obsoleted workqueue.py X-Git-Tag: allmydata-tahoe-0.4.0~23 X-Git-Url: https://git.rkrishnan.org/components/com_hotproperty/%22doc.html/%3C?a=commitdiff_plain;h=2766b7988b6860caca0eff3e2fc3ba346293f37d;p=tahoe-lafs%2Ftahoe-lafs.git remove unused/obsoleted workqueue.py --- diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py index 028ef26d..a9ccde8c 100644 --- a/src/allmydata/interfaces.py +++ b/src/allmydata/interfaces.py @@ -634,153 +634,6 @@ class IUploader(Interface): """Like upload(), but accepts an open filehandle.""" -class IWorkQueue(Interface): - """Each filetable root is associated a work queue, which is persisted on - disk and contains idempotent actions that need to be performed. After - each action is completed, it is removed from the queue. - - The queue is broken up into several sections. First are the 'upload' - steps. After this are the 'add_subpath' commands. The last section has - the 'unlink' steps. Somewhere in here are the 'retain' steps.. maybe - interspersed with 'upload', maybe after 'add_subpath' and before - 'unlink'. - - The general idea is that the processing of the work queue could be - interrupted at any time, in the middle of a step, and the next time the - application is started, the step can be re-started without problems. The - placement of the 'retain' commands depends upon how long we might expect - the app to be offline. - - tempfiles: the workqueue has a special directory where temporary files - are stored. create_tempfile() generates these files, while steps like - add_upload_chk() use them. The add_delete_tempfile() will delete the - tempfile. All tempfiles are deleted when the workqueue becomes empty, - since at that point none of them can still be referenced. - - boxes: there is another special directory where named slots (called - 'boxes') hold serialized INode specifications (the strings which are - returned by INode.serialize_node()). Boxes are created by calling - create_boxname(). Boxes are filled either at the time of creation or by - steps like add_upload_chk(). Boxes are used by steps like add_addpath() - and add_retain_uri_from_box. Boxes are deleted by add_delete_box(), as - well as when the workqueue becomes empty. - """ - - def create_tempfile(suffix=""): - """Return (f, filename), where 'f' is an open filehandle, and - 'filename' is a string that can be passed to other workqueue steps to - refer to that same file later. NOTE: 'filename' is not an absolute - path, rather it will be interpreted relative to some directory known - only by the workqueue.""" - def create_boxname(contents=None): - """Return a unique box name (as a string). If 'contents' are - provided, it must be an instance that provides INode, and the - serialized form of the node will be written into the box. Otherwise - the boxname can be used by steps like add_upload_chk to hold the - generated uri.""" - - def add_upload_chk(source_filename, stash_uri_in_boxname): - """This step uploads a file to the grid and obtains a content-based - URI which can be used to later retrieve the same contents ('CHK' - mode). This URI includes unlink rights. It does not mark the file for - retention. - - Non-absolute filenames are interpreted relative to the workqueue's - special just-for-tempfiles directory. - - When the upload is complete, the resulting URI is stashed in a 'box' - with the specified name. This is basically a local variable. A later - 'add_subpath' step will reference this boxname and retrieve the URI. - """ - - def add_upload_ssk(write_capability, previous_version, source_filename): - """This step uploads a file to the grid in a way that replaces the - previous version and does not require a change to the ID referenced - by the parent. - """ - - def add_vdrive_update_handle(handle, source_filename): - """Arrange for a vdrive server to be notified that the given handle - has been updated with the contents of the given tempfile. This will - send a set_handle() message to the vdrive.""" - - def add_retain_ssk(read_capability): - """Arrange for the given SSK to be kept alive.""" - - def add_unlink_ssk(write_capability): - """Stop keeping the given SSK alive.""" - - def add_retain_uri_from_box(boxname): - """When executed, this step retrieves the URI from the given box and - marks it for retention: this adds it to a list of all URIs that this - system cares about, which will initiate filechecking/repair for the - file.""" - - def add_addpath(boxname, path): - """When executed, this step pulls a node specification from 'boxname' - and figures out which subtrees must be modified to allow that node to - live at the 'path' (which is an absolute path). This will probably - cause one or more 'add_modify_subtree' or 'add_modify_redirection' - steps to be added to the workqueue. - """ - - def add_deletepath(path): - """When executed, finds the subtree that contains the node at 'path' - and modifies it (and any necessary parent subtrees) to delete that - path. This will probably cause one or more 'add_modify_subtree' or - 'add_modify_redirection' steps to be added to the workqueue. - """ - - def add_modify_subtree(subtree_node, localpath, new_node_boxname, - new_subtree_boxname=None): - """When executed, this step retrieves the subtree specified by - 'subtree_node', pulls a node specification out of 'new_node_boxname', - then modifies the subtree such that a subtree-relative 'localpath' - points to the new node. If 'new_node_boxname' is None, this deletes - the given path. It then serializes the subtree in its new form, and - optionally puts a node that describes the new subtree in - 'new_subtree_boxname' for use by another add_modify_subtree step. - - The idea is that 'subtree_node' will refer a CHKDirectorySubTree, and - 'new_node_boxname' will contain the CHKFileNode that points to a - newly-uploaded file. When the CHKDirectorySubTree is modified, it - acquires a new URI, which will be stuffed (in the form of a - CHKDirectorySubTreeNode) into 'new_subtree_boxname'. A subsequent - step would then read from 'new_subtree_boxname' and modify some other - subtree with the contents. - - If 'subtree_node' refers to a redirection subtree like - LocalFileRedirection or VdriveRedirection, then 'localpath' is - ignored, because redirection subtrees don't consume path components - and have no internal directory structure (they just have the one - redirection target). Redirection subtrees generally retain a constant - identity, so it is unlikely that 'new_subtree_boxname' will be used. - """ - - def add_unlink_uri(uri): - """When executed, this step will unlink the data referenced by the - given URI: the unlink rights are used to tell any shareholders to - unlink the file (possibly deleting it), and the URI is removed from - the list that this system cares about, cancelling filechecking/repair - for the file. - - All 'unlink' steps are pushed to the end of the queue. - """ - - def add_delete_tempfile(filename): - """This step will delete a tempfile created by create_tempfile.""" - - def add_delete_box(boxname): - """When executed, this step deletes the given box.""" - - - # methods for use in unit tests - - def flush(): - """Execute all steps in the WorkQueue right away. Return a Deferred - that fires (with self) when the queue is empty. - """ - class NotCapableError(Exception): """You have tried to write to a read-only node.""" diff --git a/src/allmydata/workqueue.py b/src/allmydata/workqueue.py deleted file mode 100644 index 4e28e9db..00000000 --- a/src/allmydata/workqueue.py +++ /dev/null @@ -1,490 +0,0 @@ - -import os, shutil -from zope.interface import implements -from twisted.internet import defer -from allmydata.util import bencode -from allmydata.util.idlib import b2a -from allmydata.Crypto.Cipher import AES -from allmydata.Crypto.Hash import SHA256 -from allmydata.filetree.nodemaker import NodeMaker -from allmydata.filetree.interfaces import INode -from allmydata.filetree.file import CHKFileNode -from allmydata.interfaces import IWorkQueue, NotCapableError, IUploader - - -class Step(object): - def setup(self, stepname, basedir): - self.basedir = basedir - self.stepname = stepname - self.stepbase = os.path.join(self.basedir, self.stepname) - - def remove(self, _ignored=None): - trashdir = os.path.join(self.basedir, "trash", self.stepname) - os.rename(self.stepbase, trashdir) - shutil.rmtree(trashdir) - -class UploadSSKStep(Step): - def start(self): - f = open(os.path.join(self.stepbase, "source_filename"), "r") - source_filename = f.read() - f.close() - f = open(os.path.join(self.stepbase, "write_capability"), "r") - write_cap = bencode.bdecode(f.read()) - f.close() - f = open(os.path.join(self.stepbase, "previous_version"), "r") - previous_version = bencode.bdecode(f.read()) - f.close() - - n = MutableSSKTracker() - n.set_version(previous_version) - n.set_write_capability(write_cap) - f = open(source_filename, "rb") - data = f.read() - f.close() - published_data = n.write_new_version(data) - d = self.push_ssk(n.ssk_index, n.vresion, published_data) - d.addCallback(self.remove) - return d - - -class WorkQueue(object): - implements(IWorkQueue) - debug = False - - def __init__(self, basedir): - assert basedir.endswith("workqueue") - self.basedir = basedir - self._node_maker = NodeMaker() - self._uploader = None # filled in later - self._downloader = None # filled in later - self.seqnum = 0 - self.tmpdir = os.path.join(basedir, "tmp") - #self.trashdir = os.path.join(basedir, "trash") - self.filesdir = os.path.join(basedir, "files") - self.boxesdir = os.path.join(basedir, "boxes") - if os.path.exists(self.tmpdir): - shutil.rmtree(self.tmpdir) - os.makedirs(self.tmpdir) - #if os.path.exists(self.trashdir): - # shutil.rmtree(self.trashdir) - #os.makedirs(self.trashdir) - if not os.path.exists(self.filesdir): - # filesdir is *not* cleared - os.makedirs(self.filesdir) - if not os.path.exists(self.boxesdir): - # likewise, boxesdir is not cleared - os.makedirs(self.boxesdir) - # all Steps are recorded in separate files in our basedir. All such - # files are named with the pattern 'step-END-NNN', where END is - # either 'first' or 'last'. These steps are to be executed in - # alphabetical order, with all 'step-first-NNN' steps running before - # any 'step-last-NNN'. - for n in os.listdir(self.basedir): - if n.startswith("step-first-"): - sn = int(n[len("step-first-"):]) - self.seqnum = max(self.seqnum, sn) - elif n.startswith("step-last-"): - sn = int(n[len("step-last-"):]) - self.seqnum = max(self.seqnum, sn) - # each of these files contains one string per line, and the first - # line specifies what kind of step it is - assert self.seqnum < 1000 # TODO: don't let this grow unboundedly - - def set_vdrive(self, vdrive): - self.vdrive = vdrive - def set_uploader(self, uploader): - assert IUploader(uploader) - self._uploader = uploader - - def create_tempfile(self, suffix=""): - randomname = b2a(os.urandom(10)) - filename = randomname + suffix - f = open(os.path.join(self.filesdir, filename), "wb") - return (f, filename) - - def create_boxname(self, contents=None): - boxname = b2a(os.urandom(10)) - if contents is not None: - self.write_to_box(boxname, contents) - return boxname - def write_to_box(self, boxname, contents): - assert INode(contents) - f = open(os.path.join(self.boxesdir, boxname), "w") - f.write(contents.serialize_node()) - f.flush() - os.fsync(f) - f.close() - def read_from_box(self, boxname): - f = open(os.path.join(self.boxesdir, boxname), "r") - data = f.read() - node = self._node_maker.make_node_from_serialized(data) - f.close() - return node - - def _create_step(self, end, lines): - assert end in ("first", "last") - filename = "step-%s-%d" % (end, self.seqnum) - self.seqnum += 1 - f = open(os.path.join(self.tmpdir, filename), "w") - for line in lines: - assert "\n" not in line, line - f.write(line) - f.write("\n") - f.flush() - os.fsync(f) - f.close() - fromfile = os.path.join(self.tmpdir, filename) - tofile = os.path.join(self.basedir, filename) - os.rename(fromfile, tofile) - - def _create_step_first(self, lines): - self._create_step("first", lines) - def _create_step_last(self, lines): - self._create_step("last", lines) - - # methods to add entries to the queue - def add_upload_chk(self, source_filename, stash_uri_in_boxname): - # If source_filename is absolute, it will point to something outside - # of our workqueue (this is how user files are uploaded). If it is - # relative, it points to something inside self.filesdir (this is how - # serialized directories and tempfiles are uploaded) - lines = ["upload_chk", source_filename, stash_uri_in_boxname] - self._create_step_first(lines) - - def add_upload_ssk(self, source_filename, write_capability, - previous_version): - lines = ["upload_ssk", source_filename, - b2a(write_capability.index), b2a(write_capability.key), - str(previous_version)] - self._create_step_first(lines) - - def add_retain_ssk(self, read_capability): - lines = ["retain_ssk", b2a(read_capability.index), - b2a(read_capability.key)] - self._create_step_first(lines) - - def add_unlink_ssk(self, write_capability): - lines = ["unlink_ssk", b2a(write_capability.index), - b2a(write_capability.key)] - self._create_step_last(lines) - - def add_retain_uri_from_box(self, boxname): - lines = ["retain_uri_from_box", boxname] - self._create_step_first(lines) - - def add_addpath(self, boxname, path): - assert isinstance(path, (list, tuple)) - lines = ["addpath", boxname] - lines.extend(path) - self._create_step_first(lines) - - def add_deletepath(self, path): - assert isinstance(path, (list, tuple)) - lines = ["deletepath"] - lines.extend(path) - self._create_step_first(lines) - - def add_modify_subtree(self, subtree_node, localpath, new_node_boxname, - new_subtree_boxname=None): - assert isinstance(localpath, (list, tuple)) - box1 = self.create_boxname(subtree_node) - self.add_delete_box(box1) - # TODO: it would probably be easier if steps were represented in - # directories, with a separate file for each argument - if new_node_boxname is None: - new_node_boxname = "" - if new_subtree_boxname is None: - new_subtree_boxname = "" - lines = ["modify_subtree", - box1, new_node_boxname, new_subtree_boxname] - lines.extend(localpath) - self._create_step_first(lines) - - def add_unlink_uri(self, uri): - lines = ["unlink_uri", uri] - self._create_step_last(lines) - - def add_delete_tempfile(self, filename): - lines = ["delete_tempfile", filename] - self._create_step_last(lines) - - def add_delete_box(self, boxname): - lines = ["delete_box", boxname] - self._create_step_last(lines) - - - # methods to perform work - - def run_next_step(self): - """Run the next pending step. - - Returns None if there is no next step to run, or a Deferred that - will fire when the step completes. The step will be removed - from the queue when it completes.""" - next_step = self.get_next_step() - if next_step: - stepname, steptype, lines = self.get_next_step() - d = self.dispatch_step(steptype, lines) - d.addCallback(self._delete_step, stepname) - return d - # no steps pending, it is safe to clean out leftover files - self._clean_leftover_files() - return None - - def _clean_leftover_files(self): - # there are no steps pending, therefore any leftover files in our - # filesdir are orphaned and can be deleted. This catches things like - # a tempfile being created but the application gets interrupted - # before the upload step which references it gets created, or if an - # upload step gets written but the remaining sequence (addpath, - # delete_box) does not. - for n in os.listdir(self.filesdir): - os.unlink(os.path.join(self.filesdir, n)) - for n in os.listdir(self.boxesdir): - os.unlink(os.path.join(self.boxesdir, n)) - - def get_next_step(self): - stepnames = [n for n in os.listdir(self.basedir) - if n.startswith("step-")] - stepnames.sort() - if not stepnames: - return None - stepname = stepnames[0] - return self._get_step(stepname) - - def _get_step(self, stepname): - f = open(os.path.join(self.basedir, stepname), "r") - lines = f.read().split("\n") - f.close() - assert lines[-1] == "" # files should end with a newline - lines.pop(-1) # remove the newline - steptype = lines.pop(0) - return stepname, steptype, lines - - def dispatch_step(self, steptype, lines): - handlername = "step_" + steptype - if not hasattr(self, handlername): - raise RuntimeError("unknown workqueue step type '%s'" % steptype) - handler = getattr(self, handlername) - d = defer.maybeDeferred(handler, *lines) - return d - - def _delete_step(self, res, stepname): - os.unlink(os.path.join(self.basedir, stepname)) - return res - - # debug/test methods - def count_pending_steps(self): - return len([n for n in os.listdir(self.basedir) - if n.startswith("step-")]) - def get_all_steps(self): - # returns a list of (steptype, lines) for all steps - stepnames = [] - for stepname in os.listdir(self.basedir): - if stepname.startswith("step-"): - stepnames.append(stepname) - stepnames.sort() - steps = [] - for stepname in stepnames: - steps.append(self._get_step(stepname)[1:]) - return steps - def run_all_steps(self, ignored=None): - d = self.run_next_step() - if d: - d.addCallback(self.run_all_steps) - return d - return defer.succeed(None) - def flush(self): - return self.run_all_steps() - - - def open_tempfile(self, filename): - f = open(os.path.join(self.filesdir, filename), "rb") - return f - - # work is dispatched to these methods. To add a new step type, add a - # dispatch method here and an add_ method above. - - - def step_upload_chk(self, source_filename, stash_uri_in_boxname): - if self.debug: - print "STEP_UPLOAD_CHK(%s -> %s)" % (source_filename, - stash_uri_in_boxname) - # we use relative filenames for tempfiles created by - # workqueue.create_tempfile, and absolute filenames for everything - # that comes from the vdrive. That means using os.path.abspath() on - # user files in VirtualDrive methods. - filename = os.path.join(self.filesdir, source_filename) - d = self._uploader.upload_filename(filename) - def _uploaded(uri): - if self.debug: - print " -> %s" % uri - node = CHKFileNode().new(uri) - self.write_to_box(stash_uri_in_boxname, node) - d.addCallback(_uploaded) - return d - - def step_upload_ssk(self, source_filename, index_a, write_key_a, prev_ver): - pass - - def step_addpath(self, boxname, *path): - if self.debug: - print "STEP_ADDPATH(%s -> %s)" % (boxname, "/".join(path)) - path = list(path) - return self.vdrive.addpath(path, boxname) - - def step_deletepath(self, *path): - if self.debug: - print "STEP_DELETEPATH(%s)" % "/".join(path) - path = list(path) - return self.vdrive.deletepath(path) - - def step_modify_subtree(self, subtree_node_boxname, new_node_boxname, - new_subtree_boxname, *localpath): - # the weird order of arguments is a consequence of the fact that - # localpath is variable-length and new_subtree_boxname is optional. - if not new_subtree_boxname: - new_subtree_boxname = None - subtree_node = self.read_from_box(subtree_node_boxname) - new_node = None - if new_node_boxname: - new_node = self.read_from_box(new_node_boxname) - localpath = list(localpath) - return self.vdrive.modify_subtree(subtree_node, localpath, - new_node, new_subtree_boxname) - - def step_retain_ssk(self, index_a, read_key_a): - pass - def step_unlink_ssk(self, index_a, write_key_a): - pass - def step_retain_uri_from_box(self, boxname): - pass - def step_unlink_uri(self, uri): - if self.debug: - print "STEP_UNLINK_URI(%s)" % uri - pass - - def step_delete_tempfile(self, filename): - if self.debug: - print "STEP_DELETE_TEMPFILE(%s)" % filename - assert not filename.startswith("/") - os.unlink(os.path.join(self.filesdir, filename)) - def step_delete_box(self, boxname): - if self.debug: - print "DELETE_BOX", boxname - os.unlink(os.path.join(self.boxesdir, boxname)) - - - - -AES_KEY_LENGTH = 16 -def make_aes_key(): - return os.urandom(16) -def make_rsa_key(): - raise NotImplementedError -def hash_sha(data): - return SHA256.new(data).digest() -def hash_sha_to_key(data): - return SHA256.new(data).digest()[:AES_KEY_LENGTH] -def aes_encrypt(key, plaintext): - assert isinstance(key, str) - assert len(key) == AES_KEY_LENGTH - cryptor = AES.new(key=key, mode=AES.MODE_CTR, counterstart="\x00"*16) - crypttext = cryptor.encrypt(plaintext) - return crypttext -def aes_decrypt(key, crypttext): - assert isinstance(key, str) - assert len(key) == AES_KEY_LENGTH - cryptor = AES.new(key=key, mode=AES.MODE_CTR, counterstart="\x00"*16) - plaintext = cryptor.decrypt(crypttext) - return plaintext -def serialize(objects): - return bencode.bencode(objects) -def unserialize(data): - return bencode.bdecode(data) - -class MutableSSKTracker(object): - """I represent a mutable file, indexed by an SSK. - """ - - def create(self): - # if you create the node this way, you will have both read and write - # capabilities - self.priv_key, self.pub_key = make_rsa_key() - self.ssk_index = hash_sha(self.pub_key.serialized()) - self.write_key = make_aes_key() - self.read_key = hash_sha_to_key(self.write_key) - self.version = 0 - - def set_version(self, version): - self.version = version - - def set_read_capability(self, read_cap): - (self.ssk_index, self.read_key) = read_cap - - def set_write_capability(self, write_cap): - # TODO: add some assertions here, if someone calls both - # set_read_capability and set_write_capability, make sure the keys - # match - (self.ssk_index, self.write_key) = write_cap - self.read_key = hash_sha_to_key(self.write_key) - - def extract_readwrite_from_published(self, published_data, write_key): - self.write_key = write_key - self.read_key = hash_sha_to_key(self.write_key) - self._extract(published_data) - self.priv_key = aes_decrypt(write_key, self.encrypted_privkey) - assert self.priv_key.is_this_your_pub_key(self.pub_key) - - def extract_readonly_from_published(self, published_data, read_key): - self.write_key = None - self.read_key = read_key - self._extract(published_data) - self.priv_key = None - - def _extract(self, published_data): - (signed_data, serialized_pub_key, sig) = unserialize(published_data) - self.pub_key = unserialize(serialized_pub_key) - self.pub_key.check_signature(sig, signed_data) - (encrypted_privkey, encrypted_data, version) = unserialize(signed_data) - self.data = aes_decrypt(self.read_key, encrypted_data) - self.encrypted_privkey = encrypted_privkey - - def get_read_capability(self): - return (self.ssk_index, self.read_key) - - def get_write_capability(self): - if not self.write_key: - raise NotCapableError("This MutableSSKTracker is read-only") - return (self.ssk_index, self.write_key) - - def write_new_version(self, data): - if not self.write_key: - raise NotCapableError("This MutableSSKTracker is read-only") - encrypted_privkey = aes_encrypt(self.write_key, - self.priv_key.serialized()) - encrypted_data = aes_encrypt(self.read_key, data) - self.version += 1 - signed_data = serialize((encrypted_privkey, - encrypted_data, - self.version)) - sig = self.priv_key.sign(signed_data) - serialized_pub_key = self.pub_key.serialized() - published_data = serialize((signed_data, serialized_pub_key, sig)) - return published_data - -def make_new_SSK_node(): - n = MutableSSKTracker() - n.create() - return n - -def extract_readwrite_SSK_node(published_data, write_key): - n = MutableSSKTracker() - n.extract_readwrite_SSK_node(published_data, write_key) - return n - -def extract_readonly_SSK_node(published_data, read_key): - n = MutableSSKTracker() - n.extract_readonly_from_published(published_data, read_key) - return n -