-#!/usr/bin/env python
-
-#-----------------------------------------------------------------------------------------------
-from allmydata.uri import CHKFileURI, DirectoryURI, LiteralFileURI, is_literal_file_uri
-from allmydata.scripts.common_http import do_http as do_http_req
-from allmydata.util.hashutil import tagged_hash
-from allmydata.util.assertutil import precondition
-from allmydata.util import base32, fileutil, observer
-from allmydata.scripts.common import get_aliases
-
-from twisted.python import usage
-from twisted.python.failure import Failure
-from twisted.internet.protocol import Factory, Protocol
-from twisted.internet import reactor, defer, task
-from twisted.web import client
-
-import base64
-import errno
-import heapq
-import sha
-import socket
-import stat
-import subprocess
-import sys
-import os
-import weakref
-#import pprint
-
-# one needs either python-fuse to have been installed in sys.path, or
-# suitable affordances to be made in the build or runtime environment
-import fuse
-
-import time
-import traceback
-import simplejson
-import urllib
-
-VERSIONSTR="0.7"
-
-USAGE = 'usage: tahoe fuse [dir_cap_name] [fuse_options] mountpoint'
-DEFAULT_DIRECTORY_VALIDITY=26
-
-if not hasattr(fuse, '__version__'):
- raise RuntimeError, \
- "your fuse-py doesn't know of fuse.__version__, probably it's too old."
-
-fuse.fuse_python_api = (0, 2)
-fuse.feature_assert('stateful_files', 'has_init')
-
-class TahoeFuseOptions(usage.Options):
- optParameters = [
- ["node-directory", None, "~/.tahoe",
- "Look here to find out which Tahoe node should be used for all "
- "operations. The directory should either contain a full Tahoe node, "
- "or a file named node.url which points to some other Tahoe node. "
- "It should also contain a file named private/aliases which contains "
- "the mapping from alias name to root dirnode URI."
- ],
- ["node-url", None, None,
- "URL of the tahoe node to use, a URL like \"http://127.0.0.1:3456\". "
- "This overrides the URL found in the --node-directory ."],
- ["alias", None, None,
- "Which alias should be mounted."],
- ["root-uri", None, None,
- "Which root directory uri should be mounted."],
- ["cache-timeout", None, 20,
- "Time, in seconds, to cache directory data."],
- ]
- optFlags = [
- ['no-split', None,
- 'run stand-alone; no splitting into client and server'],
- ['server', None,
- 'server mode (should not be used by end users)'],
- ['server-shutdown', None,
- 'shutdown server (should not be used by end users)'],
- ]
-
- def __init__(self):
- usage.Options.__init__(self)
- self.fuse_options = []
- self.mountpoint = None
-
- def opt_option(self, fuse_option):
- """
- Pass mount options directly to fuse. See below.
- """
- self.fuse_options.append(fuse_option)
-
- opt_o = opt_option
-
- def parseArgs(self, mountpoint=''):
- self.mountpoint = mountpoint
-
- def getSynopsis(self):
- return "%s [options] mountpoint" % (os.path.basename(sys.argv[0]),)
-
-logfile = file('tfuse.log', 'ab')
-
-def reopen_logfile(fname):
- global logfile
- log('switching to %s' % (fname,))
- logfile.close()
- logfile = file(fname, 'ab')
-
-def log(msg):
- logfile.write("%s: %s\n" % (time.asctime(), msg))
- #time.sleep(0.1)
- logfile.flush()
-
-fuse.flog = log
-
-def unicode_to_utf8_or_str(u):
- if isinstance(u, unicode):
- return u.encode('utf-8')
- else:
- precondition(isinstance(u, str), repr(u))
- return u
-
-def do_http(method, url, body=''):
- resp = do_http_req(method, url, body)
- log('do_http(%s, %s) -> %s, %s' % (method, url, resp.status, resp.reason))
- if resp.status not in (200, 201):
- raise RuntimeError('http response (%s, %s)' % (resp.status, resp.reason))
- else:
- return resp.read()
-
-def flag2mode(flags):
- log('flag2mode(%r)' % (flags,))
- #md = {os.O_RDONLY: 'r', os.O_WRONLY: 'w', os.O_RDWR: 'w+'}
- md = {os.O_RDONLY: 'rb', os.O_WRONLY: 'wb', os.O_RDWR: 'w+b'}
- m = md[flags & (os.O_RDONLY | os.O_WRONLY | os.O_RDWR)]
-
- if flags & os.O_APPEND:
- m = m.replace('w', 'a', 1)
-
- return m
-
-class TFSIOError(IOError):
- pass
-
-class ENOENT(TFSIOError):
- def __init__(self, msg):
- TFSIOError.__init__(self, errno.ENOENT, msg)
-
-class EINVAL(TFSIOError):
- def __init__(self, msg):
- TFSIOError.__init__(self, errno.EINVAL, msg)
-
-class EACCESS(TFSIOError):
- def __init__(self, msg):
- TFSIOError.__init__(self, errno.EACCESS, msg)
-
-class EEXIST(TFSIOError):
- def __init__(self, msg):
- TFSIOError.__init__(self, errno.EEXIST, msg)
-
-class EIO(TFSIOError):
- def __init__(self, msg):
- TFSIOError.__init__(self, errno.EIO, msg)
-
-def logargsretexc(meth):
- def inner_logargsretexc(self, *args, **kwargs):
- log("%s(%r, %r)" % (meth, args, kwargs))
- try:
- ret = meth(self, *args, **kwargs)
- except:
- log('exception:\n%s' % (traceback.format_exc(),))
- raise
- log("ret: %r" % (ret, ))
- return ret
- inner_logargsretexc.__name__ = '<logwrap(%s)>' % (meth,)
- return inner_logargsretexc
-
-def logexc(meth):
- def inner_logexc(self, *args, **kwargs):
- try:
- ret = meth(self, *args, **kwargs)
- except TFSIOError, tie:
- log('error: %s' % (tie,))
- raise
- except:
- log('exception:\n%s' % (traceback.format_exc(),))
- raise
- return ret
- inner_logexc.__name__ = '<logwrap(%s)>' % (meth,)
- return inner_logexc
-
-def log_exc():
- log('exception:\n%s' % (traceback.format_exc(),))
-
-def repr_mode(mode=None):
- if mode is None:
- return 'none'
- fields = ['S_ENFMT', 'S_IFBLK', 'S_IFCHR', 'S_IFDIR', 'S_IFIFO', 'S_IFLNK', 'S_IFREG', 'S_IFSOCK', 'S_IRGRP', 'S_IROTH', 'S_IRUSR', 'S_IRWXG', 'S_IRWXO', 'S_IRWXU', 'S_ISGID', 'S_ISUID', 'S_ISVTX', 'S_IWGRP', 'S_IWOTH', 'S_IWUSR', 'S_IXGRP', 'S_IXOTH', 'S_IXUSR']
- ret = []
- for field in fields:
- fval = getattr(stat, field)
- if (mode & fval) == fval:
- ret.append(field)
- return '|'.join(ret)
-
-def repr_flags(flags=None):
- if flags is None:
- return 'none'
- fields = [ 'O_APPEND', 'O_CREAT', 'O_DIRECT', 'O_DIRECTORY', 'O_EXCL', 'O_EXLOCK',
- 'O_LARGEFILE', 'O_NDELAY', 'O_NOCTTY', 'O_NOFOLLOW', 'O_NONBLOCK', 'O_RDWR',
- 'O_SHLOCK', 'O_SYNC', 'O_TRUNC', 'O_WRONLY', ]
- ret = []
- for field in fields:
- fval = getattr(os, field, None)
- if fval is not None and (flags & fval) == fval:
- ret.append(field)
- if not ret:
- ret = ['O_RDONLY']
- return '|'.join(ret)
-
-class DownloaderWithReadQueue(object):
- def __init__(self):
- self.read_heap = []
- self.dest_file_name = None
- self.running = False
- self.done_observer = observer.OneShotObserverList()
-
- def __repr__(self):
- name = self.dest_file_name is None and '<none>' or os.path.basename(self.dest_file_name)
- return "<DWRQ(%s)> q(%s)" % (name, len(self.read_heap or []))
-
- def log(self, msg):
- log("%r: %s" % (self, msg))
-
- @logexc
- def start(self, url, dest_file_name, target_size, interval=0.5):
- self.log('start(%s, %s, %s)' % (url, dest_file_name, target_size, ))
- self.dest_file_name = dest_file_name
- file(self.dest_file_name, 'wb').close() # touch
- self.target_size = target_size
- self.log('start()')
- self.loop = task.LoopingCall(self._check_file_size)
- self.loop.start(interval)
- self.running = True
- d = client.downloadPage(url, self.dest_file_name)
- d.addCallbacks(self.done, self.fail)
- return d
-
- def when_done(self):
- return self.done_observer.when_fired()
-
- def get_size(self):
- if os.path.exists(self.dest_file_name):
- return os.path.getsize(self.dest_file_name)
- else:
- return 0
-
- @logexc
- def _read(self, posn, size):
- #self.log('_read(%s, %s)' % (posn, size))
- f = file(self.dest_file_name, 'rb')
- f.seek(posn)
- data = f.read(size)
- f.close()
- return data
-
- @logexc
- def read(self, posn, size):
- self.log('read(%s, %s)' % (posn, size))
- if self.read_heap is None:
- raise ValueError('read() called when already shut down')
- if posn+size > self.target_size:
- size -= self.target_size - posn
- fsize = self.get_size()
- if posn+size < fsize:
- return defer.succeed(self._read(posn, size))
- else:
- d = defer.Deferred()
- dread = (posn+size, posn, d)
- heapq.heappush(self.read_heap, dread)
- return d
-
- @logexc
- def _check_file_size(self):
- #self.log('_check_file_size()')
- if self.read_heap:
- try:
- size = self.get_size()
- while self.read_heap and self.read_heap[0][0] <= size:
- end, start, d = heapq.heappop(self.read_heap)
- data = self._read(start, end-start)
- d.callback(data)
- except Exception, e:
- log_exc()
- failure = Failure()
-
- @logexc
- def fail(self, failure):
- self.log('fail(%s)' % (failure,))
- self.running = False
- if self.loop.running:
- self.loop.stop()
- # fail any reads still pending
- for end, start, d in self.read_heap:
- reactor.callLater(0, d.errback, failure)
- self.read_heap = None
- self.done_observer.fire_if_not_fired(failure)
- return failure
-
- @logexc
- def done(self, result):
- self.log('done()')
- self.running = False
- if self.loop.running:
- self.loop.stop()
- precondition(self.get_size() == self.target_size, self.get_size(), self.target_size)
- self._check_file_size() # process anything left pending in heap
- precondition(not self.read_heap, self.read_heap, self.target_size, self.get_size())
- self.read_heap = None
- self.done_observer.fire_if_not_fired(self)
- return result
-
-
-class TahoeFuseFile(object):
-
- #def __init__(self, path, flags, *mode):
- def __init__(self, tfs, path, flags, *mode):
- log("TFF: __init__(%r, %r:%s, %r:%s)" % (path, flags, repr_flags(flags), mode, repr_mode(*mode)))
- self.tfs = tfs
- self.downloader = None
-
- self._path = path # for tahoe put
- try:
- self.parent, self.name, self.fnode = self.tfs.get_parent_name_and_child(path)
- m = flag2mode(flags)
- log('TFF: flags2(mode) -> %s' % (m,))
- if m[0] in 'wa':
- # write
- self.fname = self.tfs.cache.tmp_file(os.urandom(20))
- if self.fnode is None:
- log('TFF: [%s] open() for write: no file node, creating new File %s' % (self.name, self.fname, ))
- self.fnode = File(0, LiteralFileURI.BASE_STRING)
- self.fnode.tmp_fname = self.fname # XXX kill this
- self.parent.add_child(self.name, self.fnode, {})
- elif hasattr(self.fnode, 'tmp_fname'):
- self.fname = self.fnode.tmp_fname
- log('TFF: [%s] open() for write: existing file node lists %s' % (self.name, self.fname, ))
- else:
- log('TFF: [%s] open() for write: existing file node lists no tmp_file, using new %s' % (self.name, self.fname, ))
- if mode != (0600,):
- log('TFF: [%s] changing mode %s(%s) to 0600' % (self.name, repr_mode(*mode), mode))
- mode = (0600,)
- log('TFF: [%s] opening(%s) with flags %s(%s), mode %s(%s)' % (self.name, self.fname, repr_flags(flags|os.O_CREAT), flags|os.O_CREAT, repr_mode(*mode), mode))
- #self.file = os.fdopen(os.open(self.fname, flags|os.O_CREAT, *mode), m)
- self.file = os.fdopen(os.open(self.fname, flags|os.O_CREAT, *mode), m)
- self.fd = self.file.fileno()
- log('TFF: opened(%s) for write' % self.fname)
- self.open_for_write = True
- else:
- # read
- assert self.fnode is not None
- uri = self.fnode.get_uri()
-
- # XXX make this go away
- if hasattr(self.fnode, 'tmp_fname'):
- self.fname = self.fnode.tmp_fname
- log('TFF: reopening(%s) for reading' % self.fname)
- else:
- if is_literal_file_uri(uri) or not self.tfs.async:
- log('TFF: synchronously fetching file from cache for reading')
- self.fname = self.tfs.cache.get_file(uri)
- else:
- log('TFF: asynchronously fetching file from cache for reading')
- self.fname, self.downloader = self.tfs.cache.async_get_file(uri)
- # downloader is None if the cache already contains the file
- if self.downloader is not None:
- d = self.downloader.when_done()
- def download_complete(junk):
- # once the download is complete, revert to non-async behaviour
- self.downloader = None
- d.addCallback(download_complete)
-
- self.file = os.fdopen(os.open(self.fname, flags, *mode), m)
- self.fd = self.file.fileno()
- self.open_for_write = False
- log('TFF: opened(%s) for read' % self.fname)
- except:
- log_exc()
- raise
-
- def log(self, msg):
- log("<TFF(%s:%s)> %s" % (os.path.basename(self.fname), self.name, msg))
-
- @logexc
- def read(self, size, offset):
- self.log('read(%r, %r)' % (size, offset, ))
- if self.downloader:
- # then we're busy doing an async download
- # (and hence implicitly, we're in an environment that supports twisted)
- #self.log('passing read() to %s' % (self.downloader, ))
- d = self.downloader.read(offset, size)
- def thunk(failure):
- raise EIO(str(failure))
- d.addErrback(thunk)
- return d
- else:
- self.log('servicing read() from %s' % (self.file, ))
- self.file.seek(offset)
- return self.file.read(size)
-
- @logexc
- def write(self, buf, offset):
- self.log("write(-%s-, %r)" % (len(buf), offset))
- if not self.open_for_write:
- return -errno.EACCES
- self.file.seek(offset)
- self.file.write(buf)
- return len(buf)
-
- @logexc
- def release(self, flags):
- self.log("release(%r)" % (flags,))
- self.file.close()
- if self.open_for_write:
- size = os.path.getsize(self.fname)
- self.fnode.size = size
- file_cap = self.tfs.upload(self.fname)
- self.fnode.ro_uri = file_cap
- # XXX [ ] TODO: set metadata
- # write new uri into parent dir entry
- self.parent.add_child(self.name, self.fnode, {})
- self.log("uploaded: %s" % (file_cap,))
-
- # dbg
- print_tree()
-
- def _fflush(self):
- if 'w' in self.file.mode or 'a' in self.file.mode:
- self.file.flush()
-
- @logexc
- def fsync(self, isfsyncfile):
- self.log("fsync(%r)" % (isfsyncfile,))
- self._fflush()
- if isfsyncfile and hasattr(os, 'fdatasync'):
- os.fdatasync(self.fd)
- else:
- os.fsync(self.fd)
-
- @logexc
- def flush(self):
- self.log("flush()")
- self._fflush()
- # cf. xmp_flush() in fusexmp_fh.c
- os.close(os.dup(self.fd))
-
- @logexc
- def fgetattr(self):
- self.log("fgetattr()")
- s = os.fstat(self.fd)
- d = stat_to_dict(s)
- if self.downloader:
- size = self.downloader.target_size
- self.log("fgetattr() during async download, cache file: %s, size=%s" % (s, size))
- d['st_size'] = size
- self.log("fgetattr() -> %r" % (d,))
- return d
-
- @logexc
- def ftruncate(self, len):
- self.log("ftruncate(%r)" % (len,))
- self.file.truncate(len)
-
-class TahoeFuseBase(object):
-
- def __init__(self, tfs):
- log("TFB: __init__()")
- self.tfs = tfs
- self.files = {}
-
- def log(self, msg):
- log("<TFB> %s" % (msg, ))
-
- @logexc
- def readlink(self, path):
- self.log("readlink(%r)" % (path,))
- node = self.tfs.get_path(path)
- if node:
- raise EINVAL('Not a symlink') # nothing in tahoe is a symlink
- else:
- raise ENOENT('Invalid argument')
-
- @logexc
- def unlink(self, path):
- self.log("unlink(%r)" % (path,))
- self.tfs.unlink(path)
-
- @logexc
- def rmdir(self, path):
- self.log("rmdir(%r)" % (path,))
- self.tfs.unlink(path)
-
- @logexc
- def symlink(self, path, path1):
- self.log("symlink(%r, %r)" % (path, path1))
- self.tfs.link(path, path1)
-
- @logexc
- def rename(self, path, path1):
- self.log("rename(%r, %r)" % (path, path1))
- self.tfs.rename(path, path1)
-
- @logexc
- def link(self, path, path1):
- self.log("link(%r, %r)" % (path, path1))
- self.tfs.link(path, path1)
-
- @logexc
- def chmod(self, path, mode):
- self.log("XX chmod(%r, %r)" % (path, mode))
- #return -errno.EOPNOTSUPP
-
- @logexc
- def chown(self, path, user, group):
- self.log("XX chown(%r, %r, %r)" % (path, user, group))
- #return -errno.EOPNOTSUPP
-
- @logexc
- def truncate(self, path, len):
- self.log("XX truncate(%r, %r)" % (path, len))
- #return -errno.EOPNOTSUPP
-
- @logexc
- def utime(self, path, times):
- self.log("XX utime(%r, %r)" % (path, times))
- #return -errno.EOPNOTSUPP
-
- @logexc
- def statfs(self):
- self.log("statfs()")
- """
- Should return an object with statvfs attributes (f_bsize, f_frsize...).
- Eg., the return value of os.statvfs() is such a thing (since py 2.2).
- If you are not reusing an existing statvfs object, start with
- fuse.StatVFS(), and define the attributes.
-
- To provide usable information (ie., you want sensible df(1)
- output, you are suggested to specify the following attributes:
-
- - f_bsize - preferred size of file blocks, in bytes
- - f_frsize - fundamental size of file blcoks, in bytes
- [if you have no idea, use the same as blocksize]
- - f_blocks - total number of blocks in the filesystem
- - f_bfree - number of free blocks
- - f_files - total number of file inodes
- - f_ffree - nunber of free file inodes
- """
-
- block_size = 4096 # 4k
- preferred_block_size = 131072 # 128k, c.f. seg_size
- fs_size = 8*2**40 # 8Tb
- fs_free = 2*2**40 # 2Tb
-
- #s = fuse.StatVfs(f_bsize = preferred_block_size,
- s = dict(f_bsize = preferred_block_size,
- f_frsize = block_size,
- f_blocks = fs_size / block_size,
- f_bfree = fs_free / block_size,
- f_bavail = fs_free / block_size,
- f_files = 2**30, # total files
- f_ffree = 2**20, # available files
- f_favail = 2**20, # available files (root)
- f_flag = 2, # no suid
- f_namemax = 255) # max name length
- #self.log('statfs(): %r' % (s,))
- return s
-
- def fsinit(self):
- self.log("fsinit()")
-
- ##################################################################
-
- @logexc
- def readdir(self, path, offset):
- self.log('readdir(%r, %r)' % (path, offset))
- node = self.tfs.get_path(path)
- if node is None:
- return -errno.ENOENT
- dirlist = ['.', '..'] + node.children.keys()
- self.log('dirlist = %r' % (dirlist,))
- #return [fuse.Direntry(d) for d in dirlist]
- return dirlist
-
- @logexc
- def getattr(self, path):
- self.log('getattr(%r)' % (path,))
-
- if path == '/':
- # we don't have any metadata for the root (no edge leading to it)
- mode = (stat.S_IFDIR | 755)
- mtime = self.tfs.root.mtime
- s = TStat({}, st_mode=mode, st_nlink=1, st_mtime=mtime)
- self.log('getattr(%r) -> %r' % (path, s))
- #return s
- return stat_to_dict(s)
-
- parent, name, child = self.tfs.get_parent_name_and_child(path)
- if not child: # implicitly 'or not parent'
- raise ENOENT('No such file or directory')
- return stat_to_dict(parent.get_stat(name))
-
- @logexc
- def access(self, path, mode):
- self.log("access(%r, %r)" % (path, mode))
- node = self.tfs.get_path(path)
- if not node:
- return -errno.ENOENT
- accmode = os.O_RDONLY | os.O_WRONLY | os.O_RDWR
- if (mode & 0222):
- if not node.writable():
- log('write access denied for %s (req:%o)' % (path, mode, ))
- return -errno.EACCES
- #else:
- #log('access granted for %s' % (path, ))
-
- @logexc
- def mkdir(self, path, mode):
- self.log("mkdir(%r, %r)" % (path, mode))
- self.tfs.mkdir(path)
-
- ##################################################################
- # file methods
-
- def open(self, path, flags):
- self.log('open(%r, %r)' % (path, flags, ))
- if path in self.files:
- # XXX todo [ ] should consider concurrent open files of differing modes
- return
- else:
- tffobj = TahoeFuseFile(self.tfs, path, flags)
- self.files[path] = tffobj
-
- def create(self, path, flags, mode):
- self.log('create(%r, %r, %r)' % (path, flags, mode))
- if path in self.files:
- # XXX todo [ ] should consider concurrent open files of differing modes
- return
- else:
- tffobj = TahoeFuseFile(self.tfs, path, flags, mode)
- self.files[path] = tffobj
-
- def _get_file(self, path):
- if not path in self.files:
- raise ENOENT('No such file or directory: %s' % (path,))
- return self.files[path]
-
- ##
-
- def read(self, path, size, offset):
- self.log('read(%r, %r, %r)' % (path, size, offset, ))
- return self._get_file(path).read(size, offset)
-
- @logexc
- def write(self, path, buf, offset):
- self.log("write(%r, -%s-, %r)" % (path, len(buf), offset))
- return self._get_file(path).write(buf, offset)
-
- @logexc
- def release(self, path, flags):
- self.log("release(%r, %r)" % (path, flags,))
- self._get_file(path).release(flags)
- del self.files[path]
-
- @logexc
- def fsync(self, path, isfsyncfile):
- self.log("fsync(%r, %r)" % (path, isfsyncfile,))
- return self._get_file(path).fsync(isfsyncfile)
-
- @logexc
- def flush(self, path):
- self.log("flush(%r)" % (path,))
- return self._get_file(path).flush()
-
- @logexc
- def fgetattr(self, path):
- self.log("fgetattr(%r)" % (path,))
- return self._get_file(path).fgetattr()
-
- @logexc
- def ftruncate(self, path, len):
- self.log("ftruncate(%r, %r)" % (path, len,))
- return self._get_file(path).ftruncate(len)
-
-class TahoeFuseLocal(TahoeFuseBase, fuse.Fuse):
- def __init__(self, tfs, *args, **kw):
- log("TFL: __init__(%r, %r)" % (args, kw))
- TahoeFuseBase.__init__(self, tfs)
- fuse.Fuse.__init__(self, *args, **kw)
-
- def log(self, msg):
- log("<TFL> %s" % (msg, ))
-
- def main(self, *a, **kw):
- self.log("main(%r, %r)" % (a, kw))
- return fuse.Fuse.main(self, *a, **kw)
-
- # overrides for those methods which return objects not marshalled
- def fgetattr(self, path):
- return TStat({}, **(TahoeFuseBase.fgetattr(self, path)))
-
- def getattr(self, path):
- return TStat({}, **(TahoeFuseBase.getattr(self, path)))
-
- def statfs(self):
- return fuse.StatVfs(**(TahoeFuseBase.statfs(self)))
- #self.log('statfs()')
- #ret = fuse.StatVfs(**(TahoeFuseBase.statfs(self)))
- #self.log('statfs(): %r' % (ret,))
- #return ret
-
- @logexc
- def readdir(self, path, offset):
- return [ fuse.Direntry(d) for d in TahoeFuseBase.readdir(self, path, offset) ]
-
-class TahoeFuseShim(fuse.Fuse):
- def __init__(self, trpc, *args, **kw):
- log("TF: __init__(%r, %r)" % (args, kw))
- self.trpc = trpc
- fuse.Fuse.__init__(self, *args, **kw)
-
- def log(self, msg):
- log("<TFs> %s" % (msg, ))
-
- @logexc
- def readlink(self, path):
- self.log("readlink(%r)" % (path,))
- return self.trpc.call('readlink', path)
-
- @logexc
- def unlink(self, path):
- self.log("unlink(%r)" % (path,))
- return self.trpc.call('unlink', path)
-
- @logexc
- def rmdir(self, path):
- self.log("rmdir(%r)" % (path,))
- return self.trpc.call('unlink', path)
-
- @logexc
- def symlink(self, path, path1):
- self.log("symlink(%r, %r)" % (path, path1))
- return self.trpc.call('link', path, path1)
-
- @logexc
- def rename(self, path, path1):
- self.log("rename(%r, %r)" % (path, path1))
- return self.trpc.call('rename', path, path1)
-
- @logexc
- def link(self, path, path1):
- self.log("link(%r, %r)" % (path, path1))
- return self.trpc.call('link', path, path1)
-
- @logexc
- def chmod(self, path, mode):
- self.log("XX chmod(%r, %r)" % (path, mode))
- return self.trpc.call('chmod', path, mode)
-
- @logexc
- def chown(self, path, user, group):
- self.log("XX chown(%r, %r, %r)" % (path, user, group))
- return self.trpc.call('chown', path, user, group)
-
- @logexc
- def truncate(self, path, len):
- self.log("XX truncate(%r, %r)" % (path, len))
- return self.trpc.call('truncate', path, len)
-
- @logexc
- def utime(self, path, times):
- self.log("XX utime(%r, %r)" % (path, times))
- return self.trpc.call('utime', path, times)
-
- @logexc
- def statfs(self):
- self.log("statfs()")
- response = self.trpc.call('statfs')
- #self.log("statfs(): %r" % (response,))
- kwargs = dict([ (str(k),v) for k,v in response.items() ])
- return fuse.StatVfs(**kwargs)
-
- def fsinit(self):
- self.log("fsinit()")
-
- def main(self, *a, **kw):
- self.log("main(%r, %r)" % (a, kw))
-
- return fuse.Fuse.main(self, *a, **kw)
-
- ##################################################################
-
- @logexc
- def readdir(self, path, offset):
- self.log('readdir(%r, %r)' % (path, offset))
- return [ fuse.Direntry(d) for d in self.trpc.call('readdir', path, offset) ]
-
- @logexc
- def getattr(self, path):
- self.log('getattr(%r)' % (path,))
- response = self.trpc.call('getattr', path)
- kwargs = dict([ (str(k),v) for k,v in response.items() ])
- s = TStat({}, **kwargs)
- self.log('getattr(%r) -> %r' % (path, s))
- return s
-
- @logexc
- def access(self, path, mode):
- self.log("access(%r, %r)" % (path, mode))
- return self.trpc.call('access', path, mode)
-
- @logexc
- def mkdir(self, path, mode):
- self.log("mkdir(%r, %r)" % (path, mode))
- return self.trpc.call('mkdir', path, mode)
-
- ##################################################################
- # file methods
-
- def open(self, path, flags):
- self.log('open(%r, %r)' % (path, flags, ))
- return self.trpc.call('open', path, flags)
-
- def create(self, path, flags, mode):
- self.log('create(%r, %r, %r)' % (path, flags, mode))
- return self.trpc.call('create', path, flags, mode)
-
- ##
-
- def read(self, path, size, offset):
- self.log('read(%r, %r, %r)' % (path, size, offset, ))
- return self.trpc.call('read', path, size, offset)
-
- @logexc
- def write(self, path, buf, offset):
- self.log("write(%r, -%s-, %r)" % (path, len(buf), offset))
- return self.trpc.call('write', path, buf, offset)
-
- @logexc
- def release(self, path, flags):
- self.log("release(%r, %r)" % (path, flags,))
- return self.trpc.call('release', path, flags)
-
- @logexc
- def fsync(self, path, isfsyncfile):
- self.log("fsync(%r, %r)" % (path, isfsyncfile,))
- return self.trpc.call('fsync', path, isfsyncfile)
-
- @logexc
- def flush(self, path):
- self.log("flush(%r)" % (path,))
- return self.trpc.call('flush', path)
-
- @logexc
- def fgetattr(self, path):
- self.log("fgetattr(%r)" % (path,))
- #return self.trpc.call('fgetattr', path)
- response = self.trpc.call('fgetattr', path)
- kwargs = dict([ (str(k),v) for k,v in response.items() ])
- s = TStat({}, **kwargs)
- self.log('getattr(%r) -> %r' % (path, s))
- return s
-
- @logexc
- def ftruncate(self, path, len):
- self.log("ftruncate(%r, %r)" % (path, len,))
- return self.trpc.call('ftruncate', path, len)
-
-
-def launch_tahoe_fuse(tf_class, tobj, argv):
- sys.argv = ['tahoe fuse'] + list(argv)
- log('setting sys.argv=%r' % (sys.argv,))
- config = TahoeFuseOptions()
- version = "%prog " +VERSIONSTR+", fuse "+ fuse.__version__
- server = tf_class(tobj, version=version, usage=config.getSynopsis(), dash_s_do='setsingle')
- server.parse(errex=1)
- server.main()
-
-def getnodeurl(nodedir):
- f = file(os.path.expanduser(os.path.join(nodedir, "node.url")), 'rb')
- nu = f.read().strip()
- f.close()
- if nu[-1] != "/":
- nu += "/"
- return nu
-
-def fingerprint(uri):
- if uri is None:
- return None
- return base64.b32encode(sha.new(uri).digest()).lower()[:6]
-
-stat_fields = [ 'st_mode', 'st_ino', 'st_dev', 'st_nlink', 'st_uid', 'st_gid', 'st_size',
- 'st_atime', 'st_mtime', 'st_ctime', ]
-def stat_to_dict(statobj, fields=None):
- if fields is None:
- fields = stat_fields
- d = {}
- for f in fields:
- d[f] = getattr(statobj, f, None)
- return d
-
-class TStat(fuse.Stat):
- # in fuse 0.2, these are set by fuse.Stat.__init__
- # in fuse 0.2-pre3 (hardy) they are not. badness ensues if they're missing
- st_mode = None
- st_ino = 0
- st_dev = 0
- st_nlink = None
- st_uid = 0
- st_gid = 0
- st_size = 0
- st_atime = 0
- st_mtime = 0
- st_ctime = 0
-
- fields = [ 'st_mode', 'st_ino', 'st_dev', 'st_nlink', 'st_uid', 'st_gid', 'st_size',
- 'st_atime', 'st_mtime', 'st_ctime', ]
- def __init__(self, metadata, **kwargs):
- # first load any stat fields present in 'metadata'
- for st in [ 'mtime', 'ctime' ]:
- if st in metadata:
- setattr(self, "st_%s" % st, metadata[st])
- for st in self.fields:
- if st in metadata:
- setattr(self, st, metadata[st])
-
- # then set any values passed in as kwargs
- fuse.Stat.__init__(self, **kwargs)
-
- def __repr__(self):
- return "<Stat%r>" % (stat_to_dict(self),)
-
-class Directory(object):
- def __init__(self, tfs, ro_uri, rw_uri):
- self.tfs = tfs
- self.ro_uri = ro_uri
- self.rw_uri = rw_uri
- assert (rw_uri or ro_uri)
- self.children = {}
- self.last_load = None
- self.last_data = None
- self.mtime = 0
-
- def __repr__(self):
- return "<Directory %s>" % (fingerprint(self.get_uri()),)
-
- def maybe_refresh(self, name=None):
- """
- if the previously cached data was retrieved within the cache
- validity period, does nothing. otherwise refetches the data
- for this directory and reloads itself
- """
- now = time.time()
- if self.last_load is None or (now - self.last_load) > self.tfs.cache_validity:
- self.load(name)
-
- def load(self, name=None):
- now = time.time()
- log('%s.loading(%s)' % (self, name))
- url = self.tfs.compose_url("uri/%s?t=json", self.get_uri())
- data = urllib.urlopen(url).read()
- h = tagged_hash('cache_hash', data)
- if h == self.last_data:
- self.last_load = now
- log('%s.load() : no change h(data)=%s' % (self, base32.b2a(h), ))
- return
- try:
- parsed = simplejson.loads(data)
- except ValueError:
- log('%s.load(): unable to parse json data for dir:\n%r' % (self, data))
- return
- nodetype, d = parsed
- assert nodetype == 'dirnode'
- self.children.clear()
- for cname,details in d['children'].items():
- cname = unicode_to_utf8_or_str(cname)
- ctype, cattrs = details
- metadata = cattrs.get('metadata', {})
- if ctype == 'dirnode':
- cobj = self.tfs.dir_for(cname, cattrs.get('ro_uri'), cattrs.get('rw_uri'))
- else:
- assert ctype == "filenode"
- cobj = File(cattrs.get('size'), cattrs.get('ro_uri'))
- self.children[cname] = cobj, metadata
- self.last_load = now
- self.last_data = h
- self.mtime = now
- log('%s.load() loaded: \n%s' % (self, self.pprint(),))
-
- def get_children(self):
- return self.children.keys()
-
- def get_child(self, name):
- return self.children[name][0]
-
- def add_child(self, name, child, metadata):
- log('%s.add_child(%r, %r, %r)' % (self, name, child, metadata, ))
- self.children[name] = child, metadata
- url = self.tfs.compose_url("uri/%s/%s?t=uri", self.get_uri(), name)
- child_cap = do_http('PUT', url, child.get_uri())
- # XXX [ ] TODO: push metadata to tahoe node
- assert child_cap == child.get_uri()
- self.mtime = time.time()
- log('added child %r with %r to %r' % (name, child_cap, self))
-
- def remove_child(self, name):
- log('%s.remove_child(%r)' % (self, name, ))
- del self.children[name]
- url = self.tfs.compose_url("uri/%s/%s", self.get_uri(), name)
- resp = do_http('DELETE', url)
- self.mtime = time.time()
- log('child (%s) removal yielded %r' % (name, resp,))
-
- def get_uri(self):
- return self.rw_uri or self.ro_uri
-
- # TODO: rename to 'is_writeable', or switch sense to 'is_readonly', for consistency with Tahoe code
- def writable(self):
- return self.rw_uri and self.rw_uri != self.ro_uri
-
- def pprint(self, prefix='', printed=None, suffix=''):
- ret = []
- if printed is None:
- printed = set()
- writable = self.writable() and '+' or ' '
- if self in printed:
- ret.append(" %s/%s ... <%s> : %s" % (prefix, writable, fingerprint(self.get_uri()), suffix, ))
- else:
- ret.append("[%s] %s/%s : %s" % (fingerprint(self.get_uri()), prefix, writable, suffix, ))
- printed.add(self)
- for name,(child,metadata) in sorted(self.children.items()):
- ret.append(child.pprint(' ' * (len(prefix)+1)+name, printed, repr(metadata)))
- return '\n'.join(ret)
-
- def get_metadata(self, name):
- return self.children[name][1]
-
- def get_stat(self, name):
- child,metadata = self.children[name]
- log("%s.get_stat(%s) md: %r" % (self, name, metadata))
-
- if isinstance(child, Directory):
- child.maybe_refresh(name)
- mode = metadata.get('st_mode') or (stat.S_IFDIR | 0755)
- s = TStat(metadata, st_mode=mode, st_nlink=1, st_mtime=child.mtime)
- else:
- if hasattr(child, 'tmp_fname'):
- s = os.stat(child.tmp_fname)
- log("%s.get_stat(%s) returning local stat of tmp file" % (self, name, ))
- else:
- s = TStat(metadata,
- st_nlink = 1,
- st_size = child.size,
- st_mode = metadata.get('st_mode') or (stat.S_IFREG | 0444),
- st_mtime = metadata.get('mtime') or self.mtime,
- )
- return s
-
- log("%s.get_stat(%s)->%s" % (self, name, s))
- return s
-
-class File(object):
- def __init__(self, size, ro_uri):
- self.size = size
- if ro_uri:
- ro_uri = str(ro_uri)
- self.ro_uri = ro_uri
-
- def __repr__(self):
- return "<File %s>" % (fingerprint(self.ro_uri) or [self.tmp_fname],)
-
- def pprint(self, prefix='', printed=None, suffix=''):
- return " %s (%s) : %s" % (prefix, self.size, suffix, )
-
- def get_uri(self):
- return self.ro_uri
-
- def writable(self):
- return True
-
-class TFS(object):
- def __init__(self, nodedir, nodeurl, root_uri,
- cache_validity_period=DEFAULT_DIRECTORY_VALIDITY, async=False):
- self.cache_validity = cache_validity_period
- self.nodeurl = nodeurl
- self.root_uri = root_uri
- self.async = async
- self.dirs = {}
-
- cachedir = os.path.expanduser(os.path.join(nodedir, '_cache'))
- self.cache = FileCache(nodeurl, cachedir)
- ro_uri = DirectoryURI.init_from_string(self.root_uri).get_readonly()
- self.root = Directory(self, ro_uri, self.root_uri)
- self.root.maybe_refresh('<root>')
-
- def log(self, msg):
- log("<TFS> %s" % (msg, ))
-
- def pprint(self):
- return self.root.pprint()
-
- def compose_url(self, fmt, *args):
- return self.nodeurl + (fmt % tuple(map(urllib.quote, args)))
-
- def get_parent_name_and_child(self, path):
- """
- find the parent dir node, name of child relative to that parent, and
- child node within the TFS object space.
- @returns: (parent, name, child) if the child is found
- (parent, name, None) if the child is missing from the parent
- (None, name, None) if the parent is not found
- """
- if path == '/':
- return
- dirname, name = os.path.split(path)
- parent = self.get_path(dirname)
- if parent:
- try:
- child = parent.get_child(name)
- return parent, name, child
- except KeyError:
- return parent, name, None
- else:
- return None, name, None
-
- def get_path(self, path):
- comps = path.strip('/').split('/')
- if comps == ['']:
- comps = []
- cursor = self.root
- c_name = '<root>'
- for comp in comps:
- if not isinstance(cursor, Directory):
- self.log('path "%s" is not a dir' % (path,))
- return None
- cursor.maybe_refresh(c_name)
- try:
- cursor = cursor.get_child(comp)
- c_name = comp
- except KeyError:
- self.log('path "%s" not found' % (path,))
- return None
- if isinstance(cursor, Directory):
- cursor.maybe_refresh(c_name)
- return cursor
-
- def dir_for(self, name, ro_uri, rw_uri):
- #self.log('dir_for(%s) [%s/%s]' % (name, fingerprint(ro_uri), fingerprint(rw_uri)))
- if ro_uri:
- ro_uri = str(ro_uri)
- if rw_uri:
- rw_uri = str(rw_uri)
- uri = rw_uri or ro_uri
- assert uri
- dirobj = self.dirs.get(uri)
- if not dirobj:
- self.log('dir_for(%s) creating new Directory' % (name, ))
- dirobj = Directory(self, ro_uri, rw_uri)
- self.dirs[uri] = dirobj
- return dirobj
-
- def upload(self, fname):
- self.log('upload(%r)' % (fname,))
- fh = file(fname, 'rb')
- url = self.compose_url("uri")
- file_cap = do_http('PUT', url, fh)
- self.log('uploaded to: %r' % (file_cap,))
- return file_cap
-
- def mkdir(self, path):
- self.log('mkdir(%r)' % (path,))
- parent, name, child = self.get_parent_name_and_child(path)
-
- if child:
- raise EEXIST('File exists: %s' % (name,))
- if not parent:
- raise ENOENT('No such file or directory: %s' % (path,))
-
- url = self.compose_url("uri?t=mkdir")
- new_dir_cap = do_http('PUT', url)
-
- ro_uri = DirectoryURI.init_from_string(new_dir_cap).get_readonly()
- child = Directory(self, ro_uri, new_dir_cap)
- parent.add_child(name, child, {})
-
- def rename(self, path, path1):
- self.log('rename(%s, %s)' % (path, path1))
- src_parent, src_name, src_child = self.get_parent_name_and_child(path)
- dst_parent, dst_name, dst_child = self.get_parent_name_and_child(path1)
-
- if not src_child or not dst_parent:
- raise ENOENT('No such file or directory')
-
- dst_parent.add_child(dst_name, src_child, {})
- src_parent.remove_child(src_name)
-
- def unlink(self, path):
- parent, name, child = self.get_parent_name_and_child(path)
-
- if child is None: # parent or child is missing
- raise ENOENT('No such file or directory')
- if not parent.writable():
- raise EACCESS('Permission denied')
-
- parent.remove_child(name)
-
- def link(self, path, path1):
- src = self.get_path(path)
- dst_parent, dst_name, dst_child = self.get_parent_name_and_child(path1)
-
- if not src:
- raise ENOENT('No such file or directory')
- if dst_parent is None:
- raise ENOENT('No such file or directory')
- if not dst_parent.writable():
- raise EACCESS('Permission denied')
-
- dst_parent.add_child(dst_name, src, {})
-
-class FileCache(object):
- def __init__(self, nodeurl, cachedir):
- self.nodeurl = nodeurl
- self.cachedir = cachedir
- if not os.path.exists(self.cachedir):
- os.makedirs(self.cachedir)
- self.tmpdir = os.path.join(self.cachedir, 'tmp')
- if not os.path.exists(self.tmpdir):
- os.makedirs(self.tmpdir)
- self.downloaders = weakref.WeakValueDictionary()
-
- def log(self, msg):
- log("<FC> %s" % (msg, ))
-
- def get_file(self, uri):
- self.log('get_file(%s)' % (uri,))
- if is_literal_file_uri(uri):
- return self.get_literal(uri)
- else:
- return self.get_chk(uri, async=False)
-
- def async_get_file(self, uri):
- self.log('get_file(%s)' % (uri,))
- return self.get_chk(uri, async=True)
-
- def get_literal(self, uri):
- h = sha.new(uri).digest()
- u = LiteralFileURI.init_from_string(uri)
- fname = os.path.join(self.cachedir, '__'+base64.b32encode(h).lower())
- size = len(u.data)
- self.log('writing literal file %s (%s)' % (fname, size, ))
- fh = open(fname, 'wb')
- fh.write(u.data)
- fh.close()
- return fname
-
- def get_chk(self, uri, async=False):
- u = CHKFileURI.init_from_string(str(uri))
- storage_index = u.storage_index
- size = u.size
- fname = os.path.join(self.cachedir, base64.b32encode(storage_index).lower())
- if os.path.exists(fname):
- fsize = os.path.getsize(fname)
- if fsize == size:
- if async:
- return fname, None
- else:
- return fname
- else:
- self.log('warning file "%s" is too short %s < %s' % (fname, fsize, size))
- self.log('downloading file %s (%s)' % (fname, size, ))
- url = "%suri/%s" % (self.nodeurl, uri)
- if async:
- if fname in self.downloaders and self.downloaders[fname].running:
- downloader = self.downloaders[fname]
- else:
- downloader = DownloaderWithReadQueue()
- self.downloaders[fname] = downloader
- d = downloader.start(url, fname, target_size=u.size)
- def clear_downloader(result, fname):
- self.log('clearing %s from downloaders: %r' % (fname, result))
- self.downloaders.pop(fname, None)
- d.addBoth(clear_downloader, fname)
- return fname, downloader
- else:
- fh = open(fname, 'wb')
- download = urllib.urlopen(url)
- while True:
- chunk = download.read(4096)
- if not chunk:
- break
- fh.write(chunk)
- fh.close()
- return fname
-
- def tmp_file(self, id):
- fname = os.path.join(self.tmpdir, base64.b32encode(id).lower())
- return fname
-
-_tfs = None # to appease pyflakes; is set in main()
-def print_tree():
- log('tree:\n' + _tfs.pprint())
-
-
-def unmarshal(obj):
- if obj is None or isinstance(obj, int) or isinstance(obj, long) or isinstance(obj, float):
- return obj
- elif isinstance(obj, unicode) or isinstance(obj, str):
- #log('unmarshal(%r)' % (obj,))
- return base64.b64decode(obj)
- elif isinstance(obj, list):
- return map(unmarshal, obj)
- elif isinstance(obj, dict):
- return dict([ (k,unmarshal(v)) for k,v in obj.items() ])
- else:
- raise ValueError('object type not int,str,list,dict,none (%s) (%r)' % (type(obj), obj))
-
-def marshal(obj):
- if obj is None or isinstance(obj, int) or isinstance(obj, long) or isinstance(obj, float):
- return obj
- elif isinstance(obj, str):
- return base64.b64encode(obj)
- elif isinstance(obj, list) or isinstance(obj, tuple):
- return map(marshal, obj)
- elif isinstance(obj, dict):
- return dict([ (k,marshal(v)) for k,v in obj.items() ])
- else:
- raise ValueError('object type not int,str,list,dict,none (%s)' % type(obj))
-
-
-class TRPCProtocol(Protocol):
- compute_response_sha1 = True
- log_all_requests = False
-
- def connectionMade(self):
- self.buf = []
-
- def dataReceived(self, data):
- if data == 'keepalive\n':
- log('keepalive connection on %r' % (self.transport,))
- self.keepalive = True
- return
-
- if not data.endswith('\n'):
- self.buf.append(data)
- return
- if self.buf:
- self.buf.append(data)
- reqstr = ''.join(self.buf)
- self.buf = []
- self.dispatch_request(reqstr)
- else:
- self.dispatch_request(data)
-
- def dispatch_request(self, reqstr):
- try:
- req = simplejson.loads(reqstr)
- except ValueError, ve:
- log(ve)
- return
-
- d = defer.maybeDeferred(self.handle_request, req)
- d.addCallback(self.send_response)
- d.addErrback(self.send_error)
-
- def send_error(self, failure):
- log('failure: %s' % (failure,))
- if failure.check(TFSIOError):
- e = failure.value
- self.send_response(['error', 'errno', e.args[0], e.args[1]])
- else:
- self.send_response(['error', 'failure', str(failure)])
-
- def send_response(self, result):
- response = simplejson.dumps(result)
- header = { 'len': len(response), }
- if self.compute_response_sha1:
- header['sha1'] = base64.b64encode(sha.new(response).digest())
- hdr = simplejson.dumps(header)
- self.transport.write(hdr)
- self.transport.write('\n')
- self.transport.write(response)
- self.transport.loseConnection()
-
- def connectionLost(self, reason):
- if hasattr(self, 'keepalive'):
- log('keepalive connection %r lost, shutting down' % (self.transport,))
- reactor.callLater(0, reactor.stop)
-
- def handle_request(self, req):
- if type(req) is not list or not req or len(req) < 1:
- return ['error', 'malformed request']
- if req[0] == 'call':
- if len(req) < 3:
- return ['error', 'malformed request']
- methname = req[1]
- try:
- args = unmarshal(req[2])
- except ValueError, ve:
- return ['error', 'malformed arguments', str(ve)]
-
- try:
- meth = getattr(self.factory.server, methname)
- except AttributeError, ae:
- return ['error', 'no such method', str(ae)]
-
- if self.log_all_requests:
- log('call %s(%s)' % (methname, ', '.join(map(repr, args))))
- try:
- result = meth(*args)
- except TFSIOError, e:
- log('errno: %s; %s' % e.args)
- return ['error', 'errno', e.args[0], e.args[1]]
- except Exception, e:
- log('exception: ' + traceback.format_exc())
- return ['error', 'exception', str(e)]
- d = defer.succeed(None)
- d.addCallback(lambda junk: result) # result may be Deferred
- d.addCallback(lambda res: ['result', marshal(res)]) # only applies if not errback
- return d
-
-class TFSServer(object):
- def __init__(self, socket_path, server=None):
- self.socket_path = socket_path
- log('TFSServer init socket: %s' % (socket_path,))
-
- self.factory = Factory()
- self.factory.protocol = TRPCProtocol
- if server:
- self.factory.server = server
- else:
- self.factory.server = self
-
- def get_service(self):
- if not hasattr(self, 'svc'):
- from twisted.application import strports
- self.svc = strports.service('unix:'+self.socket_path, self.factory)
- return self.svc
-
- def run(self):
- svc = self.get_service()
- def ss():
- try:
- svc.startService()
- except:
- reactor.callLater(0, reactor.stop)
- raise
- reactor.callLater(0, ss)
- reactor.run()
-
- def hello(self):
- return 'pleased to meet you'
-
- def echo(self, arg):
- return arg
-
- def failex(self):
- raise ValueError('expected')
-
- def fail(self):
- return defer.maybeDeferred(self.failex)
-
-class RPCError(RuntimeError):
- pass
-
-class TRPC(object):
- def __init__(self, socket_fname):
- self.socket_fname = socket_fname
- self.keepalive = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- self.keepalive.connect(self.socket_fname)
- self.keepalive.send('keepalive\n')
- log('requested keepalive on %s' % (self.keepalive,))
-
- def req(self, req):
- # open conenction to trpc server
- s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- s.connect(self.socket_fname)
- # send request
- s.send(simplejson.dumps(req))
- s.send('\n')
- # read response header
- hdr_data = s.recv(8192)
- first_newline = hdr_data.index('\n')
- header = hdr_data[:first_newline]
- data = hdr_data[first_newline+1:]
- hdr = simplejson.loads(header)
- hdr_len = hdr['len']
- if hdr.has_key('sha1'):
- hdr_sha1 = base64.b64decode(hdr['sha1'])
- spool = [data]
- spool_sha = sha.new(data)
- # spool response
- while True:
- data = s.recv(8192)
- if data:
- spool.append(data)
- spool_sha.update(data)
- else:
- break
- else:
- spool = [data]
- # spool response
- while True:
- data = s.recv(8192)
- if data:
- spool.append(data)
- else:
- break
- s.close()
- # decode response
- resp = ''.join(spool)
- spool = None
- assert hdr_len == len(resp), str((hdr_len, len(resp), repr(resp)))
- if hdr.has_key('sha1'):
- data_sha1 = spool_sha.digest()
- spool = spool_sha = None
- assert hdr_sha1 == data_sha1, str((base32.b2a(hdr_sha1), base32.b2a(data_sha1)))
- #else:
- #print 'warning, server provided no sha1 to check'
- return resp
-
- def call(self, methodname, *args):
- res = self.req(['call', methodname, marshal(args)])
-
- result = simplejson.loads(res)
- if not result or len(result) < 2:
- raise TypeError('malformed response %r' % (result,))
- if result[0] == 'error':
- if result[1] == 'errno':
- raise TFSIOError(result[2], result[3])
- else:
- raise RPCError(*(result[1:])) # error, exception / error, failure
- elif result[0] == 'result':
- return unmarshal(result[1])
- else:
- raise TypeError('unknown response type %r' % (result[0],))
-
- def shutdown(self):
- log('shutdown() closing keepalive %s' % (self.keepalive,))
- self.keepalive.close()
-
-# (cut-n-pasted here due to an ImportError / some py2app linkage issues)
-#from twisted.scripts._twistd_unix import daemonize
-def daemonize():
- # See http://www.erlenstar.demon.co.uk/unix/faq_toc.html#TOC16
- if os.fork(): # launch child and...
- os._exit(0) # kill off parent
- os.setsid()
- if os.fork(): # launch child and...
- os._exit(0) # kill off parent again.
- os.umask(077)
- null=os.open('/dev/null', os.O_RDWR)
- for i in range(3):
- try:
- os.dup2(null, i)
- except OSError, e:
- if e.errno != errno.EBADF:
- raise
- os.close(null)
-
-def main(argv):
- log("main(%s)" % (argv,))
-
- # check for version or help options (no args == help)
- if not argv:
- argv = ['--help']
- if len(argv) == 1 and argv[0] in ['-h', '--help']:
- config = TahoeFuseOptions()
- print >> sys.stderr, config
- print >> sys.stderr, 'fuse usage follows:'
- if len(argv) == 1 and argv[0] in ['-h', '--help', '--version']:
- launch_tahoe_fuse(TahoeFuseLocal, None, argv)
- return -2
-
- # parse command line options
- config = TahoeFuseOptions()
- try:
- #print 'parsing', argv
- config.parseOptions(argv)
- except usage.error, e:
- print config
- print e
- return -1
-
- # check for which alias or uri is specified
- if config['alias']:
- alias = config['alias']
- #print 'looking for aliases in', config['node-directory']
- aliases = get_aliases(os.path.expanduser(config['node-directory']))
- if alias not in aliases:
- raise usage.error('Alias %r not found' % (alias,))
- root_uri = aliases[alias]
- root_name = alias
- elif config['root-uri']:
- root_uri = config['root-uri']
- root_name = 'uri_' + base32.b2a(tagged_hash('root_name', root_uri))[:12]
- # test the uri for structural validity:
- try:
- DirectoryURI.init_from_string(root_uri)
- except:
- raise usage.error('root-uri must be a valid directory uri (not %r)' % (root_uri,))
- else:
- raise usage.error('At least one of --alias or --root-uri must be specified')
-
- nodedir = config['node-directory']
- nodeurl = config['node-url']
- if not nodeurl:
- nodeurl = getnodeurl(nodedir)
-
- # allocate socket
- socket_dir = os.path.join(os.path.expanduser(nodedir), "tfuse.sockets")
- socket_path = os.path.join(socket_dir, root_name)
- if len(socket_path) > 103:
- # try googling AF_UNIX and sun_len for some taste of why this oddity exists.
- raise OSError(errno.ENAMETOOLONG, 'socket path too long (%s)' % (socket_path,))
-
- fileutil.make_dirs(socket_dir, 0700)
- if os.path.exists(socket_path):
- log('socket exists')
- if config['server-shutdown']:
- log('calling shutdown')
- trpc = TRPC(socket_path)
- result = trpc.shutdown()
- log('result: %r' % (result,))
- log('called shutdown')
- return
- else:
- raise OSError(errno.EEXIST, 'fuse already running (%r exists)' % (socket_path,))
- elif config['server-shutdown']:
- raise OSError(errno.ENOTCONN, '--server-shutdown specified, but server not running')
-
- if not os.path.exists(config.mountpoint):
- raise OSError(errno.ENOENT, 'No such file or directory: "%s"' % (config.mountpoint,))
-
- global _tfs
- #
- # Standalone ("no-split")
- #
- if config['no-split']:
- reopen_logfile('tfuse.%s.unsplit.log' % (root_name,))
- log('\n'+(24*'_')+'init (unsplit)'+(24*'_')+'\n')
-
- cache_timeout = float(config['cache-timeout'])
- tfs = TFS(nodedir, nodeurl, root_uri, cache_timeout, async=False)
- #print tfs.pprint()
-
- # make tfs instance accesible to print_tree() for dbg
- _tfs = tfs
-
- args = [ '-o'+opt for opt in config.fuse_options ] + [config.mountpoint]
- launch_tahoe_fuse(TahoeFuseLocal, tfs, args)
-
- #
- # Server
- #
- elif config['server']:
- reopen_logfile('tfuse.%s.server.log' % (root_name,))
- log('\n'+(24*'_')+'init (server)'+(24*'_')+'\n')
-
- log('daemonizing')
- daemonize()
-
- try:
- cache_timeout = float(config['cache-timeout'])
- tfs = TFS(nodedir, nodeurl, root_uri, cache_timeout, async=True)
- #print tfs.pprint()
-
- # make tfs instance accesible to print_tree() for dbg
- _tfs = tfs
-
- log('launching tfs server')
- tfuse = TahoeFuseBase(tfs)
- tfs_server = TFSServer(socket_path, tfuse)
- tfs_server.run()
- log('tfs server ran, exiting')
- except:
- log('exception: ' + traceback.format_exc())
-
- #
- # Client
- #
- else:
- reopen_logfile('tfuse.%s.client.log' % (root_name,))
- log('\n'+(24*'_')+'init (client)'+(24*'_')+'\n')
-
- server_args = [sys.executable, sys.argv[0], '--server'] + argv
- if 'Allmydata.app/Contents/MacOS' in sys.executable:
- # in this case blackmatch is the 'fuse' subcommand of the 'tahoe' executable
- # otherwise we assume blackmatch is being run from source
- server_args.insert(2, 'fuse')
- #print 'launching server:', server_args
- server = subprocess.Popen(server_args)
- waiting_since = time.time()
- wait_at_most = 8
- while not os.path.exists(socket_path):
- log('waiting for appearance of %r' % (socket_path,))
- time.sleep(1)
- if time.time() - waiting_since > wait_at_most:
- log('%r did not appear within %ss' % (socket_path, wait_at_most))
- raise IOError(2, 'no socket %s' % (socket_path,))
- #print 'launched server'
- trpc = TRPC(socket_path)
-
-
- args = [ '-o'+opt for opt in config.fuse_options ] + [config.mountpoint]
- launch_tahoe_fuse(TahoeFuseShim, trpc, args)
-
-
-if __name__ == '__main__':
- sys.exit(main(sys.argv[1:]))