From f08d18176482508e7b7eee71d35999a183201de3 Mon Sep 17 00:00:00 2001
From: robk-tahoe <robk-tahoe@allmydata.com>
Date: Mon, 20 Oct 2008 16:33:33 -0700
Subject: [PATCH] fuse/blackmatch: added asynchronous (background) file
 download

previously, upon opening a file for reading, the open() call would block
while the entire file was retrieved from tahoe into the cache directory.
This change adds a DownloaderWithReadQueue class, and associated plumbing,
such that an open() will return promptly with the download initiated 'in
the background'.  Subsequent read() operations will block until enough
data has been downloaded to satisfy that request.  This provides a behaviour
similar to streaming, i.e. the client application will be able to read
data from the fuse interface while the remainder of the file is still being
downloaded.
---
 contrib/fuse/impl_c/blackmatch.py | 207 ++++++++++++++++++++++++++----
 1 file changed, 184 insertions(+), 23 deletions(-)

diff --git a/contrib/fuse/impl_c/blackmatch.py b/contrib/fuse/impl_c/blackmatch.py
index c45c0ae8..bde7462b 100644
--- a/contrib/fuse/impl_c/blackmatch.py
+++ b/contrib/fuse/impl_c/blackmatch.py
@@ -5,21 +5,25 @@ from allmydata.uri import CHKFileURI, NewDirectoryURI, LiteralFileURI
 from allmydata.scripts.common_http import do_http as do_http_req
 from allmydata.util.hashutil import tagged_hash
 from allmydata.util.assertutil import precondition
-from allmydata.util import base32, fileutil
+from allmydata.util import base32, fileutil, observer
 from allmydata.scripts.common import get_aliases
 
 from twisted.python import usage
+from twisted.python.failure import Failure
 from twisted.internet.protocol import Factory, Protocol
-from twisted.internet import reactor, defer
+from twisted.internet import reactor, defer, task
+from twisted.web import client
 
 import base64
 import errno
+import heapq
 import sha
 import socket
 import stat
 import subprocess
 import sys
 import os
+import weakref
 #import pprint
 
 # one needs either python-fuse to have been installed in sys.path, or
@@ -150,6 +154,10 @@ class EEXIST(TFSIOError):
     def __init__(self, msg):
         TFSIOError.__init__(self, errno.EEXIST, msg)
 
+class EIO(TFSIOError):
+    def __init__(self, msg):
+        TFSIOError.__init__(self, errno.EIO, msg)
+
 def logargsretexc(meth):
     def inner_logargsretexc(self, *args, **kwargs):
         log("%s(%r, %r)" % (meth, args, kwargs))
@@ -206,12 +214,116 @@ def repr_flags(flags=None):
         ret = ['O_RDONLY']
     return '|'.join(ret)
 
+class DownloaderWithReadQueue(object):
+    def __init__(self):
+        self.read_heap = []
+        self.dest_file_name = None
+        self.running = False
+        self.done_observer = observer.OneShotObserverList()
+
+    def __repr__(self):
+        name = self.dest_file_name is None and '<none>' or os.path.basename(self.dest_file_name)
+        return "<DWRQ(%s)> q(%s)" % (name, len(self.read_heap or []))
+
+    def log(self, msg):
+        log("%r: %s" % (self, msg))
+
+    @logexc
+    def start(self, url, dest_file_name, target_size, interval=0.5):
+        self.log('start(%s, %s, %s)' % (url, dest_file_name, target_size, ))
+        self.dest_file_name = dest_file_name
+        file(self.dest_file_name, 'wb').close() # touch
+        self.target_size = target_size
+        self.log('start()')
+        self.loop = task.LoopingCall(self._check_file_size)
+        self.loop.start(interval)
+        self.running = True
+        d = client.downloadPage(url, self.dest_file_name)
+        d.addCallbacks(self.done, self.fail)
+        return d
+
+    def when_done(self):
+        return self.done_observer.when_fired()
+
+    def get_size(self):
+        if os.path.exists(self.dest_file_name):
+            return os.path.getsize(self.dest_file_name)
+        else:
+            return 0
+
+    @logexc
+    def _read(self, posn, size):
+        #self.log('_read(%s, %s)' % (posn, size))
+        f = file(self.dest_file_name, 'rb')
+        f.seek(posn)
+        data = f.read(size)
+        f.close()
+        return data
+
+    @logexc
+    def read(self, posn, size):
+        self.log('read(%s, %s)' % (posn, size))
+        if self.read_heap is None:
+            raise ValueError('read() called when already shut down')
+        if posn+size > self.target_size:
+            size -= self.target_size - posn
+        fsize = self.get_size()
+        if posn+size < fsize:
+            return defer.succeed(self._read(posn, size))
+        else:
+            d = defer.Deferred()
+            dread = (posn+size, posn, d)
+            heapq.heappush(self.read_heap, dread)
+        return d
+
+    @logexc
+    def _check_file_size(self):
+        #self.log('_check_file_size()')
+        if self.read_heap:
+            try:
+                size = self.get_size()
+                while self.read_heap and self.read_heap[0][0] <= size:
+                    end, start, d = heapq.heappop(self.read_heap)
+                    data = self._read(start, end-start)
+                    d.callback(data)
+            except Exception, e:
+                log_exc()
+                failure = Failure()
+
+    @logexc
+    def fail(self, failure):
+        self.log('fail(%s)' % (failure,))
+        self.running = False
+        if self.loop.running:
+            self.loop.stop()
+        # fail any reads still pending
+        for end, start, d in self.read_heap:
+            reactor.callLater(0, d.errback, failure)
+        self.read_heap = None
+        self.done_observer.fire_if_not_fired(failure)
+        return failure
+
+    @logexc
+    def done(self, result):
+        self.log('done()')
+        self.running = False
+        if self.loop.running:
+            self.loop.stop()
+        precondition(self.get_size() == self.target_size, self.get_size(), self.target_size)
+        self._check_file_size() # process anything left pending in heap
+        precondition(not self.read_heap, self.read_heap, self.target_size, self.get_size())
+        self.read_heap = None
+        self.done_observer.fire_if_not_fired(self)
+        return result
+
+
 class TahoeFuseFile(object):
 
     #def __init__(self, path, flags, *mode):
     def __init__(self, tfs, path, flags, *mode):
         log("TFF: __init__(%r, %r:%s, %r:%s)" % (path, flags, repr_flags(flags), mode, repr_mode(*mode)))
         self.tfs = tfs
+        self.downloader = None
 
         self._path = path # for tahoe put
         try:
@@ -250,8 +362,19 @@ class TahoeFuseFile(object):
                     self.fname = self.fnode.tmp_fname
                     log('TFF: reopening(%s) for reading' % self.fname)
                 else:
-                    log('TFF: fetching file from cache for reading')
-                    self.fname = self.tfs.cache.get_file(uri)
+                    if uri.startswith("URI:LIT") or not self.tfs.async:
+                        log('TFF: synchronously fetching file from cache for reading')
+                        self.fname = self.tfs.cache.get_file(uri)
+                    else:
+                        log('TFF: asynchronously fetching file from cache for reading')
+                        self.fname, self.downloader = self.tfs.cache.async_get_file(uri)
+                        # downloader is None if the cache already contains the file
+                        if self.downloader is not None:
+                            d = self.downloader.when_done()
+                            def download_complete(junk):
+                                # once the download is complete, revert to non-async behaviour
+                                self.downloader = None
+                            d.addCallback(download_complete)
 
                 self.file = os.fdopen(os.open(self.fname, flags, *mode), m)
                 self.fd = self.file.fileno()
@@ -267,8 +390,19 @@ class TahoeFuseFile(object):
     @logexc
     def read(self, size, offset):
         self.log('read(%r, %r)' % (size, offset, ))
-        self.file.seek(offset)
-        return self.file.read(size)
+        if self.downloader:
+            # then we're busy doing an async download
+            # (and hence implicitly, we're in an environment that supports twisted)
+            #self.log('passing read() to %s' % (self.downloader, ))
+            d = self.downloader.read(offset, size)
+            def thunk(failure):
+                raise EIO(str(failure))
+            d.addErrback(thunk)
+            return d
+        else:
+            self.log('servicing read() from %s' % (self.file, ))
+            self.file.seek(offset)
+            return self.file.read(size)
 
     @logexc
     def write(self, buf, offset):
@@ -320,8 +454,13 @@ class TahoeFuseFile(object):
     def fgetattr(self):
         self.log("fgetattr()")
         s = os.fstat(self.fd)
-        self.log("fgetattr() -> %r" % (s,))
-        return stat_to_dict(s)
+        d = stat_to_dict(s)
+        if self.downloader:
+            size = self.downloader.target_size
+            self.log("fgetattr() during async download, cache file: %s, size=%s" % (s, size))
+            d['st_size'] = size
+        self.log("fgetattr() -> %r" % (d,))
+        return d
 
     @logexc
     def ftruncate(self, len):
@@ -945,10 +1084,11 @@ class File(object):
 
 class TFS(object):
     def __init__(self, nodedir, nodeurl, root_uri, 
-                       cache_validity_period=DEFAULT_DIRECTORY_VALIDITY):
+                       cache_validity_period=DEFAULT_DIRECTORY_VALIDITY, async=False):
         self.cache_validity = cache_validity_period
         self.nodeurl = nodeurl
         self.root_uri = root_uri
+        self.async = async
         self.dirs = {}
 
         cachedir = os.path.expanduser(os.path.join(nodedir, '_cache'))
@@ -1090,6 +1230,7 @@ class FileCache(object):
         self.tmpdir = os.path.join(self.cachedir, 'tmp')
         if not os.path.exists(self.tmpdir):
             os.makedirs(self.tmpdir)
+        self.downloaders = weakref.WeakValueDictionary()
 
     def log(self, msg):
         log("<FC> %s" % (msg, ))
@@ -1099,7 +1240,11 @@ class FileCache(object):
         if uri.startswith("URI:LIT"):
             return self.get_literal(uri)
         else:
-            return self.get_chk(uri)
+            return self.get_chk(uri, async=False)
+
+    def async_get_file(self, uri):
+        self.log('get_file(%s)' % (uri,))
+        return self.get_chk(uri, async=True)
 
     def get_literal(self, uri):
         h = sha.new(uri).digest()
@@ -1112,7 +1257,7 @@ class FileCache(object):
         fh.close()
         return fname
 
-    def get_chk(self, uri):
+    def get_chk(self, uri, async=False):
         u = CHKFileURI.init_from_string(str(uri))
         storage_index = u.storage_index
         size = u.size
@@ -1120,20 +1265,36 @@ class FileCache(object):
         if os.path.exists(fname):
             fsize = os.path.getsize(fname)
             if fsize == size:
-                return fname
+                if async:
+                    return fname, None
+                else:
+                    return fname
             else:
                 self.log('warning file "%s" is too short %s < %s' % (fname, fsize, size))
         self.log('downloading file %s (%s)' % (fname, size, ))
-        fh = open(fname, 'wb')
         url = "%suri/%s" % (self.nodeurl, uri)
-        download = urllib.urlopen(''.join([ self.nodeurl, "uri/", uri ]))
-        while True:
-            chunk = download.read(4096)
-            if not chunk:
-                break
-            fh.write(chunk)
-        fh.close()
-        return fname
+        if async:
+            if fname in self.downloaders and self.downloaders[fname].running:
+                downloader = self.downloaders[fname]
+            else:
+                downloader = DownloaderWithReadQueue()
+                self.downloaders[fname] = downloader
+                d = downloader.start(url, fname, target_size=u.size)
+                def clear_downloader(result, fname):
+                    self.log('clearing %s from downloaders: %r' % (fname, result))
+                    self.downloaders.pop(fname, None)
+                d.addBoth(clear_downloader, fname)
+            return fname, downloader
+        else:
+            fh = open(fname, 'wb')
+            download = urllib.urlopen(url)
+            while True:
+                chunk = download.read(4096)
+                if not chunk:
+                    break
+                fh.write(chunk)
+            fh.close()
+            return fname
 
     def tmp_file(self, id):
         fname = os.path.join(self.tmpdir, base64.b32encode(id).lower())
@@ -1483,7 +1644,7 @@ def main(argv):
         log('\n'+(24*'_')+'init (unsplit)'+(24*'_')+'\n')
 
         cache_timeout = float(config['cache-timeout'])
-        tfs = TFS(nodedir, nodeurl, root_uri, cache_timeout)
+        tfs = TFS(nodedir, nodeurl, root_uri, cache_timeout, async=False)
         #print tfs.pprint()
 
         # make tfs instance accesible to print_tree() for dbg
@@ -1504,7 +1665,7 @@ def main(argv):
 
         try:
             cache_timeout = float(config['cache-timeout'])
-            tfs = TFS(nodedir, nodeurl, root_uri, cache_timeout)
+            tfs = TFS(nodedir, nodeurl, root_uri, cache_timeout, async=True)
             #print tfs.pprint()
 
             # make tfs instance accesible to print_tree() for dbg
-- 
2.45.2