]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
add download code to vdrive, add system-level test for vdrive functionality, refactor...
authorBrian Warner <warner@lothar.com>
Mon, 4 Dec 2006 05:42:19 +0000 (22:42 -0700)
committerBrian Warner <warner@lothar.com>
Mon, 4 Dec 2006 05:42:19 +0000 (22:42 -0700)
allmydata/download.py
allmydata/test/test_system.py
allmydata/test/test_vdrive.py
allmydata/vdrive.py

index ecdaae5d7bc3442489c70b2b35975af34d3d97a4..58b8b4b2896a8b1a898deaec5c9ac8122b013451 100644 (file)
@@ -1,4 +1,6 @@
 
+import os
+from zope.interface import Interface, implements
 from twisted.python import failure, log
 from twisted.internet import defer
 from twisted.application import service
@@ -6,8 +8,6 @@ from twisted.application import service
 from allmydata.util import idlib
 from allmydata import encode
 
-from cStringIO import StringIO
-
 class NotEnoughPeersError(Exception):
     pass
 
@@ -23,13 +23,18 @@ class FileDownloader:
         assert isinstance(verifierid, str)
         self._verifierid = verifierid
 
-    def set_filehandle(self, filehandle):
-        self._filehandle = filehandle
+    def set_download_target(self, target):
+        self._target = target
+        self._target.register_canceller(self._cancel)
+
+    def _cancel(self):
+        pass
 
     def make_decoder(self):
         n = self._shares = 4
         k = self._desired_shares = 2
-        self._decoder = encode.Decoder(self._filehandle, k, n,
+        self._target.open()
+        self._decoder = encode.Decoder(self._target, k, n,
                                        self._verifierid)
 
     def start(self):
@@ -103,43 +108,118 @@ class FileDownloader:
         for peerid, buckets in self.landlords:
             all_buckets.extend(buckets)
         d = self._decoder.start(all_buckets)
+        def _done(res):
+            self._target.close()
+            return self._target.finish()
+        def _fail(res):
+            self._target.fail()
+            return res
+        d.addCallbacks(_done, _fail)
         return d
 
 def netstring(s):
     return "%d:%s," % (len(s), s)
 
+class IDownloadTarget(Interface):
+    def open():
+        """Called before any calls to write() or close()."""
+    def write(data):
+        pass
+    def close():
+        pass
+    def fail():
+        """fail() is called to indicate that the download has failed. No
+        further methods will be invoked on the IDownloadTarget after fail()."""
+    def register_canceller(cb):
+        """The FileDownloader uses this to register a no-argument function
+        that the target can call to cancel the download. Once this canceller
+        is invoked, no further calls to write() or close() will be made."""
+    def finish(self):
+        """When the FileDownloader is done, this finish() function will be
+        called. Whatever it returns will be returned to the invoker of
+        Downloader.download.
+        """
+
+class FileName:
+    implements(IDownloadTarget)
+    def __init__(self, filename):
+        self._filename = filename
+    def open(self):
+        self.f = open(self._filename, "wb")
+        return self.f
+    def write(self, data):
+        self.f.write(data)
+    def close(self):
+        self.f.close()
+    def fail(self):
+        self.f.close()
+        os.unlink(self._filename)
+    def register_canceller(self, cb):
+        pass # we won't use it
+    def finish(self):
+        pass
+
+class Data:
+    implements(IDownloadTarget)
+    def __init__(self):
+        self._data = []
+    def open(self):
+        pass
+    def write(self, data):
+        self._data.append(data)
+    def close(self):
+        self.data = "".join(self._data)
+        del self._data
+    def fail(self):
+        del self._data
+    def register_canceller(self, cb):
+        pass # we won't use it
+    def finish(self):
+        return self.data
+
+class FileHandle:
+    implements(IDownloadTarget)
+    def __init__(self, filehandle):
+        self._filehandle = filehandle
+    def open(self):
+        pass
+    def write(self, data):
+        self._filehandle.write(data)
+    def close(self):
+        # the originator of the filehandle reserves the right to close it
+        pass
+    def fail(self):
+        pass
+    def register_canceller(self, cb):
+        pass
+    def finish(self):
+        pass
+
+
 class Downloader(service.MultiService):
     """I am a service that allows file downloading.
     """
     name = "downloader"
 
-    def download_to_filename(self, verifierid, filename):
-        f = open(filename, "wb")
-        def _done(res):
-            f.close()
-            return res
-        d = self.download_filehandle(verifierid, f)
-        d.addBoth(_done)
-        return d
-
-    def download_to_data(self, verifierid):
-        f = StringIO()
-        d = self.download_filehandle(verifierid, f)
-        def _done(res):
-            return f.getvalue()
-        d.addCallback(_done)
-        return d
-
-    def download_filehandle(self, verifierid, f):
+    def download(self, verifierid, t):
         assert self.parent
         assert self.running
         assert isinstance(verifierid, str)
-        assert f.write
-        assert f.close
+        t = IDownloadTarget(t)
+        assert t.write
+        assert t.close
         dl = FileDownloader(self.parent, verifierid)
-        dl.set_filehandle(f)
+        dl.set_download_target(t)
         dl.make_decoder()
         d = dl.start()
         return d
 
+    # utility functions
+    def download_to_data(self, verifierid):
+        return self.download(verifierid, Data())
+    def download_to_filename(self, verifierid, filename):
+        return self.download(verifierid, FileName(filename))
+    def download_to_filehandle(self, verifierid, filehandle):
+        return self.download(verifierid, FileHandle(filehandle))
+
 
index 5f934fe6ac00f0107de7bfeb5854640fb0e57b79..80205a6d6d34e261320f0fcfcb3f99bd1974d2aa 100644 (file)
@@ -83,3 +83,27 @@ class SystemTest(unittest.TestCase):
         return d
     test_upload_and_download.timeout = 20
 
+    def test_vdrive(self):
+        DATA = "Some data to publish to the virtual drive\n"
+        d = self.set_up_nodes()
+        def _do_publish(res):
+            log.msg("PUBLISHING")
+            v0 = self.clients[0].getServiceNamed("vdrive")
+            d1 = v0.make_directory("/", "subdir1")
+            d1.addCallback(lambda subdir1:
+                           v0.put_file_by_data(subdir1, "data", DATA))
+            return d1
+        d.addCallback(_do_publish)
+        def _publish_done(res):
+            log.msg("publish finished")
+            v1 = self.clients[1].getServiceNamed("vdrive")
+            d1 = v1.get_file_to_data("/subdir1/data")
+            return d1
+        d.addCallback(_publish_done)
+        def _get_done(data):
+            log.msg("get finished")
+            self.failUnlessEqual(data, DATA)
+        d.addCallback(_get_done)
+        return d
+    test_vdrive.timeout = 20
+
index 20ae2102299abde93ca136b1b5fc391af02fbd8d..3610a66b88a4058891658d1109a14a3cd9bc90f2 100644 (file)
@@ -66,4 +66,3 @@ class Traverse(unittest.TestCase):
                       self.failUnlessEqual(sorted(files),
                                            ["2.a", "2.b", "d2.1"]))
         return d
-
index 06670a83e0598bbe6e98c459d8a18d3b72a6de97..d61251bdae7ad6efadafdde0b6b0a4b3775248b1 100644 (file)
@@ -3,7 +3,7 @@
 
 from twisted.application import service
 from twisted.internet import defer
-from allmydata.upload import Data, FileHandle, FileName
+from allmydata import upload, download
 
 class VDrive(service.MultiService):
     name = "vdrive"
@@ -40,6 +40,20 @@ class VDrive(service.MultiService):
         d.addCallback(_check)
         return d
 
+    def get_verifierid_from_parent(self, parent, filename):
+        assert not isinstance(parent, str), "'%s' isn't a directory node" % (parent,)
+        d = parent.callRemote("list")
+        def _find(table):
+            for name,target in table:
+                if name == filename:
+                    assert isinstance(target, str), "Hey, %s isn't a file" % filename
+                    return target
+            else:
+                raise KeyError("no such file '%s' in '%s'" %
+                               (filename, [t[0] for t in table]))
+        d.addCallback(_find)
+        return d
+
     def get_root(self):
         return self.gvd_root
 
@@ -64,10 +78,10 @@ class VDrive(service.MultiService):
         I return a deferred that will fire when the operation is complete.
         """
 
-        u = self.parent.getServiceNamed("uploader")
+        ul = self.parent.getServiceNamed("uploader")
         d = self.dirpath(dir_or_path)
         def _got_dir(dirnode):
-            d1 = u.upload(uploadable)
+            d1 = ul.upload(uploadable)
             d1.addCallback(lambda vid:
                            dirnode.callRemote("add_file", name, vid))
             return d1
@@ -75,14 +89,60 @@ class VDrive(service.MultiService):
         return d
 
     def put_file_by_filename(self, dir_or_path, name, filename):
-        return self.put_file(dir_or_path, name, FileName(filename))
+        return self.put_file(dir_or_path, name, upload.FileName(filename))
     def put_file_by_data(self, dir_or_path, name, data):
-        return self.put_file(dir_or_path, name, Data(data))
+        return self.put_file(dir_or_path, name, upload.Data(data))
     def put_file_by_filehandle(self, dir_or_path, name, filehandle):
-        return self.put_file(dir_or_path, name, FileHandle(filehandle))
+        return self.put_file(dir_or_path, name, upload.FileHandle(filehandle))
 
     def make_directory(self, dir_or_path, name):
         d = self.dirpath(dir_or_path)
         d.addCallback(lambda parent: parent.callRemote("add_directory", name))
         return d
 
+
+    def get_file(self, dir_and_name_or_path, download_target):
+        """Retrieve a file from the virtual drive and put it somewhere.
+
+        The file to be retrieved may either be specified as a (dir, name)
+        tuple or as a full /-delimited pathname. In the former case, 'dir'
+        can be either a DirectoryNode or a pathname.
+
+        The download target must be an IDownloadTarget instance like
+        allmydata.download.Data, .FileName, or .FileHandle .
+        """
+
+        dl = self.parent.getServiceNamed("downloader")
+
+        if isinstance(dir_and_name_or_path, tuple):
+            dir_or_path, name = dir_and_name_or_path
+            d = self.dirpath(dir_or_path)
+            def _got_dir(dirnode):
+                return self.get_verifierid_from_parent(dirnode, name)
+            d.addCallback(_got_dir)
+        else:
+            rslash = dir_and_name_or_path.rfind("/")
+            if rslash == -1:
+                # we're looking for a file in the root directory
+                dir = self.gvd_root
+                name = dir_and_name_or_path
+                d = self.get_verifierid_from_parent(dir, name)
+            else:
+                dirpath = dir_and_name_or_path[:rslash]
+                name = dir_and_name_or_path[rslash+1:]
+                d = self.dirpath(dirpath)
+                d.addCallback(lambda dir:
+                              self.get_verifierid_from_parent(dir, name))
+
+        def _got_verifierid(verifierid):
+            return dl.download(verifierid, download_target)
+        d.addCallback(_got_verifierid)
+        return d
+
+    def get_file_to_filename(self, from_where, filename):
+        return self.get_file(from_where, download.FileName(filename))
+    def get_file_to_data(self, from_where):
+        return self.get_file(from_where, download.Data())
+    def get_file_to_filehandle(self, from_where, filehandle):
+        return self.get_file(from_where, download.FileHandle(filehandle))
+