]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
#527: support HTTP 'Range:' requests, using a cachefile. Adds filenode.read(consumer...
authorBrian Warner <warner@lothar.com>
Tue, 28 Oct 2008 20:41:04 +0000 (13:41 -0700)
committerBrian Warner <warner@lothar.com>
Tue, 28 Oct 2008 20:41:04 +0000 (13:41 -0700)
src/allmydata/client.py
src/allmydata/immutable/download.py
src/allmydata/immutable/filenode.py
src/allmydata/interfaces.py
src/allmydata/test/common.py
src/allmydata/test/test_filenode.py
src/allmydata/test/test_system.py
src/allmydata/test/test_web.py
src/allmydata/web/common.py
src/allmydata/web/filenode.py

index 0f323b82b26c3a2f9df7a2481a8fea856fd40318..797d1745bff29bea0e0249a8872990c4037ad2c6 100644 (file)
@@ -18,7 +18,7 @@ from allmydata.immutable.filenode import FileNode, LiteralFileNode
 from allmydata.offloaded import Helper
 from allmydata.control import ControlServer
 from allmydata.introducer.client import IntroducerClient
-from allmydata.util import hashutil, base32, testutil
+from allmydata.util import hashutil, base32, testutil, fileutil
 from allmydata.uri import LiteralFileURI
 from allmydata.dirnode import NewDirectoryNode
 from allmydata.mutable.node import MutableFileNode, MutableWatcher
@@ -188,6 +188,9 @@ class Client(node.Node, testutil.PollMixin):
         self.convergence = base32.a2b(convergence_s)
         self._node_cache = weakref.WeakValueDictionary() # uri -> node
         self.add_service(Uploader(helper_furl, self.stats_provider))
+        self.download_cachedir = os.path.join(self.basedir,
+                                              "private", "cache", "download")
+        fileutil.make_dirs(self.download_cachedir)
         self.add_service(Downloader(self.stats_provider))
         self.add_service(MutableWatcher(self.stats_provider))
         def _publish(res):
@@ -334,7 +337,10 @@ class Client(node.Node, testutil.PollMixin):
                 if isinstance(u, LiteralFileURI):
                     node = LiteralFileNode(u, self) # LIT
                 else:
-                    node = FileNode(u, self) # CHK
+                    cachefile = os.path.join(self.download_cachedir,
+                                             base32.b2a(u.storage_index))
+                    # TODO: cachefile manager, weakref, expire policy
+                    node = FileNode(u, self, cachefile) # CHK
             else:
                 assert IMutableFileURI.providedBy(u), u
                 node = MutableFileNode(self).init_from_uri(u)
index 80cd2460d176959509ee9b139c212ca0d0648392..0a62833c808ff8294d8d1487d2febd5c00ae60c7 100644 (file)
@@ -1067,7 +1067,7 @@ class ConsumerAdapter:
     def register_canceller(self, cb):
         pass
     def finish(self):
-        return None
+        return self._consumer
 
 
 class Downloader(service.MultiService):
index bf06cb510d51caecde6b6ab577ab4f4d1d909d4d..a9a4aa4e6ab5a6357336598b43230e6e17dd38e0 100644 (file)
@@ -1,15 +1,18 @@
 
+import os.path, stat
+from cStringIO import StringIO
 from zope.interface import implements
 from twisted.internet import defer
 from twisted.internet.interfaces import IPushProducer, IConsumer
+from twisted.protocols import basic
 from allmydata.interfaces import IFileNode, IFileURI, ICheckable
+from allmydata.util import observer, log, base32
 from allmydata.immutable.checker import SimpleCHKFileChecker, \
      SimpleCHKFileVerifier
+from allmydata.immutable import download
 
-class ImmutableFileNode(object):
+class _ImmutableFileNodeBase(object):
     implements(IFileNode, ICheckable)
-    checker_class = SimpleCHKFileChecker
-    verifier_class = SimpleCHKFileVerifier
 
     def __init__(self, uri, client):
         self.u = IFileURI(uri)
@@ -37,11 +40,41 @@ class ImmutableFileNode(object):
         else:
             return True
 
-class FileNode(ImmutableFileNode):
+class PortionOfFile:
+    # like a list slice (things[2:14]), but for a file on disk
+    def __init__(self, fn, offset=0, size=None):
+        self.f = open(fn, "rb")
+        self.f.seek(offset)
+        self.bytes_left = size
+
+    def read(self, size=None):
+        # bytes_to_read = min(size, self.bytes_left), but None>anything
+        if size is None:
+            bytes_to_read = self.bytes_left
+        elif self.bytes_left is None:
+            bytes_to_read = size
+        else:
+            bytes_to_read = min(size, self.bytes_left)
+        data = self.f.read(bytes_to_read)
+        if self.bytes_left is not None:
+            self.bytes_left -= len(data)
+        return data
+
+class FileNode(_ImmutableFileNodeBase):
     checker_class = SimpleCHKFileChecker
+    verifier_class = SimpleCHKFileVerifier
 
-    def __init__(self, uri, client):
-        ImmutableFileNode.__init__(self, uri, client)
+    def __init__(self, uri, client, cachefile):
+        _ImmutableFileNodeBase.__init__(self, uri, client)
+        self.cachefile = cachefile
+        # five states:
+        #  new FileNode, no downloads ever performed
+        #  new FileNode, leftover file (partial)
+        #  new FileNode, leftover file (whole)
+        #  download in progress, not yet complete
+        #  download complete
+        self.download_in_progress = False
+        self.fully_cached_observer = observer.OneShotObserverList()
 
     def get_uri(self):
         return self.u.to_string()
@@ -84,6 +117,67 @@ class FileNode(ImmutableFileNode):
         d.addCallback(_done)
         return d
 
+    def read(self, consumer, offset=0, size=None):
+        if size is None:
+            size = self.get_size() - offset
+
+        assert self.cachefile
+
+        try:
+            filesize = os.stat(self.cachefile)[stat.ST_SIZE]
+        except OSError:
+            filesize = 0
+        if filesize >= offset+size:
+            log.msg(format=("immutable filenode read [%(si)s]: " +
+                            "satisfied from cache " +
+                            "(read %(start)d+%(size)d, filesize %(filesize)d)"),
+                    si=base32.b2a(self.u.storage_index),
+                    start=offset, size=size, filesize=filesize,
+                    umid="5p5ECA", level=log.OPERATIONAL)
+            f = PortionOfFile(self.cachefile, offset, size)
+            d = basic.FileSender().beginFileTransfer(f, consumer)
+            d.addCallback(lambda lastSent: consumer)
+            return d
+
+        if offset == 0 and size == self.get_size():
+            # don't use the cache, just do a normal streaming download
+            log.msg(format=("immutable filenode read [%(si)s]: " +
+                            "doing normal full download"),
+                    si=base32.b2a(self.u.storage_index),
+                    umid="VRSBwg", level=log.OPERATIONAL)
+            return self.download(download.ConsumerAdapter(consumer))
+
+        if not self.download_in_progress:
+            log.msg(format=("immutable filenode read [%(si)s]: " +
+                            "starting download"),
+                    si=base32.b2a(self.u.storage_index),
+                    umid="h26Heg", level=log.OPERATIONAL)
+            self.start_download_to_cache()
+
+        # The file is being downloaded, but the portion we want isn't yet
+        # available, so we have to wait. First cut: wait for the whole thing
+        # to download. The second cut will be to wait for a specific range
+        # milestone, with a download target that counts bytes and compares
+        # them against a milestone list.
+        log.msg(format=("immutable filenode read [%(si)s]: " +
+                        "waiting for download"),
+                si=base32.b2a(self.u.storage_index),
+                umid="l48V7Q", level=log.OPERATIONAL)
+        d = self.when_fully_cached()
+        d.addCallback(lambda ignored: self.read(consumer, offset, size))
+        return d
+
+    def start_download_to_cache(self):
+        assert not self.download_in_progress
+        self.download_in_progress = True
+        downloader = self._client.getServiceNamed("downloader")
+        d = downloader.download_to_filename(self.get_uri(), self.cachefile)
+        d.addBoth(self.fully_cached_observer.fire)
+
+    def when_fully_cached(self):
+        return self.fully_cached_observer.when_fired()
+
+
     def download(self, target):
         downloader = self._client.getServiceNamed("downloader")
         return downloader.download(self.get_uri(), target)
@@ -99,10 +193,11 @@ class LiteralProducer:
     def stopProducing(self):
         pass
 
-class LiteralFileNode(ImmutableFileNode):
+
+class LiteralFileNode(_ImmutableFileNodeBase):
 
     def __init__(self, uri, client):
-        ImmutableFileNode.__init__(self, uri, client)
+        _ImmutableFileNodeBase.__init__(self, uri, client)
 
     def get_uri(self):
         return self.u.to_string()
@@ -122,6 +217,25 @@ class LiteralFileNode(ImmutableFileNode):
     def check_and_repair(self, monitor, verify=False):
         return defer.succeed(None)
 
+    def read(self, consumer, offset=0, size=None):
+        if size is None:
+            data = self.u.data[offset:]
+        else:
+            data = self.u.data[offset:offset+size]
+
+        # We use twisted.protocols.basic.FileSender, which only does
+        # non-streaming, i.e. PullProducer, where the receiver/consumer must
+        # ask explicitly for each chunk of data. There are only two places in
+        # the Twisted codebase that can't handle streaming=False, both of
+        # which are in the upload path for an FTP/SFTP server
+        # (protocols.ftp.FileConsumer and
+        # vfs.adapters.ftp._FileToConsumerAdapter), neither of which is
+        # likely to be used as the target for a Tahoe download.
+
+        d = basic.FileSender().beginFileTransfer(StringIO(data), consumer)
+        d.addCallback(lambda lastSent: consumer)
+        return d
+
     def download(self, target):
         # note that this does not update the stats_provider
         data = self.u.data
index 7acc163d54fad2b57682fbe2248c8aebeeebdb2c..c624bdfa8691e91c1fe99d28799ba42b9dc91ea2 100644 (file)
@@ -480,6 +480,58 @@ class IFileNode(IFilesystemNode):
     def get_size():
         """Return the length (in bytes) of the data this node represents."""
 
+    def read(consumer, offset=0, size=None):
+        """Download a portion (possibly all) of the file's contents, making
+        them available to the given IConsumer. Return a Deferred that fires
+        (with the consumer) when the consumer is unregistered (either because
+        the last byte has been given to it, or because the consumer threw an
+        exception during write(), possibly because it no longer wants to
+        receive data). The portion downloaded will start at 'offset' and
+        contain 'size' bytes (or the remainder of the file if size==None).
+
+        The consumer will be used in non-streaming mode: an IPullProducer
+        will be attached to it.
+
+        The consumer will not receive data right away: several network trips
+        must occur first. The order of events will be::
+
+         consumer.registerProducer(p, streaming)
+          (if streaming == False)::
+           consumer does p.resumeProducing()
+            consumer.write(data)
+           consumer does p.resumeProducing()
+            consumer.write(data).. (repeat until all data is written)
+         consumer.unregisterProducer()
+         deferred.callback(consumer)
+
+        If a download error occurs, or an exception is raised by
+        consumer.registerProducer() or consumer.write(), I will call
+        consumer.unregisterProducer() and then deliver the exception via
+        deferred.errback(). To cancel the download, the consumer should call
+        p.stopProducing(), which will result in an exception being delivered
+        via deferred.errback().
+
+        A simple download-to-memory consumer example would look like this::
+
+         class MemoryConsumer:
+           implements(IConsumer)
+           def __init__(self):
+             self.chunks = []
+             self.done = False
+           def registerProducer(self, p, streaming):
+             assert streaming == False
+             while not self.done:
+               p.resumeProducing()
+           def write(self, data):
+             self.chunks.append(data)
+           def unregisterProducer(self):
+             self.done = True
+         d = filenode.read(MemoryConsumer())
+         d.addCallback(lambda mc: "".join(mc.chunks))
+         return d
+
+        """
+
 class IMutableFileNode(IFileNode, IMutableFilesystemNode):
     """I provide access to a 'mutable file', which retains its identity
     regardless of what contents are put in it.
index 4e08af29d083594a06094c1b701932bb67e98db1..1898acc615dfb9da9438854e183c6e908a3d6526 100644 (file)
@@ -2,6 +2,7 @@
 import os
 from zope.interface import implements
 from twisted.internet import defer
+from twisted.internet.interfaces import IConsumer
 from twisted.python import failure
 from twisted.application import service
 from twisted.web.error import Error as WebError
@@ -101,8 +102,23 @@ class FakeCHKFileNode:
         data = self.all_contents[self.my_uri]
         return defer.succeed(data)
     def get_size(self):
-        data = self.all_contents[self.my_uri]
+        try:
+            data = self.all_contents[self.my_uri]
+        except KeyError:
+            raise NotEnoughSharesError()
         return len(data)
+    def read(self, consumer, offset=0, size=None):
+        d = self.download_to_data()
+        def _got(data):
+            start = offset
+            if size is not None:
+                end = offset + size
+            else:
+                end = len(data)
+            consumer.write(data[start:end])
+            return consumer
+        d.addCallback(_got)
+        return d
 
 def make_chk_file_uri(size):
     return uri.CHKFileURI(key=os.urandom(16),
@@ -927,3 +943,25 @@ class WebErrorMixin:
         f.trap(WebError)
         print "Web Error:", f.value, ":", f.value.response
         return f
+
+class MemoryConsumer:
+    implements(IConsumer)
+    def __init__(self):
+        self.chunks = []
+        self.done = False
+    def registerProducer(self, p, streaming):
+        if streaming:
+            # call resumeProducing once to start things off
+            p.resumeProducing()
+        else:
+            while not self.done:
+                p.resumeProducing()
+    def write(self, data):
+        self.chunks.append(data)
+    def unregisterProducer(self):
+        self.done = True
+
+def download_to_data(n, offset=0, size=None):
+    d = n.read(MemoryConsumer(), offset, size)
+    d.addCallback(lambda mc: "".join(mc.chunks))
+    return d
index 7cca719e39b743c8d92257a43b3e4edcafbc4e05..87a21e8987146293ae323f606b06b6fa0b28b12e 100644 (file)
@@ -5,6 +5,7 @@ from allmydata.monitor import Monitor
 from allmydata.immutable import filenode, download
 from allmydata.mutable.node import MutableFileNode
 from allmydata.util import hashutil
+from allmydata.test.common import download_to_data
 
 class NotANode:
     pass
@@ -17,8 +18,8 @@ class Node(unittest.TestCase):
                            total_shares=10,
                            size=1000)
         c = None
-        fn1 = filenode.FileNode(u, c)
-        fn2 = filenode.FileNode(u.to_string(), c)
+        fn1 = filenode.FileNode(u, c, "cachefile")
+        fn2 = filenode.FileNode(u.to_string(), c, "cachefile")
         self.failUnlessEqual(fn1, fn2)
         self.failIfEqual(fn1, "I am not a filenode")
         self.failIfEqual(fn1, NotANode())
@@ -63,6 +64,14 @@ class Node(unittest.TestCase):
         d.addCallback(lambda res: fn1.download_to_data())
         d.addCallback(_check)
 
+        d.addCallback(lambda res: download_to_data(fn1))
+        d.addCallback(_check)
+
+        d.addCallback(lambda res: download_to_data(fn1, 1, 5))
+        def _check_segment(res):
+            self.failUnlessEqual(res, DATA[1:1+5])
+        d.addCallback(_check_segment)
+
         return d
 
     def test_mutable_filenode(self):
index e3d0ddfb55997476a50e3847405b4556e63afceb..70c03183c68692018e210ec684aee8c5d1287b33 100644 (file)
@@ -24,7 +24,8 @@ from twisted.python.failure import Failure
 from twisted.web.client import getPage
 from twisted.web.error import Error
 
-from allmydata.test.common import SystemTestMixin, WebErrorMixin
+from allmydata.test.common import SystemTestMixin, WebErrorMixin, \
+     MemoryConsumer, download_to_data
 
 LARGE_DATA = """
 This is some data to publish to the virtual drive, which needs to be large
@@ -185,6 +186,25 @@ class SystemTest(SystemTestMixin, unittest.TestCase):
             self.failUnlessEqual(consumer.contents, DATA)
         d.addCallback(_download_to_consumer_done)
 
+        def _test_read(res):
+            n = self.clients[1].create_node_from_uri(self.uri)
+            d = download_to_data(n)
+            def _read_done(data):
+                self.failUnlessEqual(data, DATA)
+            d.addCallback(_read_done)
+            d.addCallback(lambda ign:
+                          n.read(MemoryConsumer(), offset=1, size=4))
+            def _read_portion_done(mc):
+                self.failUnlessEqual("".join(mc.chunks), DATA[1:1+4])
+            d.addCallback(_read_portion_done)
+            d.addCallback(lambda ign:
+                          n.read(MemoryConsumer(), offset=2, size=None))
+            def _read_tail_done(mc):
+                self.failUnlessEqual("".join(mc.chunks), DATA[2:])
+            d.addCallback(_read_tail_done)
+            return d
+        d.addCallback(_test_read)
+
         def _download_nonexistent_uri(res):
             baduri = self.mangle_uri(self.uri)
             log.msg("about to download non-existent URI", level=log.UNUSUAL,
index ed5bc8a3cbfe7c67053f72d54d5c0c4c0ee82c16..b41380e7f74e1e4a990aab947cc042071e269a10 100644 (file)
@@ -97,10 +97,11 @@ class FakeClient(service.MultiService):
     def list_all_helper_statuses(self):
         return []
 
+class MyGetter(client.HTTPPageGetter):
+    handleStatus_206 = lambda self: self.handleStatus_200()
+
 class HTTPClientHEADFactory(client.HTTPClientFactory):
-    def __init__(self, *args, **kwargs):
-        client.HTTPClientFactory.__init__(self, *args, **kwargs)
-        self.deferred.addCallback(lambda res: self.response_headers)
+    protocol = MyGetter
 
     def noPage(self, reason):
         # Twisted-2.5.0 and earlier had a bug, in which they would raise an
@@ -114,6 +115,8 @@ class HTTPClientHEADFactory(client.HTTPClientFactory):
             return
         return client.HTTPClientFactory.noPage(self, reason)
 
+class HTTPClientGETFactory(client.HTTPClientFactory):
+    protocol = MyGetter
 
 class WebMixin(object):
     def setUp(self):
@@ -236,21 +239,37 @@ class WebMixin(object):
         self.failUnlessEqual(kids[u"n\u00fc.txt"][1]["ro_uri"],
                              self._bar_txt_uri)
 
-    def GET(self, urlpath, followRedirect=False):
+    def GET(self, urlpath, followRedirect=False, return_response=False,
+            **kwargs):
+        # if return_response=True, this fires with (data, statuscode,
+        # respheaders) instead of just data.
         assert not isinstance(urlpath, unicode)
         url = self.webish_url + urlpath
-        return client.getPage(url, method="GET", followRedirect=followRedirect)
+        factory = HTTPClientGETFactory(url, method="GET",
+                                       followRedirect=followRedirect, **kwargs)
+        reactor.connectTCP("localhost", self.webish_port, factory)
+        d = factory.deferred
+        def _got_data(data):
+            return (data, factory.status, factory.response_headers)
+        if return_response:
+            d.addCallback(_got_data)
+        return factory.deferred
 
-    def HEAD(self, urlpath):
+    def HEAD(self, urlpath, return_response=False, **kwargs):
         # this requires some surgery, because twisted.web.client doesn't want
         # to give us back the response headers.
-        factory = HTTPClientHEADFactory(urlpath, method="HEAD")
+        factory = HTTPClientHEADFactory(urlpath, method="HEAD", **kwargs)
         reactor.connectTCP("localhost", self.webish_port, factory)
+        d = factory.deferred
+        def _got_data(data):
+            return (data, factory.status, factory.response_headers)
+        if return_response:
+            d.addCallback(_got_data)
         return factory.deferred
 
-    def PUT(self, urlpath, data):
+    def PUT(self, urlpath, data, **kwargs):
         url = self.webish_url + urlpath
-        return client.getPage(url, method="PUT", postdata=data)
+        return client.getPage(url, method="PUT", postdata=data, **kwargs)
 
     def DELETE(self, urlpath):
         url = self.webish_url + urlpath
@@ -515,9 +534,45 @@ class Web(WebMixin, testutil.StallMixin, unittest.TestCase):
         d.addCallback(self.failUnlessIsBarDotTxt)
         return d
 
+    def test_GET_FILEURL_range(self):
+        headers = {"range": "bytes=1-10"}
+        d = self.GET(self.public_url + "/foo/bar.txt", headers=headers,
+                     return_response=True)
+        def _got((res, status, headers)):
+            self.failUnlessEqual(int(status), 206)
+            self.failUnless(headers.has_key("content-range"))
+            self.failUnlessEqual(headers["content-range"][0],
+                                 "bytes 1-10/%d" % len(self.BAR_CONTENTS))
+            self.failUnlessEqual(res, self.BAR_CONTENTS[1:11])
+        d.addCallback(_got)
+        return d
+
+    def test_HEAD_FILEURL_range(self):
+        headers = {"range": "bytes=1-10"}
+        d = self.HEAD(self.public_url + "/foo/bar.txt", headers=headers,
+                     return_response=True)
+        def _got((res, status, headers)):
+            self.failUnlessEqual(res, "")
+            self.failUnlessEqual(int(status), 206)
+            self.failUnless(headers.has_key("content-range"))
+            self.failUnlessEqual(headers["content-range"][0],
+                                 "bytes 1-10/%d" % len(self.BAR_CONTENTS))
+        d.addCallback(_got)
+        return d
+
+    def test_GET_FILEURL_range_bad(self):
+        headers = {"range": "BOGUS=fizbop-quarnak"}
+        d = self.shouldFail2(error.Error, "test_GET_FILEURL_range_bad",
+                             "400 Bad Request",
+                             "Syntactically invalid http range header",
+                             self.GET, self.public_url + "/foo/bar.txt",
+                             headers=headers)
+        return d
+
     def test_HEAD_FILEURL(self):
-        d = self.HEAD(self.public_url + "/foo/bar.txt")
-        def _got(headers):
+        d = self.HEAD(self.public_url + "/foo/bar.txt", return_response=True)
+        def _got((res, status, headers)):
+            self.failUnlessEqual(res, "")
             self.failUnlessEqual(headers["content-length"][0],
                                  str(len(self.BAR_CONTENTS)))
             self.failUnlessEqual(headers["content-type"], ["text/plain"])
@@ -634,6 +689,19 @@ class Web(WebMixin, testutil.StallMixin, unittest.TestCase):
                                                       self.NEWFILE_CONTENTS))
         return d
 
+    def test_PUT_NEWFILEURL_range_bad(self):
+        headers = {"content-range": "bytes 1-10/%d" % len(self.NEWFILE_CONTENTS)}
+        target = self.public_url + "/foo/new.txt"
+        d = self.shouldFail2(error.Error, "test_PUT_NEWFILEURL_range_bad",
+                             "501 Not Implemented",
+                             "Content-Range in PUT not yet supported",
+                             # (and certainly not for immutable files)
+                             self.PUT, target, self.NEWFILE_CONTENTS[1:11],
+                             headers=headers)
+        d.addCallback(lambda res:
+                      self.failIfNodeHasChild(self._foo_node, u"new.txt"))
+        return d
+
     def test_PUT_NEWFILEURL_mutable(self):
         d = self.PUT(self.public_url + "/foo/new.txt?mutable=true",
                      self.NEWFILE_CONTENTS)
@@ -1344,8 +1412,10 @@ class Web(WebMixin, testutil.StallMixin, unittest.TestCase):
 
         # and that HEAD computes the size correctly
         d.addCallback(lambda res:
-                      self.HEAD(self.public_url + "/foo/new.txt"))
-        def _got_headers(headers):
+                      self.HEAD(self.public_url + "/foo/new.txt",
+                                return_response=True))
+        def _got_headers((res, status, headers)):
+            self.failUnlessEqual(res, "")
             self.failUnlessEqual(headers["content-length"][0],
                                  str(len(NEW2_CONTENTS)))
             self.failUnlessEqual(headers["content-type"], ["text/plain"])
index 1050e9cca2f3b3557e7817a7df71438a9fa0b7c8..a831b80d6036640d8a10a87eff5ae823d2fcbd3a 100644 (file)
@@ -4,7 +4,8 @@ from zope.interface import Interface
 from nevow import loaders, appserver
 from nevow.inevow import IRequest
 from nevow.util import resource_filename
-from allmydata.interfaces import ExistingChildError, FileTooLargeError
+from allmydata.interfaces import ExistingChildError, NoSuchChildError, \
+     FileTooLargeError, NotEnoughSharesError
 
 class IClient(Interface):
     pass
@@ -122,6 +123,13 @@ class MyExceptionHandler(appserver.DefaultExceptionHandler):
                                "name, and you asked me to not "
                                "replace it.",
                                http.CONFLICT)
+        elif f.check(NoSuchChildError):
+            name = f.value.args[0]
+            return self.simple(ctx,
+                               "No such child: %s" % name.encode("utf-8"),
+                               http.NOT_FOUND)
+        elif f.check(NotEnoughSharesError):
+            return self.simple(ctx, str(f), http.GONE)
         elif f.check(WebError):
             return self.simple(ctx, f.value.text, f.value.code)
         elif f.check(FileTooLargeError):
index a52dae4379bf5558bb176a9fefa7d7c132f8cec2..b96ba175a5bd7073033a00d31539b10f65854f6e 100644 (file)
@@ -1,14 +1,12 @@
 
 import simplejson
 
-from zope.interface import implements
-from twisted.internet.interfaces import IConsumer
-from twisted.web import http, static, resource, server
+from twisted.web import http, static
 from twisted.internet import defer
 from nevow import url, rend
 from nevow.inevow import IRequest
 
-from allmydata.interfaces import IDownloadTarget, ExistingChildError
+from allmydata.interfaces import ExistingChildError
 from allmydata.monitor import Monitor
 from allmydata.immutable.upload import FileHandle
 from allmydata.immutable.filenode import LiteralFileNode
@@ -109,6 +107,9 @@ class PlaceHolderNodeHandler(RenderMixin, rend.Page, ReplaceMeMixin):
         t = get_arg(req, "t", "").strip()
         replace = boolean_of_arg(get_arg(req, "replace", "true"))
         assert self.parentnode and self.name
+        if req.getHeader("content-range"):
+            raise WebError("Content-Range in PUT not yet supported",
+                           http.NOT_IMPLEMENTED)
         if not t:
             return self.replace_me_with_a_child(ctx, replace)
         if t == "uri":
@@ -160,7 +161,6 @@ class FileNodeHandler(RenderMixin, rend.Page, ReplaceMeMixin):
         t = get_arg(req, "t", "").strip()
         if not t:
             # just get the contents
-            save_to_file = boolean_of_arg(get_arg(req, "save", "False"))
             # the filename arrives as part of the URL or in a form input
             # element, and will be sent back in a Content-Disposition header.
             # Different browsers use various character sets for this name,
@@ -173,7 +173,13 @@ class FileNodeHandler(RenderMixin, rend.Page, ReplaceMeMixin):
             # properly. So we assume that at least the browser will agree
             # with itself, and echo back the same bytes that we were given.
             filename = get_arg(req, "filename", self.name) or "unknown"
-            return FileDownloader(self.node, filename, save_to_file)
+            if self.node.is_mutable():
+                # some day: d = self.node.get_best_version()
+                d = makeMutableDownloadable(self.node)
+            else:
+                d = defer.succeed(self.node)
+            d.addCallback(lambda dn: FileDownloader(dn, filename))
+            return d
         if t == "json":
             return FileJSONMetadata(ctx, self.node)
         if t == "info":
@@ -189,25 +195,13 @@ class FileNodeHandler(RenderMixin, rend.Page, ReplaceMeMixin):
         t = get_arg(req, "t", "").strip()
         if t:
             raise WebError("GET file: bad t=%s" % t)
-        # if we have a filename, use it to get the content-type
         filename = get_arg(req, "filename", self.name) or "unknown"
-        gte = static.getTypeAndEncoding
-        ctype, encoding = gte(filename,
-                              static.File.contentTypes,
-                              static.File.contentEncodings,
-                              defaultType="text/plain")
-        req.setHeader("content-type", ctype)
-        if encoding:
-            req.setHeader("content-encoding", encoding)
         if self.node.is_mutable():
-            d = self.node.get_size_of_best_version()
-        # otherwise, we can get the size from the URI
+            # some day: d = self.node.get_best_version()
+            d = makeMutableDownloadable(self.node)
         else:
-            d = defer.succeed(self.node.get_size())
-        def _got_length(length):
-            req.setHeader("content-length", length)
-            return ""
-        d.addCallback(_got_length)
+            d = defer.succeed(self.node)
+        d.addCallback(lambda dn: FileDownloader(dn, filename))
         return d
 
     def render_PUT(self, ctx):
@@ -295,71 +289,33 @@ class FileNodeHandler(RenderMixin, rend.Page, ReplaceMeMixin):
         d.addCallback(lambda res: self.node.get_uri())
         return d
 
-
-class WebDownloadTarget:
-    implements(IDownloadTarget, IConsumer)
-    def __init__(self, req, content_type, content_encoding, save_to_filename):
-        self._req = req
-        self._content_type = content_type
-        self._content_encoding = content_encoding
-        self._opened = False
-        self._producer = None
-        self._save_to_filename = save_to_filename
-
-    def registerProducer(self, producer, streaming):
-        self._req.registerProducer(producer, streaming)
-    def unregisterProducer(self):
-        self._req.unregisterProducer()
-
-    def open(self, size):
-        self._opened = True
-        self._req.setHeader("content-type", self._content_type)
-        if self._content_encoding:
-            self._req.setHeader("content-encoding", self._content_encoding)
-        self._req.setHeader("content-length", str(size))
-        if self._save_to_filename is not None:
-            # tell the browser to save the file rather display it we don't
-            # try to encode the filename, instead we echo back the exact same
-            # bytes we were given in the URL. See the comment in
-            # FileNodeHandler.render_GET for the sad details.
-            filename = self._save_to_filename
-            self._req.setHeader("content-disposition",
-                                'attachment; filename="%s"' % filename)
-
-    def write(self, data):
-        self._req.write(data)
-    def close(self):
-        self._req.finish()
-
-    def fail(self, why):
-        if self._opened:
-            # The content-type is already set, and the response code
-            # has already been sent, so we can't provide a clean error
-            # indication. We can emit text (which a browser might interpret
-            # as something else), and if we sent a Size header, they might
-            # notice that we've truncated the data. Keep the error message
-            # small to improve the chances of having our error response be
-            # shorter than the intended results.
-            #
-            # We don't have a lot of options, unfortunately.
-            self._req.write("problem during download\n")
+class MutableDownloadable:
+    #implements(IDownloadable)
+    def __init__(self, size, node):
+        self.size = size
+        self.node = node
+    def get_size(self):
+        return self.size
+    def read(self, consumer, offset=0, size=None):
+        d = self.node.download_best_version()
+        d.addCallback(self._got_data, consumer, offset, size)
+        return d
+    def _got_data(self, contents, consumer, offset, size):
+        start = offset
+        if size is not None:
+            end = offset+size
         else:
-            # We haven't written anything yet, so we can provide a sensible
-            # error message.
-            msg = str(why.type)
-            msg.replace("\n", "|")
-            self._req.setResponseCode(http.GONE, msg)
-            self._req.setHeader("content-type", "text/plain")
-            # TODO: HTML-formatted exception?
-            self._req.write(str(why))
-        self._req.finish()
-
-    def register_canceller(self, cb):
-        pass
-    def finish(self):
-        pass
-
-class FileDownloader(resource.Resource):
+            end = self.size
+        # SDMF: we can write the whole file in one big chunk
+        consumer.write(contents[start:end])
+        return consumer
+
+def makeMutableDownloadable(n):
+    d = defer.maybeDeferred(n.get_size_of_best_version)
+    d.addCallback(MutableDownloadable, n)
+    return d
+
+class FileDownloader(rend.Page):
     # since we override the rendering process (to let the tahoe Downloader
     # drive things), we must inherit from regular old twisted.web.resource
     # instead of nevow.rend.Page . Nevow will use adapters to wrap a
@@ -368,25 +324,81 @@ class FileDownloader(resource.Resource):
     # that wrapper would allow us to return a Deferred from render(), which
     # might could simplify the implementation of WebDownloadTarget.
 
-    def __init__(self, filenode, filename, save_to_file):
-        resource.Resource.__init__(self)
+    def __init__(self, filenode, filename):
+        rend.Page.__init__(self)
         self.filenode = filenode
         self.filename = filename
-        self.save_to_file = save_to_file
-    def render(self, req):
+
+    def renderHTTP(self, ctx):
+        req = IRequest(ctx)
         gte = static.getTypeAndEncoding
         ctype, encoding = gte(self.filename,
                               static.File.contentTypes,
                               static.File.contentEncodings,
                               defaultType="text/plain")
+        req.setHeader("content-type", ctype)
+        if encoding:
+            req.setHeader("content-encoding", encoding)
+
         save_to_filename = None
-        if self.save_to_file:
-            save_to_filename = self.filename
-        wdt = WebDownloadTarget(req, ctype, encoding, save_to_filename)
-        d = self.filenode.download(wdt)
-        # exceptions during download are handled by the WebDownloadTarget
-        d.addErrback(lambda why: None)
-        return server.NOT_DONE_YET
+        if boolean_of_arg(get_arg(req, "save", "False")):
+            # tell the browser to save the file rather display it we don't
+            # try to encode the filename, instead we echo back the exact same
+            # bytes we were given in the URL. See the comment in
+            # FileNodeHandler.render_GET for the sad details.
+            req.setHeader("content-disposition",
+                          'attachment; filename="%s"' % self.filename)
+
+        filesize = self.filenode.get_size()
+        assert isinstance(filesize, (int,long)), filesize
+        offset, size = 0, None
+        contentsize = filesize
+        rangeheader = req.getHeader('range')
+        if rangeheader:
+            # adapted from nevow.static.File
+            bytesrange = rangeheader.split('=')
+            if bytesrange[0] != 'bytes':
+                raise WebError("Syntactically invalid http range header!")
+            start, end = bytesrange[1].split('-')
+            if start:
+                offset = int(start)
+            if end:
+                size = int(end) - offset + 1
+            req.setResponseCode(http.PARTIAL_CONTENT)
+            req.setHeader('content-range',"bytes %s-%s/%s" %
+                          (str(offset), str(offset+size-1), str(filesize)))
+            contentsize = size
+        req.setHeader("content-length", str(contentsize))
+        if req.method == "HEAD":
+            return ""
+        d = self.filenode.read(req, offset, size)
+        def _error(f):
+            if req.startedWriting:
+                # The content-type is already set, and the response code has
+                # already been sent, so we can't provide a clean error
+                # indication. We can emit text (which a browser might
+                # interpret as something else), and if we sent a Size header,
+                # they might notice that we've truncated the data. Keep the
+                # error message small to improve the chances of having our
+                # error response be shorter than the intended results.
+                #
+                # We don't have a lot of options, unfortunately.
+                req.write("problem during download\n")
+            else:
+                # We haven't written anything yet, so we can provide a
+                # sensible error message.
+                msg = str(f.type)
+                msg.replace("\n", "|")
+                req.setResponseCode(http.GONE, msg)
+                req.setHeader("content-type", "text/plain")
+                req.responseHeaders.setRawHeaders("content-encoding", [])
+                req.responseHeaders.setRawHeaders("content-disposition", [])
+                # TODO: HTML-formatted exception?
+                req.write(str(f))
+        d.addErrback(_error)
+        d.addBoth(lambda ign: req.finish())
+        return req.deferred
+
 
 def FileJSONMetadata(ctx, filenode):
     if filenode.is_readonly():