From 9dc1c0cfc048d1f726529dfc389cbce0be763324 Mon Sep 17 00:00:00 2001 From: Brian Warner Date: Sun, 21 Jan 2007 16:03:15 -0700 Subject: [PATCH] filetree: add vdrive upload/download test, change workqueue relative-filename semantics --- src/allmydata/filetree/vdrive.py | 15 ++++- src/allmydata/interfaces.py | 3 + src/allmydata/test/test_filetree_new.py | 73 ++++++++++++++++++++----- src/allmydata/test/test_workqueue.py | 9 ++- src/allmydata/workqueue.py | 38 +++++++++---- 5 files changed, 108 insertions(+), 30 deletions(-) diff --git a/src/allmydata/filetree/vdrive.py b/src/allmydata/filetree/vdrive.py index 8dc931bc..68b9fbfe 100644 --- a/src/allmydata/filetree/vdrive.py +++ b/src/allmydata/filetree/vdrive.py @@ -1,4 +1,5 @@ +import os.path from zope.interface import implements from twisted.internet import defer from allmydata.filetree import directory, file, redirect @@ -82,6 +83,7 @@ class VirtualDrive(object): self.workqueue = workqueue workqueue.set_vdrive(self) workqueue.set_uploader(uploader) + self._downloader = downloader # TODO: queen? self.queen = None self.root_node = root_node @@ -225,11 +227,21 @@ class VirtualDrive(object): return d def download(self, path, target): + # TODO: does this mean download it right now? or schedule it in the + # workqueue for eventual download? should we add download steps to + # the workqueue? assert isinstance(path, list) d = self._get_file_uri(path) - d.addCallback(lambda uri: self.downloader.download(uri, target)) + d.addCallback(lambda uri: self._downloader.download(uri, target)) return d + def download_as_data(self, path): + # TODO: this is kind of goofy.. think of a better download API that + # is appropriate for this class + from allmydata import download + target = download.Data() + return self.download(path, target) + def upload_now(self, path, uploadable): assert isinstance(path, list) # note: the first few steps of this do not use the workqueue, but I @@ -251,6 +263,7 @@ class VirtualDrive(object): def upload_later(self, path, filename): assert isinstance(path, list) + filename = os.path.abspath(filename) boxname = self.workqueue.create_boxname() self.workqueue.add_upload_chk(filename, boxname) self.workqueue.add_addpath(boxname, path) diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py index 8be60721..04a6b3ab 100644 --- a/src/allmydata/interfaces.py +++ b/src/allmydata/interfaces.py @@ -276,6 +276,9 @@ class IWorkQueue(Interface): mode). This URI includes unlink rights. It does not mark the file for retention. + Non-absolute filenames are interpreted relative to the workqueue's + special just-for-tempfiles directory. + When the upload is complete, the resulting URI is stashed in a 'box' with the specified name. This is basically a local variable. A later 'add_subpath' step will reference this boxname and retrieve the URI. diff --git a/src/allmydata/test/test_filetree_new.py b/src/allmydata/test/test_filetree_new.py index 7393bf75..1edbd2b7 100644 --- a/src/allmydata/test/test_filetree_new.py +++ b/src/allmydata/test/test_filetree_new.py @@ -332,24 +332,46 @@ class InPairs(unittest.TestCase): pairs = list(directory.in_pairs(l)) self.failUnlessEqual(pairs, [(0,1), (2,3), (4,5), (6,7)]) -class StubDownloader(object): - implements(IDownloader) +class FakeMesh(object): + implements(IDownloader, IUploader) -class StubUploader(object): - implements(IUploader) + def __init__(self): + self.files = {} + def upload_filename(self, filename): + uri = "stub-uri-%d" % len(self.files) + data = open(filename,"r").read() + self.files[uri] = data + return defer.succeed(uri) + def download(self, uri, target): + target.open() + target.write(self.files[uri]) + target.close() + return defer.maybeDeferred(target.finish) -class Stuff(unittest.TestCase): + +class VDrive(unittest.TestCase): def makeVirtualDrive(self, basedir, root_node=None): wq = workqueue.WorkQueue(os.path.join(basedir, "1.workqueue")) - dl = StubDownloader() - ul = StubUploader() + dl = ul = FakeMesh() if not root_node: root_node = directory.LocalFileSubTreeNode() root_node.new("rootdirtree.save") v = vdrive.VirtualDrive(wq, dl, ul, root_node) return v + def makeLocalTree(self, basename): + # create a LocalFileRedirection pointing at a LocalFileSubTree. + # Returns a VirtualDrive instance. + topdir = directory.LocalFileSubTree().new("%s-dirtree.save" % basename) + topdir.update_now(None) + root = redirect.LocalFileRedirection().new("%s-root" % basename, + topdir.create_node_now()) + root.update_now(None) + v = self.makeVirtualDrive("%s-vdrive" % basename, + root.create_node_now()) + return v + def failUnlessListsAreEqual(self, list1, list2): self.failUnlessEqual(sorted(list1), sorted(list2)) @@ -359,7 +381,7 @@ class Stuff(unittest.TestCase): self.failUnlessEqual(c1a, c2a) def testDirectory(self): - stm = vdrive.SubTreeMaker(None, StubDownloader()) + stm = vdrive.SubTreeMaker(None, FakeMesh()) # create an empty directory (stored locally) subtree = directory.LocalFileSubTree() @@ -453,12 +475,7 @@ class Stuff(unittest.TestCase): (which, expected_failure, res)) def testVdrive(self): - topdir = directory.LocalFileSubTree().new("vdrive-dirtree.save") - topdir.update_now(None) - root = redirect.LocalFileRedirection().new("vdrive-root", - topdir.create_node_now()) - root.update_now(None) - v = self.makeVirtualDrive("vdrive", root.create_node_now()) + v = self.makeLocalTree("vdrive") d = v.list([]) def _listed(contents): @@ -500,3 +517,31 @@ class Stuff(unittest.TestCase): return d + def testUpload(self): + v = self.makeLocalTree("upload") + filename = "upload1" + DATA = "here is some data\n" + f = open(filename, "w") + f.write(DATA) + f.close() + + rc = v.upload_later(["a","b","upload1"], filename) + self.failUnlessIdentical(rc, None) + + d = v.workqueue.flush() + + d.addCallback(lambda res: v.list([])) + d.addCallback(lambda contents: + self.failUnlessListsAreEqual(contents.keys(), ["a"])) + d.addCallback(lambda res: v.list(["a"])) + d.addCallback(lambda contents: + self.failUnlessListsAreEqual(contents.keys(), ["b"])) + d.addCallback(lambda res: v.list(["a","b"])) + d.addCallback(lambda contents: + self.failUnlessListsAreEqual(contents.keys(), + ["upload1"])) + d.addCallback(lambda res: v.download_as_data(["a","b","upload1"])) + d.addCallback(self.failUnlessEqual, DATA) + + return d + diff --git a/src/allmydata/test/test_workqueue.py b/src/allmydata/test/test_workqueue.py index d0c01c51..872ee636 100644 --- a/src/allmydata/test/test_workqueue.py +++ b/src/allmydata/test/test_workqueue.py @@ -4,6 +4,7 @@ from twisted.trial import unittest from twisted.internet import defer from allmydata import workqueue from allmydata.util import idlib +from allmydata.filetree.file import CHKFileNode class FakeWorkQueue(workqueue.WorkQueue): @@ -65,8 +66,10 @@ class Items(unittest.TestCase): def testBox(self): wq = self.wq("testBox") boxname = wq.create_boxname() - wq.write_to_box(boxname, "contents of box") - self.failUnlessEqual(wq.read_from_box(boxname), "contents of box") + wq.write_to_box(boxname, CHKFileNode().new("uri goes here")) + out = wq.read_from_box(boxname) + self.failUnless(isinstance(out, CHKFileNode)) + self.failUnlessEqual(out.get_uri(), "uri goes here") def testCHK(self): wq = self.wq("testCHK") @@ -149,7 +152,7 @@ class Items(unittest.TestCase): self.failUnless(os.path.exists(tmpfilename)) # likewise this unreferenced box should get deleted boxname = wq.create_boxname() - wq.write_to_box(boxname, "contents of box") + wq.write_to_box(boxname, CHKFileNode().new("uri here")) boxfile = os.path.join(wq.boxesdir, boxname) self.failUnless(os.path.exists(boxfile)) diff --git a/src/allmydata/workqueue.py b/src/allmydata/workqueue.py index 4e469b17..eeff4b8d 100644 --- a/src/allmydata/workqueue.py +++ b/src/allmydata/workqueue.py @@ -1,12 +1,13 @@ import os, shutil, sha -from zope.interface import Interface, implements +from zope.interface import implements from twisted.internet import defer from allmydata.util import bencode from allmydata.util.idlib import b2a from allmydata.Crypto.Cipher import AES from allmydata.filetree.nodemaker import NodeMaker from allmydata.filetree.interfaces import INode +from allmydata.filetree.file import CHKFileNode from allmydata.interfaces import IWorkQueue, NotCapableError, IUploader @@ -101,20 +102,21 @@ class WorkQueue(object): def create_boxname(self, contents=None): boxname = b2a(os.urandom(10)) if contents is not None: - assert INode(contents) - self.write_to_box(boxname, contents.serialize_node()) + self.write_to_box(boxname, contents) return boxname - def write_to_box(self, boxname, data): + def write_to_box(self, boxname, contents): + assert INode(contents) f = open(os.path.join(self.boxesdir, boxname), "w") - f.write(data) + f.write(contents.serialize_node()) f.flush() os.fsync(f) f.close() def read_from_box(self, boxname): f = open(os.path.join(self.boxesdir, boxname), "r") data = f.read() + node = self._node_maker.make_node_from_serialized(data) f.close() - return data + return node def _create_step(self, end, lines): assert end in ("first", "last") @@ -139,8 +141,10 @@ class WorkQueue(object): # methods to add entries to the queue def add_upload_chk(self, source_filename, stash_uri_in_boxname): - # source_filename is absolute, and can point to things outside our - # workqueue. + # If source_filename is absolute, it will point to something outside + # of our workqueue (this is how user files are uploaded). If it is + # relative, it points to something inside self.filesdir (this is how + # serialized directories and tempfiles are uploaded) lines = ["upload_chk", source_filename, stash_uri_in_boxname] self._create_step_first(lines) @@ -277,15 +281,25 @@ class WorkQueue(object): # dispatch method here and an add_ method above. - def step_upload_chk(self, source_filename, index_a, write_key_a): - pass + def step_upload_chk(self, source_filename, stash_uri_in_boxname): + # we use relative filenames for tempfiles created by + # workqueue.create_tempfile, and absolute filenames for everything + # that comes from the vdrive. That means using os.path.abspath() on + # user files in VirtualDrive methods. + filename = os.path.join(self.filesdir, source_filename) + d = self._uploader.upload_filename(filename) + def _uploaded(uri): + node = CHKFileNode().new(uri) + self.write_to_box(stash_uri_in_boxname, node) + d.addCallback(_uploaded) + return d + def step_upload_ssk(self, source_filename, index_a, write_key_a, prev_ver): pass def step_addpath(self, boxname, *path): path = list(path) - data = self.read_from_box(boxname) - child_node = self._node_maker.make_node_from_serialized(data) + child_node = self.read_from_box(boxname) return self.vdrive.add(path, child_node) def step_retain_ssk(self, index_a, read_key_a): -- 2.45.2