from allmydata.scripts.common_http import do_http as do_http_req
from allmydata.util.hashutil import tagged_hash
from allmydata.util.assertutil import precondition
-from allmydata.util import base32, fileutil
+from allmydata.util import base32, fileutil, observer
from allmydata.scripts.common import get_aliases
from twisted.python import usage
+from twisted.python.failure import Failure
from twisted.internet.protocol import Factory, Protocol
-from twisted.internet import reactor, defer
+from twisted.internet import reactor, defer, task
+from twisted.web import client
import base64
import errno
+import heapq
import sha
import socket
import stat
import subprocess
import sys
import os
+import weakref
#import pprint
# one needs either python-fuse to have been installed in sys.path, or
def __init__(self, msg):
TFSIOError.__init__(self, errno.EEXIST, msg)
+class EIO(TFSIOError):
+ # TFSIOError subclass whose errno is fixed to errno.EIO; the caller only
+ # supplies the message text, which is handed to TFSIOError.__init__ as-is.
+ def __init__(self, msg):
+ TFSIOError.__init__(self, errno.EIO, msg)
+
def logargsretexc(meth):
def inner_logargsretexc(self, *args, **kwargs):
log("%s(%r, %r)" % (meth, args, kwargs))
ret = ['O_RDONLY']
return '|'.join(ret)
+class DownloaderWithReadQueue(object):
+ def __init__(self):
+ self.read_heap = []
+ self.dest_file_name = None
+ self.running = False
+ self.done_observer = observer.OneShotObserverList()
+
+ def __repr__(self):
+ name = self.dest_file_name is None and '<none>' or os.path.basename(self.dest_file_name)
+ return "<DWRQ(%s)> q(%s)" % (name, len(self.read_heap or []))
+
+ def log(self, msg):
+ log("%r: %s" % (self, msg))
+
+ @logexc
+ def start(self, url, dest_file_name, target_size, interval=0.5):
+ self.log('start(%s, %s, %s)' % (url, dest_file_name, target_size, ))
+ self.dest_file_name = dest_file_name
+ file(self.dest_file_name, 'wb').close() # touch
+ self.target_size = target_size
+ self.log('start()')
+ self.loop = task.LoopingCall(self._check_file_size)
+ self.loop.start(interval)
+ self.running = True
+ d = client.downloadPage(url, self.dest_file_name)
+ d.addCallbacks(self.done, self.fail)
+ return d
+
+ def when_done(self):
+ return self.done_observer.when_fired()
+
+ def get_size(self):
+ if os.path.exists(self.dest_file_name):
+ return os.path.getsize(self.dest_file_name)
+ else:
+ return 0
+
+ @logexc
+ def _read(self, posn, size):
+ #self.log('_read(%s, %s)' % (posn, size))
+ f = file(self.dest_file_name, 'rb')
+ f.seek(posn)
+ data = f.read(size)
+ f.close()
+ return data
+
+ @logexc
+ def read(self, posn, size):
+ self.log('read(%s, %s)' % (posn, size))
+ if self.read_heap is None:
+ raise ValueError('read() called when already shut down')
+ if posn+size > self.target_size:
+ size -= self.target_size - posn
+ fsize = self.get_size()
+ if posn+size < fsize:
+ return defer.succeed(self._read(posn, size))
+ else:
+ d = defer.Deferred()
+ dread = (posn+size, posn, d)
+ heapq.heappush(self.read_heap, dread)
+ return d
+
+ @logexc
+ def _check_file_size(self):
+ #self.log('_check_file_size()')
+ if self.read_heap:
+ try:
+ size = self.get_size()
+ while self.read_heap and self.read_heap[0][0] <= size:
+ end, start, d = heapq.heappop(self.read_heap)
+ data = self._read(start, end-start)
+ d.callback(data)
+ except Exception, e:
+ log_exc()
+ failure = Failure()
+
+ @logexc
+ def fail(self, failure):
+ self.log('fail(%s)' % (failure,))
+ self.running = False
+ if self.loop.running:
+ self.loop.stop()
+ # fail any reads still pending
+ for end, start, d in self.read_heap:
+ reactor.callLater(0, d.errback, failure)
+ self.read_heap = None
+ self.done_observer.fire_if_not_fired(failure)
+ return failure
+
+ @logexc
+ def done(self, result):
+ self.log('done()')
+ self.running = False
+ if self.loop.running:
+ self.loop.stop()
+ precondition(self.get_size() == self.target_size, self.get_size(), self.target_size)
+ self._check_file_size() # process anything left pending in heap
+ precondition(not self.read_heap, self.read_heap, self.target_size, self.get_size())
+ self.read_heap = None
+ self.done_observer.fire_if_not_fired(self)
+ return result
+
+
class TahoeFuseFile(object):
#def __init__(self, path, flags, *mode):
def __init__(self, tfs, path, flags, *mode):
log("TFF: __init__(%r, %r:%s, %r:%s)" % (path, flags, repr_flags(flags), mode, repr_mode(*mode)))
self.tfs = tfs
+ self.downloader = None
self._path = path # for tahoe put
try:
self.fname = self.fnode.tmp_fname
log('TFF: reopening(%s) for reading' % self.fname)
else:
- log('TFF: fetching file from cache for reading')
- self.fname = self.tfs.cache.get_file(uri)
+ if uri.startswith("URI:LIT") or not self.tfs.async:
+ log('TFF: synchronously fetching file from cache for reading')
+ self.fname = self.tfs.cache.get_file(uri)
+ else:
+ log('TFF: asynchronously fetching file from cache for reading')
+ self.fname, self.downloader = self.tfs.cache.async_get_file(uri)
+ # downloader is None if the cache already contains the file
+ if self.downloader is not None:
+ d = self.downloader.when_done()
+ def download_complete(junk):
+ # once the download is complete, revert to non-async behaviour
+ self.downloader = None
+ d.addCallback(download_complete)
self.file = os.fdopen(os.open(self.fname, flags, *mode), m)
self.fd = self.file.fileno()
@logexc
def read(self, size, offset):
self.log('read(%r, %r)' % (size, offset, ))
- self.file.seek(offset)
- return self.file.read(size)
+ if self.downloader:
+ # then we're busy doing an async download
+ # (and hence implicitly, we're in an environment that supports twisted)
+ #self.log('passing read() to %s' % (self.downloader, ))
+ d = self.downloader.read(offset, size)
+ def thunk(failure):
+ raise EIO(str(failure))
+ d.addErrback(thunk)
+ return d
+ else:
+ self.log('servicing read() from %s' % (self.file, ))
+ self.file.seek(offset)
+ return self.file.read(size)
@logexc
def write(self, buf, offset):
def fgetattr(self):
self.log("fgetattr()")
s = os.fstat(self.fd)
- self.log("fgetattr() -> %r" % (s,))
- return stat_to_dict(s)
+ d = stat_to_dict(s)
+ if self.downloader:
+ size = self.downloader.target_size
+ self.log("fgetattr() during async download, cache file: %s, size=%s" % (s, size))
+ d['st_size'] = size
+ self.log("fgetattr() -> %r" % (d,))
+ return d
@logexc
def ftruncate(self, len):
class TFS(object):
def __init__(self, nodedir, nodeurl, root_uri,
- cache_validity_period=DEFAULT_DIRECTORY_VALIDITY):
+ cache_validity_period=DEFAULT_DIRECTORY_VALIDITY, async=False):
self.cache_validity = cache_validity_period
self.nodeurl = nodeurl
self.root_uri = root_uri
+ self.async = async
self.dirs = {}
cachedir = os.path.expanduser(os.path.join(nodedir, '_cache'))
self.tmpdir = os.path.join(self.cachedir, 'tmp')
if not os.path.exists(self.tmpdir):
os.makedirs(self.tmpdir)
+ self.downloaders = weakref.WeakValueDictionary()
def log(self, msg):
log("<FC> %s" % (msg, ))
if uri.startswith("URI:LIT"):
return self.get_literal(uri)
else:
- return self.get_chk(uri)
+ return self.get_chk(uri, async=False)
+
+ def async_get_file(self, uri):
+ self.log('get_file(%s)' % (uri,))
+ return self.get_chk(uri, async=True)
def get_literal(self, uri):
h = sha.new(uri).digest()
fh.close()
return fname
- def get_chk(self, uri):
+ def get_chk(self, uri, async=False):
u = CHKFileURI.init_from_string(str(uri))
storage_index = u.storage_index
size = u.size
if os.path.exists(fname):
fsize = os.path.getsize(fname)
if fsize == size:
- return fname
+ if async:
+ return fname, None
+ else:
+ return fname
else:
self.log('warning file "%s" is too short %s < %s' % (fname, fsize, size))
self.log('downloading file %s (%s)' % (fname, size, ))
- fh = open(fname, 'wb')
url = "%suri/%s" % (self.nodeurl, uri)
- download = urllib.urlopen(''.join([ self.nodeurl, "uri/", uri ]))
- while True:
- chunk = download.read(4096)
- if not chunk:
- break
- fh.write(chunk)
- fh.close()
- return fname
+ if async:
+ if fname in self.downloaders and self.downloaders[fname].running:
+ downloader = self.downloaders[fname]
+ else:
+ downloader = DownloaderWithReadQueue()
+ self.downloaders[fname] = downloader
+ d = downloader.start(url, fname, target_size=u.size)
+ def clear_downloader(result, fname):
+ self.log('clearing %s from downloaders: %r' % (fname, result))
+ self.downloaders.pop(fname, None)
+ d.addBoth(clear_downloader, fname)
+ return fname, downloader
+ else:
+ fh = open(fname, 'wb')
+ download = urllib.urlopen(url)
+ while True:
+ chunk = download.read(4096)
+ if not chunk:
+ break
+ fh.write(chunk)
+ fh.close()
+ return fname
def tmp_file(self, id):
fname = os.path.join(self.tmpdir, base64.b32encode(id).lower())
log('\n'+(24*'_')+'init (unsplit)'+(24*'_')+'\n')
cache_timeout = float(config['cache-timeout'])
- tfs = TFS(nodedir, nodeurl, root_uri, cache_timeout)
+ tfs = TFS(nodedir, nodeurl, root_uri, cache_timeout, async=False)
#print tfs.pprint()
# make tfs instance accesible to print_tree() for dbg
try:
cache_timeout = float(config['cache-timeout'])
- tfs = TFS(nodedir, nodeurl, root_uri, cache_timeout)
+ tfs = TFS(nodedir, nodeurl, root_uri, cache_timeout, async=True)
#print tfs.pprint()
# make tfs instance accesible to print_tree() for dbg