From: Brian Warner Date: Tue, 28 Oct 2008 20:41:04 +0000 (-0700) Subject: #527: support HTTP 'Range:' requests, using a cachefile. Adds filenode.read(consumer... X-Git-Url: https://git.rkrishnan.org/(%5B%5E?a=commitdiff_plain;h=37e3d8e47c489ad808168c85173cc0bb204798d0;p=tahoe-lafs%2Ftahoe-lafs.git #527: support HTTP 'Range:' requests, using a cachefile. Adds filenode.read(consumer, offset, size) method. Still needs: cache expiration, reduced alacrity. --- diff --git a/src/allmydata/client.py b/src/allmydata/client.py index 0f323b82..797d1745 100644 --- a/src/allmydata/client.py +++ b/src/allmydata/client.py @@ -18,7 +18,7 @@ from allmydata.immutable.filenode import FileNode, LiteralFileNode from allmydata.offloaded import Helper from allmydata.control import ControlServer from allmydata.introducer.client import IntroducerClient -from allmydata.util import hashutil, base32, testutil +from allmydata.util import hashutil, base32, testutil, fileutil from allmydata.uri import LiteralFileURI from allmydata.dirnode import NewDirectoryNode from allmydata.mutable.node import MutableFileNode, MutableWatcher @@ -188,6 +188,9 @@ class Client(node.Node, testutil.PollMixin): self.convergence = base32.a2b(convergence_s) self._node_cache = weakref.WeakValueDictionary() # uri -> node self.add_service(Uploader(helper_furl, self.stats_provider)) + self.download_cachedir = os.path.join(self.basedir, + "private", "cache", "download") + fileutil.make_dirs(self.download_cachedir) self.add_service(Downloader(self.stats_provider)) self.add_service(MutableWatcher(self.stats_provider)) def _publish(res): @@ -334,7 +337,10 @@ class Client(node.Node, testutil.PollMixin): if isinstance(u, LiteralFileURI): node = LiteralFileNode(u, self) # LIT else: - node = FileNode(u, self) # CHK + cachefile = os.path.join(self.download_cachedir, + base32.b2a(u.storage_index)) + # TODO: cachefile manager, weakref, expire policy + node = FileNode(u, self, cachefile) # CHK else: assert IMutableFileURI.providedBy(u), u node = MutableFileNode(self).init_from_uri(u) diff --git a/src/allmydata/immutable/download.py b/src/allmydata/immutable/download.py index 80cd2460..0a62833c 100644 --- a/src/allmydata/immutable/download.py +++ b/src/allmydata/immutable/download.py @@ -1067,7 +1067,7 @@ class ConsumerAdapter: def register_canceller(self, cb): pass def finish(self): - return None + return self._consumer class Downloader(service.MultiService): diff --git a/src/allmydata/immutable/filenode.py b/src/allmydata/immutable/filenode.py index bf06cb51..a9a4aa4e 100644 --- a/src/allmydata/immutable/filenode.py +++ b/src/allmydata/immutable/filenode.py @@ -1,15 +1,18 @@ +import os.path, stat +from cStringIO import StringIO from zope.interface import implements from twisted.internet import defer from twisted.internet.interfaces import IPushProducer, IConsumer +from twisted.protocols import basic from allmydata.interfaces import IFileNode, IFileURI, ICheckable +from allmydata.util import observer, log, base32 from allmydata.immutable.checker import SimpleCHKFileChecker, \ SimpleCHKFileVerifier +from allmydata.immutable import download -class ImmutableFileNode(object): +class _ImmutableFileNodeBase(object): implements(IFileNode, ICheckable) - checker_class = SimpleCHKFileChecker - verifier_class = SimpleCHKFileVerifier def __init__(self, uri, client): self.u = IFileURI(uri) @@ -37,11 +40,41 @@ class ImmutableFileNode(object): else: return True -class FileNode(ImmutableFileNode): +class PortionOfFile: + # like a list slice (things[2:14]), but for a file on disk + def __init__(self, fn, offset=0, size=None): + self.f = open(fn, "rb") + self.f.seek(offset) + self.bytes_left = size + + def read(self, size=None): + # bytes_to_read = min(size, self.bytes_left), but None>anything + if size is None: + bytes_to_read = self.bytes_left + elif self.bytes_left is None: + bytes_to_read = size + else: + bytes_to_read = min(size, self.bytes_left) + data = self.f.read(bytes_to_read) + if self.bytes_left is not None: + self.bytes_left -= len(data) + return data + +class FileNode(_ImmutableFileNodeBase): checker_class = SimpleCHKFileChecker + verifier_class = SimpleCHKFileVerifier - def __init__(self, uri, client): - ImmutableFileNode.__init__(self, uri, client) + def __init__(self, uri, client, cachefile): + _ImmutableFileNodeBase.__init__(self, uri, client) + self.cachefile = cachefile + # five states: + # new FileNode, no downloads ever performed + # new FileNode, leftover file (partial) + # new FileNode, leftover file (whole) + # download in progress, not yet complete + # download complete + self.download_in_progress = False + self.fully_cached_observer = observer.OneShotObserverList() def get_uri(self): return self.u.to_string() @@ -84,6 +117,67 @@ class FileNode(ImmutableFileNode): d.addCallback(_done) return d + def read(self, consumer, offset=0, size=None): + if size is None: + size = self.get_size() - offset + + assert self.cachefile + + try: + filesize = os.stat(self.cachefile)[stat.ST_SIZE] + except OSError: + filesize = 0 + if filesize >= offset+size: + log.msg(format=("immutable filenode read [%(si)s]: " + + "satisfied from cache " + + "(read %(start)d+%(size)d, filesize %(filesize)d)"), + si=base32.b2a(self.u.storage_index), + start=offset, size=size, filesize=filesize, + umid="5p5ECA", level=log.OPERATIONAL) + f = PortionOfFile(self.cachefile, offset, size) + d = basic.FileSender().beginFileTransfer(f, consumer) + d.addCallback(lambda lastSent: consumer) + return d + + if offset == 0 and size == self.get_size(): + # don't use the cache, just do a normal streaming download + log.msg(format=("immutable filenode read [%(si)s]: " + + "doing normal full download"), + si=base32.b2a(self.u.storage_index), + umid="VRSBwg", level=log.OPERATIONAL) + return self.download(download.ConsumerAdapter(consumer)) + + if not self.download_in_progress: + log.msg(format=("immutable filenode read [%(si)s]: " + + "starting download"), + si=base32.b2a(self.u.storage_index), + umid="h26Heg", level=log.OPERATIONAL) + self.start_download_to_cache() + + # The file is being downloaded, but the portion we want isn't yet + # available, so we have to wait. First cut: wait for the whole thing + # to download. The second cut will be to wait for a specific range + # milestone, with a download target that counts bytes and compares + # them against a milestone list. + log.msg(format=("immutable filenode read [%(si)s]: " + + "waiting for download"), + si=base32.b2a(self.u.storage_index), + umid="l48V7Q", level=log.OPERATIONAL) + d = self.when_fully_cached() + d.addCallback(lambda ignored: self.read(consumer, offset, size)) + return d + + def start_download_to_cache(self): + assert not self.download_in_progress + self.download_in_progress = True + downloader = self._client.getServiceNamed("downloader") + d = downloader.download_to_filename(self.get_uri(), self.cachefile) + d.addBoth(self.fully_cached_observer.fire) + + def when_fully_cached(self): + return self.fully_cached_observer.when_fired() + + def download(self, target): downloader = self._client.getServiceNamed("downloader") return downloader.download(self.get_uri(), target) @@ -99,10 +193,11 @@ class LiteralProducer: def stopProducing(self): pass -class LiteralFileNode(ImmutableFileNode): + +class LiteralFileNode(_ImmutableFileNodeBase): def __init__(self, uri, client): - ImmutableFileNode.__init__(self, uri, client) + _ImmutableFileNodeBase.__init__(self, uri, client) def get_uri(self): return self.u.to_string() @@ -122,6 +217,25 @@ class LiteralFileNode(ImmutableFileNode): def check_and_repair(self, monitor, verify=False): return defer.succeed(None) + def read(self, consumer, offset=0, size=None): + if size is None: + data = self.u.data[offset:] + else: + data = self.u.data[offset:offset+size] + + # We use twisted.protocols.basic.FileSender, which only does + # non-streaming, i.e. PullProducer, where the receiver/consumer must + # ask explicitly for each chunk of data. There are only two places in + # the Twisted codebase that can't handle streaming=False, both of + # which are in the upload path for an FTP/SFTP server + # (protocols.ftp.FileConsumer and + # vfs.adapters.ftp._FileToConsumerAdapter), neither of which is + # likely to be used as the target for a Tahoe download. + + d = basic.FileSender().beginFileTransfer(StringIO(data), consumer) + d.addCallback(lambda lastSent: consumer) + return d + def download(self, target): # note that this does not update the stats_provider data = self.u.data diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py index 7acc163d..c624bdfa 100644 --- a/src/allmydata/interfaces.py +++ b/src/allmydata/interfaces.py @@ -480,6 +480,58 @@ class IFileNode(IFilesystemNode): def get_size(): """Return the length (in bytes) of the data this node represents.""" + def read(consumer, offset=0, size=None): + """Download a portion (possibly all) of the file's contents, making + them available to the given IConsumer. Return a Deferred that fires + (with the consumer) when the consumer is unregistered (either because + the last byte has been given to it, or because the consumer threw an + exception during write(), possibly because it no longer wants to + receive data). The portion downloaded will start at 'offset' and + contain 'size' bytes (or the remainder of the file if size==None). + + The consumer will be used in non-streaming mode: an IPullProducer + will be attached to it. + + The consumer will not receive data right away: several network trips + must occur first. The order of events will be:: + + consumer.registerProducer(p, streaming) + (if streaming == False):: + consumer does p.resumeProducing() + consumer.write(data) + consumer does p.resumeProducing() + consumer.write(data).. (repeat until all data is written) + consumer.unregisterProducer() + deferred.callback(consumer) + + If a download error occurs, or an exception is raised by + consumer.registerProducer() or consumer.write(), I will call + consumer.unregisterProducer() and then deliver the exception via + deferred.errback(). To cancel the download, the consumer should call + p.stopProducing(), which will result in an exception being delivered + via deferred.errback(). + + A simple download-to-memory consumer example would look like this:: + + class MemoryConsumer: + implements(IConsumer) + def __init__(self): + self.chunks = [] + self.done = False + def registerProducer(self, p, streaming): + assert streaming == False + while not self.done: + p.resumeProducing() + def write(self, data): + self.chunks.append(data) + def unregisterProducer(self): + self.done = True + d = filenode.read(MemoryConsumer()) + d.addCallback(lambda mc: "".join(mc.chunks)) + return d + + """ + class IMutableFileNode(IFileNode, IMutableFilesystemNode): """I provide access to a 'mutable file', which retains its identity regardless of what contents are put in it. diff --git a/src/allmydata/test/common.py b/src/allmydata/test/common.py index 4e08af29..1898acc6 100644 --- a/src/allmydata/test/common.py +++ b/src/allmydata/test/common.py @@ -2,6 +2,7 @@ import os from zope.interface import implements from twisted.internet import defer +from twisted.internet.interfaces import IConsumer from twisted.python import failure from twisted.application import service from twisted.web.error import Error as WebError @@ -101,8 +102,23 @@ class FakeCHKFileNode: data = self.all_contents[self.my_uri] return defer.succeed(data) def get_size(self): - data = self.all_contents[self.my_uri] + try: + data = self.all_contents[self.my_uri] + except KeyError: + raise NotEnoughSharesError() return len(data) + def read(self, consumer, offset=0, size=None): + d = self.download_to_data() + def _got(data): + start = offset + if size is not None: + end = offset + size + else: + end = len(data) + consumer.write(data[start:end]) + return consumer + d.addCallback(_got) + return d def make_chk_file_uri(size): return uri.CHKFileURI(key=os.urandom(16), @@ -927,3 +943,25 @@ class WebErrorMixin: f.trap(WebError) print "Web Error:", f.value, ":", f.value.response return f + +class MemoryConsumer: + implements(IConsumer) + def __init__(self): + self.chunks = [] + self.done = False + def registerProducer(self, p, streaming): + if streaming: + # call resumeProducing once to start things off + p.resumeProducing() + else: + while not self.done: + p.resumeProducing() + def write(self, data): + self.chunks.append(data) + def unregisterProducer(self): + self.done = True + +def download_to_data(n, offset=0, size=None): + d = n.read(MemoryConsumer(), offset, size) + d.addCallback(lambda mc: "".join(mc.chunks)) + return d diff --git a/src/allmydata/test/test_filenode.py b/src/allmydata/test/test_filenode.py index 7cca719e..87a21e89 100644 --- a/src/allmydata/test/test_filenode.py +++ b/src/allmydata/test/test_filenode.py @@ -5,6 +5,7 @@ from allmydata.monitor import Monitor from allmydata.immutable import filenode, download from allmydata.mutable.node import MutableFileNode from allmydata.util import hashutil +from allmydata.test.common import download_to_data class NotANode: pass @@ -17,8 +18,8 @@ class Node(unittest.TestCase): total_shares=10, size=1000) c = None - fn1 = filenode.FileNode(u, c) - fn2 = filenode.FileNode(u.to_string(), c) + fn1 = filenode.FileNode(u, c, "cachefile") + fn2 = filenode.FileNode(u.to_string(), c, "cachefile") self.failUnlessEqual(fn1, fn2) self.failIfEqual(fn1, "I am not a filenode") self.failIfEqual(fn1, NotANode()) @@ -63,6 +64,14 @@ class Node(unittest.TestCase): d.addCallback(lambda res: fn1.download_to_data()) d.addCallback(_check) + d.addCallback(lambda res: download_to_data(fn1)) + d.addCallback(_check) + + d.addCallback(lambda res: download_to_data(fn1, 1, 5)) + def _check_segment(res): + self.failUnlessEqual(res, DATA[1:1+5]) + d.addCallback(_check_segment) + return d def test_mutable_filenode(self): diff --git a/src/allmydata/test/test_system.py b/src/allmydata/test/test_system.py index e3d0ddfb..70c03183 100644 --- a/src/allmydata/test/test_system.py +++ b/src/allmydata/test/test_system.py @@ -24,7 +24,8 @@ from twisted.python.failure import Failure from twisted.web.client import getPage from twisted.web.error import Error -from allmydata.test.common import SystemTestMixin, WebErrorMixin +from allmydata.test.common import SystemTestMixin, WebErrorMixin, \ + MemoryConsumer, download_to_data LARGE_DATA = """ This is some data to publish to the virtual drive, which needs to be large @@ -185,6 +186,25 @@ class SystemTest(SystemTestMixin, unittest.TestCase): self.failUnlessEqual(consumer.contents, DATA) d.addCallback(_download_to_consumer_done) + def _test_read(res): + n = self.clients[1].create_node_from_uri(self.uri) + d = download_to_data(n) + def _read_done(data): + self.failUnlessEqual(data, DATA) + d.addCallback(_read_done) + d.addCallback(lambda ign: + n.read(MemoryConsumer(), offset=1, size=4)) + def _read_portion_done(mc): + self.failUnlessEqual("".join(mc.chunks), DATA[1:1+4]) + d.addCallback(_read_portion_done) + d.addCallback(lambda ign: + n.read(MemoryConsumer(), offset=2, size=None)) + def _read_tail_done(mc): + self.failUnlessEqual("".join(mc.chunks), DATA[2:]) + d.addCallback(_read_tail_done) + return d + d.addCallback(_test_read) + def _download_nonexistent_uri(res): baduri = self.mangle_uri(self.uri) log.msg("about to download non-existent URI", level=log.UNUSUAL, diff --git a/src/allmydata/test/test_web.py b/src/allmydata/test/test_web.py index ed5bc8a3..b41380e7 100644 --- a/src/allmydata/test/test_web.py +++ b/src/allmydata/test/test_web.py @@ -97,10 +97,11 @@ class FakeClient(service.MultiService): def list_all_helper_statuses(self): return [] +class MyGetter(client.HTTPPageGetter): + handleStatus_206 = lambda self: self.handleStatus_200() + class HTTPClientHEADFactory(client.HTTPClientFactory): - def __init__(self, *args, **kwargs): - client.HTTPClientFactory.__init__(self, *args, **kwargs) - self.deferred.addCallback(lambda res: self.response_headers) + protocol = MyGetter def noPage(self, reason): # Twisted-2.5.0 and earlier had a bug, in which they would raise an @@ -114,6 +115,8 @@ class HTTPClientHEADFactory(client.HTTPClientFactory): return return client.HTTPClientFactory.noPage(self, reason) +class HTTPClientGETFactory(client.HTTPClientFactory): + protocol = MyGetter class WebMixin(object): def setUp(self): @@ -236,21 +239,37 @@ class WebMixin(object): self.failUnlessEqual(kids[u"n\u00fc.txt"][1]["ro_uri"], self._bar_txt_uri) - def GET(self, urlpath, followRedirect=False): + def GET(self, urlpath, followRedirect=False, return_response=False, + **kwargs): + # if return_response=True, this fires with (data, statuscode, + # respheaders) instead of just data. assert not isinstance(urlpath, unicode) url = self.webish_url + urlpath - return client.getPage(url, method="GET", followRedirect=followRedirect) + factory = HTTPClientGETFactory(url, method="GET", + followRedirect=followRedirect, **kwargs) + reactor.connectTCP("localhost", self.webish_port, factory) + d = factory.deferred + def _got_data(data): + return (data, factory.status, factory.response_headers) + if return_response: + d.addCallback(_got_data) + return factory.deferred - def HEAD(self, urlpath): + def HEAD(self, urlpath, return_response=False, **kwargs): # this requires some surgery, because twisted.web.client doesn't want # to give us back the response headers. - factory = HTTPClientHEADFactory(urlpath, method="HEAD") + factory = HTTPClientHEADFactory(urlpath, method="HEAD", **kwargs) reactor.connectTCP("localhost", self.webish_port, factory) + d = factory.deferred + def _got_data(data): + return (data, factory.status, factory.response_headers) + if return_response: + d.addCallback(_got_data) return factory.deferred - def PUT(self, urlpath, data): + def PUT(self, urlpath, data, **kwargs): url = self.webish_url + urlpath - return client.getPage(url, method="PUT", postdata=data) + return client.getPage(url, method="PUT", postdata=data, **kwargs) def DELETE(self, urlpath): url = self.webish_url + urlpath @@ -515,9 +534,45 @@ class Web(WebMixin, testutil.StallMixin, unittest.TestCase): d.addCallback(self.failUnlessIsBarDotTxt) return d + def test_GET_FILEURL_range(self): + headers = {"range": "bytes=1-10"} + d = self.GET(self.public_url + "/foo/bar.txt", headers=headers, + return_response=True) + def _got((res, status, headers)): + self.failUnlessEqual(int(status), 206) + self.failUnless(headers.has_key("content-range")) + self.failUnlessEqual(headers["content-range"][0], + "bytes 1-10/%d" % len(self.BAR_CONTENTS)) + self.failUnlessEqual(res, self.BAR_CONTENTS[1:11]) + d.addCallback(_got) + return d + + def test_HEAD_FILEURL_range(self): + headers = {"range": "bytes=1-10"} + d = self.HEAD(self.public_url + "/foo/bar.txt", headers=headers, + return_response=True) + def _got((res, status, headers)): + self.failUnlessEqual(res, "") + self.failUnlessEqual(int(status), 206) + self.failUnless(headers.has_key("content-range")) + self.failUnlessEqual(headers["content-range"][0], + "bytes 1-10/%d" % len(self.BAR_CONTENTS)) + d.addCallback(_got) + return d + + def test_GET_FILEURL_range_bad(self): + headers = {"range": "BOGUS=fizbop-quarnak"} + d = self.shouldFail2(error.Error, "test_GET_FILEURL_range_bad", + "400 Bad Request", + "Syntactically invalid http range header", + self.GET, self.public_url + "/foo/bar.txt", + headers=headers) + return d + def test_HEAD_FILEURL(self): - d = self.HEAD(self.public_url + "/foo/bar.txt") - def _got(headers): + d = self.HEAD(self.public_url + "/foo/bar.txt", return_response=True) + def _got((res, status, headers)): + self.failUnlessEqual(res, "") self.failUnlessEqual(headers["content-length"][0], str(len(self.BAR_CONTENTS))) self.failUnlessEqual(headers["content-type"], ["text/plain"]) @@ -634,6 +689,19 @@ class Web(WebMixin, testutil.StallMixin, unittest.TestCase): self.NEWFILE_CONTENTS)) return d + def test_PUT_NEWFILEURL_range_bad(self): + headers = {"content-range": "bytes 1-10/%d" % len(self.NEWFILE_CONTENTS)} + target = self.public_url + "/foo/new.txt" + d = self.shouldFail2(error.Error, "test_PUT_NEWFILEURL_range_bad", + "501 Not Implemented", + "Content-Range in PUT not yet supported", + # (and certainly not for immutable files) + self.PUT, target, self.NEWFILE_CONTENTS[1:11], + headers=headers) + d.addCallback(lambda res: + self.failIfNodeHasChild(self._foo_node, u"new.txt")) + return d + def test_PUT_NEWFILEURL_mutable(self): d = self.PUT(self.public_url + "/foo/new.txt?mutable=true", self.NEWFILE_CONTENTS) @@ -1344,8 +1412,10 @@ class Web(WebMixin, testutil.StallMixin, unittest.TestCase): # and that HEAD computes the size correctly d.addCallback(lambda res: - self.HEAD(self.public_url + "/foo/new.txt")) - def _got_headers(headers): + self.HEAD(self.public_url + "/foo/new.txt", + return_response=True)) + def _got_headers((res, status, headers)): + self.failUnlessEqual(res, "") self.failUnlessEqual(headers["content-length"][0], str(len(NEW2_CONTENTS))) self.failUnlessEqual(headers["content-type"], ["text/plain"]) diff --git a/src/allmydata/web/common.py b/src/allmydata/web/common.py index 1050e9cc..a831b80d 100644 --- a/src/allmydata/web/common.py +++ b/src/allmydata/web/common.py @@ -4,7 +4,8 @@ from zope.interface import Interface from nevow import loaders, appserver from nevow.inevow import IRequest from nevow.util import resource_filename -from allmydata.interfaces import ExistingChildError, FileTooLargeError +from allmydata.interfaces import ExistingChildError, NoSuchChildError, \ + FileTooLargeError, NotEnoughSharesError class IClient(Interface): pass @@ -122,6 +123,13 @@ class MyExceptionHandler(appserver.DefaultExceptionHandler): "name, and you asked me to not " "replace it.", http.CONFLICT) + elif f.check(NoSuchChildError): + name = f.value.args[0] + return self.simple(ctx, + "No such child: %s" % name.encode("utf-8"), + http.NOT_FOUND) + elif f.check(NotEnoughSharesError): + return self.simple(ctx, str(f), http.GONE) elif f.check(WebError): return self.simple(ctx, f.value.text, f.value.code) elif f.check(FileTooLargeError): diff --git a/src/allmydata/web/filenode.py b/src/allmydata/web/filenode.py index a52dae43..b96ba175 100644 --- a/src/allmydata/web/filenode.py +++ b/src/allmydata/web/filenode.py @@ -1,14 +1,12 @@ import simplejson -from zope.interface import implements -from twisted.internet.interfaces import IConsumer -from twisted.web import http, static, resource, server +from twisted.web import http, static from twisted.internet import defer from nevow import url, rend from nevow.inevow import IRequest -from allmydata.interfaces import IDownloadTarget, ExistingChildError +from allmydata.interfaces import ExistingChildError from allmydata.monitor import Monitor from allmydata.immutable.upload import FileHandle from allmydata.immutable.filenode import LiteralFileNode @@ -109,6 +107,9 @@ class PlaceHolderNodeHandler(RenderMixin, rend.Page, ReplaceMeMixin): t = get_arg(req, "t", "").strip() replace = boolean_of_arg(get_arg(req, "replace", "true")) assert self.parentnode and self.name + if req.getHeader("content-range"): + raise WebError("Content-Range in PUT not yet supported", + http.NOT_IMPLEMENTED) if not t: return self.replace_me_with_a_child(ctx, replace) if t == "uri": @@ -160,7 +161,6 @@ class FileNodeHandler(RenderMixin, rend.Page, ReplaceMeMixin): t = get_arg(req, "t", "").strip() if not t: # just get the contents - save_to_file = boolean_of_arg(get_arg(req, "save", "False")) # the filename arrives as part of the URL or in a form input # element, and will be sent back in a Content-Disposition header. # Different browsers use various character sets for this name, @@ -173,7 +173,13 @@ class FileNodeHandler(RenderMixin, rend.Page, ReplaceMeMixin): # properly. So we assume that at least the browser will agree # with itself, and echo back the same bytes that we were given. filename = get_arg(req, "filename", self.name) or "unknown" - return FileDownloader(self.node, filename, save_to_file) + if self.node.is_mutable(): + # some day: d = self.node.get_best_version() + d = makeMutableDownloadable(self.node) + else: + d = defer.succeed(self.node) + d.addCallback(lambda dn: FileDownloader(dn, filename)) + return d if t == "json": return FileJSONMetadata(ctx, self.node) if t == "info": @@ -189,25 +195,13 @@ class FileNodeHandler(RenderMixin, rend.Page, ReplaceMeMixin): t = get_arg(req, "t", "").strip() if t: raise WebError("GET file: bad t=%s" % t) - # if we have a filename, use it to get the content-type filename = get_arg(req, "filename", self.name) or "unknown" - gte = static.getTypeAndEncoding - ctype, encoding = gte(filename, - static.File.contentTypes, - static.File.contentEncodings, - defaultType="text/plain") - req.setHeader("content-type", ctype) - if encoding: - req.setHeader("content-encoding", encoding) if self.node.is_mutable(): - d = self.node.get_size_of_best_version() - # otherwise, we can get the size from the URI + # some day: d = self.node.get_best_version() + d = makeMutableDownloadable(self.node) else: - d = defer.succeed(self.node.get_size()) - def _got_length(length): - req.setHeader("content-length", length) - return "" - d.addCallback(_got_length) + d = defer.succeed(self.node) + d.addCallback(lambda dn: FileDownloader(dn, filename)) return d def render_PUT(self, ctx): @@ -295,71 +289,33 @@ class FileNodeHandler(RenderMixin, rend.Page, ReplaceMeMixin): d.addCallback(lambda res: self.node.get_uri()) return d - -class WebDownloadTarget: - implements(IDownloadTarget, IConsumer) - def __init__(self, req, content_type, content_encoding, save_to_filename): - self._req = req - self._content_type = content_type - self._content_encoding = content_encoding - self._opened = False - self._producer = None - self._save_to_filename = save_to_filename - - def registerProducer(self, producer, streaming): - self._req.registerProducer(producer, streaming) - def unregisterProducer(self): - self._req.unregisterProducer() - - def open(self, size): - self._opened = True - self._req.setHeader("content-type", self._content_type) - if self._content_encoding: - self._req.setHeader("content-encoding", self._content_encoding) - self._req.setHeader("content-length", str(size)) - if self._save_to_filename is not None: - # tell the browser to save the file rather display it we don't - # try to encode the filename, instead we echo back the exact same - # bytes we were given in the URL. See the comment in - # FileNodeHandler.render_GET for the sad details. - filename = self._save_to_filename - self._req.setHeader("content-disposition", - 'attachment; filename="%s"' % filename) - - def write(self, data): - self._req.write(data) - def close(self): - self._req.finish() - - def fail(self, why): - if self._opened: - # The content-type is already set, and the response code - # has already been sent, so we can't provide a clean error - # indication. We can emit text (which a browser might interpret - # as something else), and if we sent a Size header, they might - # notice that we've truncated the data. Keep the error message - # small to improve the chances of having our error response be - # shorter than the intended results. - # - # We don't have a lot of options, unfortunately. - self._req.write("problem during download\n") +class MutableDownloadable: + #implements(IDownloadable) + def __init__(self, size, node): + self.size = size + self.node = node + def get_size(self): + return self.size + def read(self, consumer, offset=0, size=None): + d = self.node.download_best_version() + d.addCallback(self._got_data, consumer, offset, size) + return d + def _got_data(self, contents, consumer, offset, size): + start = offset + if size is not None: + end = offset+size else: - # We haven't written anything yet, so we can provide a sensible - # error message. - msg = str(why.type) - msg.replace("\n", "|") - self._req.setResponseCode(http.GONE, msg) - self._req.setHeader("content-type", "text/plain") - # TODO: HTML-formatted exception? - self._req.write(str(why)) - self._req.finish() - - def register_canceller(self, cb): - pass - def finish(self): - pass - -class FileDownloader(resource.Resource): + end = self.size + # SDMF: we can write the whole file in one big chunk + consumer.write(contents[start:end]) + return consumer + +def makeMutableDownloadable(n): + d = defer.maybeDeferred(n.get_size_of_best_version) + d.addCallback(MutableDownloadable, n) + return d + +class FileDownloader(rend.Page): # since we override the rendering process (to let the tahoe Downloader # drive things), we must inherit from regular old twisted.web.resource # instead of nevow.rend.Page . Nevow will use adapters to wrap a @@ -368,25 +324,81 @@ class FileDownloader(resource.Resource): # that wrapper would allow us to return a Deferred from render(), which # might could simplify the implementation of WebDownloadTarget. - def __init__(self, filenode, filename, save_to_file): - resource.Resource.__init__(self) + def __init__(self, filenode, filename): + rend.Page.__init__(self) self.filenode = filenode self.filename = filename - self.save_to_file = save_to_file - def render(self, req): + + def renderHTTP(self, ctx): + req = IRequest(ctx) gte = static.getTypeAndEncoding ctype, encoding = gte(self.filename, static.File.contentTypes, static.File.contentEncodings, defaultType="text/plain") + req.setHeader("content-type", ctype) + if encoding: + req.setHeader("content-encoding", encoding) + save_to_filename = None - if self.save_to_file: - save_to_filename = self.filename - wdt = WebDownloadTarget(req, ctype, encoding, save_to_filename) - d = self.filenode.download(wdt) - # exceptions during download are handled by the WebDownloadTarget - d.addErrback(lambda why: None) - return server.NOT_DONE_YET + if boolean_of_arg(get_arg(req, "save", "False")): + # tell the browser to save the file rather display it we don't + # try to encode the filename, instead we echo back the exact same + # bytes we were given in the URL. See the comment in + # FileNodeHandler.render_GET for the sad details. + req.setHeader("content-disposition", + 'attachment; filename="%s"' % self.filename) + + filesize = self.filenode.get_size() + assert isinstance(filesize, (int,long)), filesize + offset, size = 0, None + contentsize = filesize + rangeheader = req.getHeader('range') + if rangeheader: + # adapted from nevow.static.File + bytesrange = rangeheader.split('=') + if bytesrange[0] != 'bytes': + raise WebError("Syntactically invalid http range header!") + start, end = bytesrange[1].split('-') + if start: + offset = int(start) + if end: + size = int(end) - offset + 1 + req.setResponseCode(http.PARTIAL_CONTENT) + req.setHeader('content-range',"bytes %s-%s/%s" % + (str(offset), str(offset+size-1), str(filesize))) + contentsize = size + req.setHeader("content-length", str(contentsize)) + if req.method == "HEAD": + return "" + d = self.filenode.read(req, offset, size) + def _error(f): + if req.startedWriting: + # The content-type is already set, and the response code has + # already been sent, so we can't provide a clean error + # indication. We can emit text (which a browser might + # interpret as something else), and if we sent a Size header, + # they might notice that we've truncated the data. Keep the + # error message small to improve the chances of having our + # error response be shorter than the intended results. + # + # We don't have a lot of options, unfortunately. + req.write("problem during download\n") + else: + # We haven't written anything yet, so we can provide a + # sensible error message. + msg = str(f.type) + msg.replace("\n", "|") + req.setResponseCode(http.GONE, msg) + req.setHeader("content-type", "text/plain") + req.responseHeaders.setRawHeaders("content-encoding", []) + req.responseHeaders.setRawHeaders("content-disposition", []) + # TODO: HTML-formatted exception? + req.write(str(f)) + d.addErrback(_error) + d.addBoth(lambda ign: req.finish()) + return req.deferred + def FileJSONMetadata(ctx, filenode): if filenode.is_readonly():