From f08d18176482508e7b7eee71d35999a183201de3 Mon Sep 17 00:00:00 2001 From: robk-tahoe Date: Mon, 20 Oct 2008 16:33:33 -0700 Subject: [PATCH] fuse/blackmatch: added asynchronous (background) file download previously, upon opening a file for reading, the open() call would block while the entire file was retrieved from tahoe into the cache directory. This change adds a DownloaderWithReadQueue class, and associated plumbing, such that an open() will return promptly with the download initiated 'in the background'. Subsequent read() operations will block until enough data has been downloaded to satisfy that request. This provides a behaviour similar to streaming, i.e. the client application will be able to read data from the fuse interface while the remainder of the file is still being downloaded. --- contrib/fuse/impl_c/blackmatch.py | 207 ++++++++++++++++++++++++++---- 1 file changed, 184 insertions(+), 23 deletions(-) diff --git a/contrib/fuse/impl_c/blackmatch.py b/contrib/fuse/impl_c/blackmatch.py index c45c0ae8..bde7462b 100644 --- a/contrib/fuse/impl_c/blackmatch.py +++ b/contrib/fuse/impl_c/blackmatch.py @@ -5,21 +5,25 @@ from allmydata.uri import CHKFileURI, NewDirectoryURI, LiteralFileURI from allmydata.scripts.common_http import do_http as do_http_req from allmydata.util.hashutil import tagged_hash from allmydata.util.assertutil import precondition -from allmydata.util import base32, fileutil +from allmydata.util import base32, fileutil, observer from allmydata.scripts.common import get_aliases from twisted.python import usage +from twisted.python.failure import Failure from twisted.internet.protocol import Factory, Protocol -from twisted.internet import reactor, defer +from twisted.internet import reactor, defer, task +from twisted.web import client import base64 import errno +import heapq import sha import socket import stat import subprocess import sys import os +import weakref #import pprint # one needs either python-fuse to have been installed in sys.path, or @@ -150,6 +154,10 @@ class EEXIST(TFSIOError): def __init__(self, msg): TFSIOError.__init__(self, errno.EEXIST, msg) +class EIO(TFSIOError): + def __init__(self, msg): + TFSIOError.__init__(self, errno.EIO, msg) + def logargsretexc(meth): def inner_logargsretexc(self, *args, **kwargs): log("%s(%r, %r)" % (meth, args, kwargs)) @@ -206,12 +214,116 @@ def repr_flags(flags=None): ret = ['O_RDONLY'] return '|'.join(ret) +class DownloaderWithReadQueue(object): + def __init__(self): + self.read_heap = [] + self.dest_file_name = None + self.running = False + self.done_observer = observer.OneShotObserverList() + + def __repr__(self): + name = self.dest_file_name is None and '' or os.path.basename(self.dest_file_name) + return " q(%s)" % (name, len(self.read_heap or [])) + + def log(self, msg): + log("%r: %s" % (self, msg)) + + @logexc + def start(self, url, dest_file_name, target_size, interval=0.5): + self.log('start(%s, %s, %s)' % (url, dest_file_name, target_size, )) + self.dest_file_name = dest_file_name + file(self.dest_file_name, 'wb').close() # touch + self.target_size = target_size + self.log('start()') + self.loop = task.LoopingCall(self._check_file_size) + self.loop.start(interval) + self.running = True + d = client.downloadPage(url, self.dest_file_name) + d.addCallbacks(self.done, self.fail) + return d + + def when_done(self): + return self.done_observer.when_fired() + + def get_size(self): + if os.path.exists(self.dest_file_name): + return os.path.getsize(self.dest_file_name) + else: + return 0 + + @logexc + def _read(self, posn, size): + #self.log('_read(%s, %s)' % (posn, size)) + f = file(self.dest_file_name, 'rb') + f.seek(posn) + data = f.read(size) + f.close() + return data + + @logexc + def read(self, posn, size): + self.log('read(%s, %s)' % (posn, size)) + if self.read_heap is None: + raise ValueError('read() called when already shut down') + if posn+size > self.target_size: + size -= self.target_size - posn + fsize = self.get_size() + if posn+size < fsize: + return defer.succeed(self._read(posn, size)) + else: + d = defer.Deferred() + dread = (posn+size, posn, d) + heapq.heappush(self.read_heap, dread) + return d + + @logexc + def _check_file_size(self): + #self.log('_check_file_size()') + if self.read_heap: + try: + size = self.get_size() + while self.read_heap and self.read_heap[0][0] <= size: + end, start, d = heapq.heappop(self.read_heap) + data = self._read(start, end-start) + d.callback(data) + except Exception, e: + log_exc() + failure = Failure() + + @logexc + def fail(self, failure): + self.log('fail(%s)' % (failure,)) + self.running = False + if self.loop.running: + self.loop.stop() + # fail any reads still pending + for end, start, d in self.read_heap: + reactor.callLater(0, d.errback, failure) + self.read_heap = None + self.done_observer.fire_if_not_fired(failure) + return failure + + @logexc + def done(self, result): + self.log('done()') + self.running = False + if self.loop.running: + self.loop.stop() + precondition(self.get_size() == self.target_size, self.get_size(), self.target_size) + self._check_file_size() # process anything left pending in heap + precondition(not self.read_heap, self.read_heap, self.target_size, self.get_size()) + self.read_heap = None + self.done_observer.fire_if_not_fired(self) + return result + + class TahoeFuseFile(object): #def __init__(self, path, flags, *mode): def __init__(self, tfs, path, flags, *mode): log("TFF: __init__(%r, %r:%s, %r:%s)" % (path, flags, repr_flags(flags), mode, repr_mode(*mode))) self.tfs = tfs + self.downloader = None self._path = path # for tahoe put try: @@ -250,8 +362,19 @@ class TahoeFuseFile(object): self.fname = self.fnode.tmp_fname log('TFF: reopening(%s) for reading' % self.fname) else: - log('TFF: fetching file from cache for reading') - self.fname = self.tfs.cache.get_file(uri) + if uri.startswith("URI:LIT") or not self.tfs.async: + log('TFF: synchronously fetching file from cache for reading') + self.fname = self.tfs.cache.get_file(uri) + else: + log('TFF: asynchronously fetching file from cache for reading') + self.fname, self.downloader = self.tfs.cache.async_get_file(uri) + # downloader is None if the cache already contains the file + if self.downloader is not None: + d = self.downloader.when_done() + def download_complete(junk): + # once the download is complete, revert to non-async behaviour + self.downloader = None + d.addCallback(download_complete) self.file = os.fdopen(os.open(self.fname, flags, *mode), m) self.fd = self.file.fileno() @@ -267,8 +390,19 @@ class TahoeFuseFile(object): @logexc def read(self, size, offset): self.log('read(%r, %r)' % (size, offset, )) - self.file.seek(offset) - return self.file.read(size) + if self.downloader: + # then we're busy doing an async download + # (and hence implicitly, we're in an environment that supports twisted) + #self.log('passing read() to %s' % (self.downloader, )) + d = self.downloader.read(offset, size) + def thunk(failure): + raise EIO(str(failure)) + d.addErrback(thunk) + return d + else: + self.log('servicing read() from %s' % (self.file, )) + self.file.seek(offset) + return self.file.read(size) @logexc def write(self, buf, offset): @@ -320,8 +454,13 @@ class TahoeFuseFile(object): def fgetattr(self): self.log("fgetattr()") s = os.fstat(self.fd) - self.log("fgetattr() -> %r" % (s,)) - return stat_to_dict(s) + d = stat_to_dict(s) + if self.downloader: + size = self.downloader.target_size + self.log("fgetattr() during async download, cache file: %s, size=%s" % (s, size)) + d['st_size'] = size + self.log("fgetattr() -> %r" % (d,)) + return d @logexc def ftruncate(self, len): @@ -945,10 +1084,11 @@ class File(object): class TFS(object): def __init__(self, nodedir, nodeurl, root_uri, - cache_validity_period=DEFAULT_DIRECTORY_VALIDITY): + cache_validity_period=DEFAULT_DIRECTORY_VALIDITY, async=False): self.cache_validity = cache_validity_period self.nodeurl = nodeurl self.root_uri = root_uri + self.async = async self.dirs = {} cachedir = os.path.expanduser(os.path.join(nodedir, '_cache')) @@ -1090,6 +1230,7 @@ class FileCache(object): self.tmpdir = os.path.join(self.cachedir, 'tmp') if not os.path.exists(self.tmpdir): os.makedirs(self.tmpdir) + self.downloaders = weakref.WeakValueDictionary() def log(self, msg): log(" %s" % (msg, )) @@ -1099,7 +1240,11 @@ class FileCache(object): if uri.startswith("URI:LIT"): return self.get_literal(uri) else: - return self.get_chk(uri) + return self.get_chk(uri, async=False) + + def async_get_file(self, uri): + self.log('get_file(%s)' % (uri,)) + return self.get_chk(uri, async=True) def get_literal(self, uri): h = sha.new(uri).digest() @@ -1112,7 +1257,7 @@ class FileCache(object): fh.close() return fname - def get_chk(self, uri): + def get_chk(self, uri, async=False): u = CHKFileURI.init_from_string(str(uri)) storage_index = u.storage_index size = u.size @@ -1120,20 +1265,36 @@ class FileCache(object): if os.path.exists(fname): fsize = os.path.getsize(fname) if fsize == size: - return fname + if async: + return fname, None + else: + return fname else: self.log('warning file "%s" is too short %s < %s' % (fname, fsize, size)) self.log('downloading file %s (%s)' % (fname, size, )) - fh = open(fname, 'wb') url = "%suri/%s" % (self.nodeurl, uri) - download = urllib.urlopen(''.join([ self.nodeurl, "uri/", uri ])) - while True: - chunk = download.read(4096) - if not chunk: - break - fh.write(chunk) - fh.close() - return fname + if async: + if fname in self.downloaders and self.downloaders[fname].running: + downloader = self.downloaders[fname] + else: + downloader = DownloaderWithReadQueue() + self.downloaders[fname] = downloader + d = downloader.start(url, fname, target_size=u.size) + def clear_downloader(result, fname): + self.log('clearing %s from downloaders: %r' % (fname, result)) + self.downloaders.pop(fname, None) + d.addBoth(clear_downloader, fname) + return fname, downloader + else: + fh = open(fname, 'wb') + download = urllib.urlopen(url) + while True: + chunk = download.read(4096) + if not chunk: + break + fh.write(chunk) + fh.close() + return fname def tmp_file(self, id): fname = os.path.join(self.tmpdir, base64.b32encode(id).lower()) @@ -1483,7 +1644,7 @@ def main(argv): log('\n'+(24*'_')+'init (unsplit)'+(24*'_')+'\n') cache_timeout = float(config['cache-timeout']) - tfs = TFS(nodedir, nodeurl, root_uri, cache_timeout) + tfs = TFS(nodedir, nodeurl, root_uri, cache_timeout, async=False) #print tfs.pprint() # make tfs instance accesible to print_tree() for dbg @@ -1504,7 +1665,7 @@ def main(argv): try: cache_timeout = float(config['cache-timeout']) - tfs = TFS(nodedir, nodeurl, root_uri, cache_timeout) + tfs = TFS(nodedir, nodeurl, root_uri, cache_timeout, async=True) #print tfs.pprint() # make tfs instance accesible to print_tree() for dbg -- 2.45.2