+import os.path
from zope.interface import implements
from twisted.internet import defer
from allmydata.filetree import directory, file, redirect
self.workqueue = workqueue
workqueue.set_vdrive(self)
workqueue.set_uploader(uploader)
+ self._downloader = downloader
# TODO: queen?
self.queen = None
self.root_node = root_node
return d
def download(self, path, target):
+ # TODO: does this mean download it right now? or schedule it in the
+ # workqueue for eventual download? should we add download steps to
+ # the workqueue?
assert isinstance(path, list)
d = self._get_file_uri(path)
- d.addCallback(lambda uri: self.downloader.download(uri, target))
+ d.addCallback(lambda uri: self._downloader.download(uri, target))
return d
+ def download_as_data(self, path):
+ # TODO: this is kind of goofy.. think of a better download API that
+ # is appropriate for this class
+ from allmydata import download
+ target = download.Data()
+ return self.download(path, target)
+
def upload_now(self, path, uploadable):
assert isinstance(path, list)
# note: the first few steps of this do not use the workqueue, but I
def upload_later(self, path, filename):
assert isinstance(path, list)
+ filename = os.path.abspath(filename)
boxname = self.workqueue.create_boxname()
self.workqueue.add_upload_chk(filename, boxname)
self.workqueue.add_addpath(boxname, path)
mode). This URI includes unlink rights. It does not mark the file for
retention.
+ Non-absolute filenames are interpreted relative to the workqueue's
+ special just-for-tempfiles directory.
+
When the upload is complete, the resulting URI is stashed in a 'box'
with the specified name. This is basically a local variable. A later
'add_subpath' step will reference this boxname and retrieve the URI.
pairs = list(directory.in_pairs(l))
self.failUnlessEqual(pairs, [(0,1), (2,3), (4,5), (6,7)])
-class StubDownloader(object):
- implements(IDownloader)
+class FakeMesh(object):
+ implements(IDownloader, IUploader)
-class StubUploader(object):
- implements(IUploader)
+ def __init__(self):
+ self.files = {}
+ def upload_filename(self, filename):
+ uri = "stub-uri-%d" % len(self.files)
+ data = open(filename,"r").read()
+ self.files[uri] = data
+ return defer.succeed(uri)
+ def download(self, uri, target):
+ target.open()
+ target.write(self.files[uri])
+ target.close()
+ return defer.maybeDeferred(target.finish)
-class Stuff(unittest.TestCase):
+
+class VDrive(unittest.TestCase):
def makeVirtualDrive(self, basedir, root_node=None):
wq = workqueue.WorkQueue(os.path.join(basedir, "1.workqueue"))
- dl = StubDownloader()
- ul = StubUploader()
+ dl = ul = FakeMesh()
if not root_node:
root_node = directory.LocalFileSubTreeNode()
root_node.new("rootdirtree.save")
v = vdrive.VirtualDrive(wq, dl, ul, root_node)
return v
+ def makeLocalTree(self, basename):
+ # create a LocalFileRedirection pointing at a LocalFileSubTree.
+ # Returns a VirtualDrive instance.
+ topdir = directory.LocalFileSubTree().new("%s-dirtree.save" % basename)
+ topdir.update_now(None)
+ root = redirect.LocalFileRedirection().new("%s-root" % basename,
+ topdir.create_node_now())
+ root.update_now(None)
+ v = self.makeVirtualDrive("%s-vdrive" % basename,
+ root.create_node_now())
+ return v
+
def failUnlessListsAreEqual(self, list1, list2):
self.failUnlessEqual(sorted(list1), sorted(list2))
self.failUnlessEqual(c1a, c2a)
def testDirectory(self):
- stm = vdrive.SubTreeMaker(None, StubDownloader())
+ stm = vdrive.SubTreeMaker(None, FakeMesh())
# create an empty directory (stored locally)
subtree = directory.LocalFileSubTree()
(which, expected_failure, res))
def testVdrive(self):
- topdir = directory.LocalFileSubTree().new("vdrive-dirtree.save")
- topdir.update_now(None)
- root = redirect.LocalFileRedirection().new("vdrive-root",
- topdir.create_node_now())
- root.update_now(None)
- v = self.makeVirtualDrive("vdrive", root.create_node_now())
+ v = self.makeLocalTree("vdrive")
d = v.list([])
def _listed(contents):
return d
+ def testUpload(self):
+ v = self.makeLocalTree("upload")
+ filename = "upload1"
+ DATA = "here is some data\n"
+ f = open(filename, "w")
+ f.write(DATA)
+ f.close()
+
+ rc = v.upload_later(["a","b","upload1"], filename)
+ self.failUnlessIdentical(rc, None)
+
+ d = v.workqueue.flush()
+
+ d.addCallback(lambda res: v.list([]))
+ d.addCallback(lambda contents:
+ self.failUnlessListsAreEqual(contents.keys(), ["a"]))
+ d.addCallback(lambda res: v.list(["a"]))
+ d.addCallback(lambda contents:
+ self.failUnlessListsAreEqual(contents.keys(), ["b"]))
+ d.addCallback(lambda res: v.list(["a","b"]))
+ d.addCallback(lambda contents:
+ self.failUnlessListsAreEqual(contents.keys(),
+ ["upload1"]))
+ d.addCallback(lambda res: v.download_as_data(["a","b","upload1"]))
+ d.addCallback(self.failUnlessEqual, DATA)
+
+ return d
+
from twisted.internet import defer
from allmydata import workqueue
from allmydata.util import idlib
+from allmydata.filetree.file import CHKFileNode
class FakeWorkQueue(workqueue.WorkQueue):
def testBox(self):
wq = self.wq("testBox")
boxname = wq.create_boxname()
- wq.write_to_box(boxname, "contents of box")
- self.failUnlessEqual(wq.read_from_box(boxname), "contents of box")
+ wq.write_to_box(boxname, CHKFileNode().new("uri goes here"))
+ out = wq.read_from_box(boxname)
+ self.failUnless(isinstance(out, CHKFileNode))
+ self.failUnlessEqual(out.get_uri(), "uri goes here")
def testCHK(self):
wq = self.wq("testCHK")
self.failUnless(os.path.exists(tmpfilename))
# likewise this unreferenced box should get deleted
boxname = wq.create_boxname()
- wq.write_to_box(boxname, "contents of box")
+ wq.write_to_box(boxname, CHKFileNode().new("uri here"))
boxfile = os.path.join(wq.boxesdir, boxname)
self.failUnless(os.path.exists(boxfile))
import os, shutil, sha
-from zope.interface import Interface, implements
+from zope.interface import implements
from twisted.internet import defer
from allmydata.util import bencode
from allmydata.util.idlib import b2a
from allmydata.Crypto.Cipher import AES
from allmydata.filetree.nodemaker import NodeMaker
from allmydata.filetree.interfaces import INode
+from allmydata.filetree.file import CHKFileNode
from allmydata.interfaces import IWorkQueue, NotCapableError, IUploader
def create_boxname(self, contents=None):
boxname = b2a(os.urandom(10))
if contents is not None:
- assert INode(contents)
- self.write_to_box(boxname, contents.serialize_node())
+ self.write_to_box(boxname, contents)
return boxname
- def write_to_box(self, boxname, data):
+ def write_to_box(self, boxname, contents):
+ assert INode(contents)
f = open(os.path.join(self.boxesdir, boxname), "w")
- f.write(data)
+ f.write(contents.serialize_node())
f.flush()
os.fsync(f)
f.close()
def read_from_box(self, boxname):
f = open(os.path.join(self.boxesdir, boxname), "r")
data = f.read()
+ node = self._node_maker.make_node_from_serialized(data)
f.close()
- return data
+ return node
def _create_step(self, end, lines):
assert end in ("first", "last")
# methods to add entries to the queue
def add_upload_chk(self, source_filename, stash_uri_in_boxname):
- # source_filename is absolute, and can point to things outside our
- # workqueue.
+ # If source_filename is absolute, it will point to something outside
+ # of our workqueue (this is how user files are uploaded). If it is
+ # relative, it points to something inside self.filesdir (this is how
+ # serialized directories and tempfiles are uploaded)
lines = ["upload_chk", source_filename, stash_uri_in_boxname]
self._create_step_first(lines)
# dispatch method here and an add_ method above.
- def step_upload_chk(self, source_filename, index_a, write_key_a):
- pass
+ def step_upload_chk(self, source_filename, stash_uri_in_boxname):
+ # we use relative filenames for tempfiles created by
+ # workqueue.create_tempfile, and absolute filenames for everything
+ # that comes from the vdrive. That means using os.path.abspath() on
+ # user files in VirtualDrive methods.
+ filename = os.path.join(self.filesdir, source_filename)
+ d = self._uploader.upload_filename(filename)
+ def _uploaded(uri):
+ node = CHKFileNode().new(uri)
+ self.write_to_box(stash_uri_in_boxname, node)
+ d.addCallback(_uploaded)
+ return d
+
def step_upload_ssk(self, source_filename, index_a, write_key_a, prev_ver):
pass
def step_addpath(self, boxname, *path):
path = list(path)
- data = self.read_from_box(boxname)
- child_node = self._node_maker.make_node_from_serialized(data)
+ child_node = self.read_from_box(boxname)
return self.vdrive.add(path, child_node)
def step_retain_ssk(self, index_a, read_key_a):