]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
fuse/blackmatch: split into client/server (twisted server)
authorrobk-tahoe <robk-tahoe@allmydata.com>
Thu, 16 Oct 2008 15:08:46 +0000 (08:08 -0700)
committerrobk-tahoe <robk-tahoe@allmydata.com>
Thu, 16 Oct 2008 15:08:46 +0000 (08:08 -0700)
This implements a client/server split for blackmatch, where the client
implements the fuse_main bindings and a simple blocking rpc client mechanism.
The server implements the other half of that rpc mechanism, and contains all
the actual logic for interpreting fuse requests in the context of the on disk
cache and requests to the tahoe node.  The server is based on a twisted reactor.

The rpc mechanism implements a simple method dispatch including marshalling,
using json, of basic inert data types, in a flat namespace (no objects).
The client side is written in a blocking idiom, to interface with the threading
model used by the fuse_main bindings, whereas the server side is written for a
twisted reactor-based environment, intended to facilitate implementing more
sophisticated logic in that paradigm.  The two communicate over a unix domain
socket, allocated within the nodedir.

Command line usage is unchanged; the server is launched automatically by the
client. The server daemonizes itself, to avoid preventing the original parent
process (e.g. 'runtests') from waiting upon the server exiting.

The client keeps open a 'keepalive' connection to the server; upon loss thereof
the server will exit. This addresses the fact that the python-fuse bindings
provide no notification of exit of the client process upon unmount.

The client thus provides a relatively thin 'shim' proxying requests from the
fuse_main bindings across the rpc to the server process, which handles the
logic behind each request.

For the time being, a '--no-split' option is provided to surpress the splitting
into client/server, yielding the prior behaviour.  Once the server logic gets
more complex and more entrenched in a twisted idiom, this might be removed.
The 'runtests' test harness currently tests both modes, as 'impl_c' and
'impl_c_no_split'

contrib/fuse/impl_c/blackmatch.py
contrib/fuse/runtests.py

index 3224069698069531198a5d1ade611fe26ed79cd6..3f6558243f4a67139476a0d3d302d9b66e4d0ea3 100644 (file)
@@ -5,25 +5,25 @@ from allmydata.uri import CHKFileURI, NewDirectoryURI, LiteralFileURI
 from allmydata.scripts.common_http import do_http as do_http_req
 from allmydata.util.hashutil import tagged_hash
 from allmydata.util.assertutil import precondition
-from allmydata.util import base32
+from allmydata.util import base32, fileutil
 from allmydata.scripts.common import get_aliases
 
 from twisted.python import usage
+from twisted.internet.protocol import Factory, Protocol
+from twisted.internet import reactor, defer
 
 import base64
+import errno
 import sha
+import socket
+import stat
+import subprocess
 import sys
 import os
 #import pprint
-import errno
-import stat
-# pull in some spaghetti to make this stuff work without fuse-py being installed
-try:
-    import _find_fuse_parts
-    junk = _find_fuse_parts
-    del junk
-except ImportError:
-    pass
+
+# one needs either python-fuse to have been installed in sys.path, or
+# suitable affordances to be made in the build or runtime environment
 import fuse
 
 import time
@@ -31,7 +31,7 @@ import traceback
 import simplejson
 import urllib
 
-VERSIONSTR="0.6"
+VERSIONSTR="0.7"
 
 USAGE = 'usage: tahoe fuse [dir_cap_name] [fuse_options] mountpoint'
 DEFAULT_DIRECTORY_VALIDITY=26
@@ -62,6 +62,14 @@ class TahoeFuseOptions(usage.Options):
         ["cache-timeout", None, 20,
          "Time, in seconds, to cache directory data."],
         ]
+    optFlags = [
+        ['no-split', None,
+         'run stand-alone; no splitting into client and server'],
+        ['server', None,
+         'server mode (should not be used by end users)'],
+        ['server-shutdown', None,
+         'shutdown server (should not be used by end users)'],
+         ]
 
     def __init__(self):
         usage.Options.__init__(self)
@@ -84,6 +92,12 @@ class TahoeFuseOptions(usage.Options):
 
 logfile = file('tfuse.log', 'ab')
 
+def reopen_logfile(fname):
+    global logfile
+    log('switching to %s' % (fname,))
+    logfile.close()
+    logfile = file(fname, 'ab')
+
 def log(msg):
     logfile.write("%s: %s\n" % (time.asctime(), msg))
     #time.sleep(0.1)
@@ -273,31 +287,22 @@ class TahoeFuseFile(object):
         self.log("fgetattr()")
         s = os.fstat(self.fd)
         self.log("fgetattr() -> %r" % (s,))
-        return s
+        return stat_to_dict(s)
 
     @logexc
     def ftruncate(self, len):
         self.log("ftruncate(%r)" % (len,))
         self.file.truncate(len)
 
-class TahoeFuse(fuse.Fuse):
-
-    def __init__(self, tfs, *args, **kw):
-        log("TF: __init__(%r, %r)" % (args, kw))
+class TahoeFuseBase(object):
 
+    def __init__(self, tfs):
+        log("TFB: __init__()")
         self.tfs = tfs
-        #_tfs_ = tfs
-        #class MyFuseFile(TahoeFuseFile):
-            #tfs = _tfs_
-        #self.file_class = MyFuseFile
-        #log("TF: file_class: %r" % (self.file_class,))
-
         self.files = {}
 
-        fuse.Fuse.__init__(self, *args, **kw)
-
     def log(self, msg):
-        log("<TF> %s" % (msg, ))
+        log("<TFB> %s" % (msg, ))
 
     @logexc
     def readlink(self, path):
@@ -379,7 +384,8 @@ class TahoeFuse(fuse.Fuse):
         fs_size = 8*2**40 # 8Tb
         fs_free = 2*2**40 # 2Tb
 
-        s = fuse.StatVfs(f_bsize = preferred_block_size,
+        #s = fuse.StatVfs(f_bsize = preferred_block_size,
+        s = dict(f_bsize = preferred_block_size,
                          f_frsize = block_size,
                          f_blocks = fs_size / block_size,
                          f_bfree = fs_free / block_size,
@@ -389,16 +395,12 @@ class TahoeFuse(fuse.Fuse):
                          f_favail = 2**20, # available files (root)
                          f_flag = 2, # no suid
                          f_namemax = 255) # max name length
+        #self.log('statfs(): %r' % (s,))
         return s
 
     def fsinit(self):
         self.log("fsinit()")
 
-    def main(self, *a, **kw):
-        self.log("main(%r, %r)" % (a, kw))
-
-        return fuse.Fuse.main(self, *a, **kw)
-
     ##################################################################
 
     @logexc
@@ -409,7 +411,8 @@ class TahoeFuse(fuse.Fuse):
             return -errno.ENOENT
         dirlist = ['.', '..'] + node.children.keys()
         self.log('dirlist = %r' % (dirlist,))
-        return [fuse.Direntry(d) for d in dirlist]
+        #return [fuse.Direntry(d) for d in dirlist]
+        return dirlist
 
     @logexc
     def getattr(self, path):
@@ -421,12 +424,13 @@ class TahoeFuse(fuse.Fuse):
             mtime = self.tfs.root.mtime
             s = TStat({}, st_mode=mode, st_nlink=1, st_mtime=mtime)
             self.log('getattr(%r) -> %r' % (path, s))
-            return s
+            #return s
+            return stat_to_dict(s)
             
         parent, name, child = self.tfs.get_parent_name_and_child(path)
         if not child: # implicitly 'or not parent'
             raise ENOENT('No such file or directory')
-        return parent.get_stat(name)
+        return stat_to_dict(parent.get_stat(name))
 
     @logexc
     def access(self, path, mode):
@@ -510,14 +514,197 @@ class TahoeFuse(fuse.Fuse):
         self.log("ftruncate(%r, %r)" % (path, len,))
         return self._get_file(path).ftruncate(len)
 
+class TahoeFuseLocal(TahoeFuseBase, fuse.Fuse):
+    def __init__(self, tfs, *args, **kw):
+        log("TFL: __init__(%r, %r)" % (args, kw))
+        TahoeFuseBase.__init__(self, tfs)
+        fuse.Fuse.__init__(self, *args, **kw)
+
+    def log(self, msg):
+        log("<TFL> %s" % (msg, ))
+
+    def main(self, *a, **kw):
+        self.log("main(%r, %r)" % (a, kw))
+        return fuse.Fuse.main(self, *a, **kw)
+
+    # overrides for those methods which return objects not marshalled
+    def fgetattr(self, path):
+        return TStat({}, **(TahoeFuseBase.fgetattr(self, path)))
+
+    def getattr(self, path):
+        return TStat({}, **(TahoeFuseBase.getattr(self, path)))
+
+    def statfs(self):
+        return fuse.StatVfs(**(TahoeFuseBase.statfs(self)))
+        #self.log('statfs()')
+        #ret = fuse.StatVfs(**(TahoeFuseBase.statfs(self)))
+        #self.log('statfs(): %r' % (ret,))
+        #return ret
+
+    @logexc
+    def readdir(self, path, offset):
+        return [ fuse.Direntry(d) for d in TahoeFuseBase.readdir(self, path, offset) ]
+
+class TahoeFuseShim(fuse.Fuse):
+    def __init__(self, trpc, *args, **kw):
+        log("TF: __init__(%r, %r)" % (args, kw))
+        self.trpc = trpc
+        fuse.Fuse.__init__(self, *args, **kw)
+
+    def log(self, msg):
+        log("<TFs> %s" % (msg, ))
+
+    @logexc
+    def readlink(self, path):
+        self.log("readlink(%r)" % (path,))
+        return self.trpc.call('readlink', path)
+
+    @logexc
+    def unlink(self, path):
+        self.log("unlink(%r)" % (path,))
+        return self.trpc.call('unlink', path)
+
+    @logexc
+    def rmdir(self, path):
+        self.log("rmdir(%r)" % (path,))
+        return self.trpc.call('unlink', path)
+
+    @logexc
+    def symlink(self, path, path1):
+        self.log("symlink(%r, %r)" % (path, path1))
+        return self.trpc.call('link', path, path1)
+
+    @logexc
+    def rename(self, path, path1):
+        self.log("rename(%r, %r)" % (path, path1))
+        return self.trpc.call('rename', path, path1)
+
+    @logexc
+    def link(self, path, path1):
+        self.log("link(%r, %r)" % (path, path1))
+        return self.trpc.call('link', path, path1)
+
+    @logexc
+    def chmod(self, path, mode):
+        self.log("XX chmod(%r, %r)" % (path, mode))
+        return self.trpc.call('chmod', path, mode)
+
+    @logexc
+    def chown(self, path, user, group):
+        self.log("XX chown(%r, %r, %r)" % (path, user, group))
+        return self.trpc.call('chown', path, user, group)
+
+    @logexc
+    def truncate(self, path, len):
+        self.log("XX truncate(%r, %r)" % (path, len))
+        return self.trpc.call('truncate', path, len)
+
+    @logexc
+    def utime(self, path, times):
+        self.log("XX utime(%r, %r)" % (path, times))
+        return self.trpc.call('utime', path, times)
+
+    @logexc
+    def statfs(self):
+        self.log("statfs()")
+        response = self.trpc.call('statfs')
+        #self.log("statfs(): %r" % (response,))
+        kwargs = dict([ (str(k),v) for k,v in response.items() ])
+        return fuse.StatVfs(**kwargs)
+
+    def fsinit(self):
+        self.log("fsinit()")
+
+    def main(self, *a, **kw):
+        self.log("main(%r, %r)" % (a, kw))
+
+        return fuse.Fuse.main(self, *a, **kw)
+
+    ##################################################################
+
+    @logexc
+    def readdir(self, path, offset):
+        self.log('readdir(%r, %r)' % (path, offset))
+        return [ fuse.Direntry(d) for d in self.trpc.call('readdir', path, offset) ]
+
+    @logexc
+    def getattr(self, path):
+        self.log('getattr(%r)' % (path,))
+        response = self.trpc.call('getattr', path)
+        kwargs = dict([ (str(k),v) for k,v in response.items() ])
+        s = TStat({}, **kwargs)
+        self.log('getattr(%r) -> %r' % (path, s))
+        return s
+
+    @logexc
+    def access(self, path, mode):
+        self.log("access(%r, %r)" % (path, mode))
+        return self.trpc.call('access', path, mode)
+
+    @logexc
+    def mkdir(self, path, mode):
+        self.log("mkdir(%r, %r)" % (path, mode))
+        return self.trpc.call('mkdir', path, mode)
+
+    ##################################################################
+    # file methods
 
-def launch_tahoe_fuse(tfs, argv):
+    def open(self, path, flags):
+        self.log('open(%r, %r)' % (path, flags, ))
+        return self.trpc.call('open', path, flags)
+
+    def create(self, path, flags, mode):
+        self.log('create(%r, %r, %r)' % (path, flags, mode))
+        return self.trpc.call('create', path, flags, mode)
+
+    ##
+
+    def read(self, path, size, offset):
+        self.log('read(%r, %r, %r)' % (path, size, offset, ))
+        return self.trpc.call('read', path, size, offset)
+
+    @logexc
+    def write(self, path, buf, offset):
+        self.log("write(%r, -%s-, %r)" % (path, len(buf), offset))
+        return self.trpc.call('write', path, buf, offset)
+
+    @logexc
+    def release(self, path, flags):
+        self.log("release(%r, %r)" % (path, flags,))
+        return self.trpc.call('release', path, flags)
+
+    @logexc
+    def fsync(self, path, isfsyncfile):
+        self.log("fsync(%r, %r)" % (path, isfsyncfile,))
+        return self.trpc.call('fsync', path, isfsyncfile)
+
+    @logexc
+    def flush(self, path):
+        self.log("flush(%r)" % (path,))
+        return self.trpc.call('flush', path)
+
+    @logexc
+    def fgetattr(self, path):
+        self.log("fgetattr(%r)" % (path,))
+        #return self.trpc.call('fgetattr', path)
+        response = self.trpc.call('fgetattr', path)
+        kwargs = dict([ (str(k),v) for k,v in response.items() ])
+        s = TStat({}, **kwargs)
+        self.log('getattr(%r) -> %r' % (path, s))
+        return s
+
+    @logexc
+    def ftruncate(self, path, len):
+        self.log("ftruncate(%r, %r)" % (path, len,))
+        return self.trpc.call('ftruncate', path, len)
+
+
+def launch_tahoe_fuse(tf_class, tobj, argv):
     sys.argv = ['tahoe fuse'] + list(argv)
     log('setting sys.argv=%r' % (sys.argv,))
     config = TahoeFuseOptions()
-    server = TahoeFuse(tfs, version="%prog " +VERSIONSTR+", fuse "+ fuse.__version__,
-                       usage=config.getSynopsis(),
-                       dash_s_do='setsingle')
+    version = "%prog " +VERSIONSTR+", fuse "+ fuse.__version__
+    server = tf_class(tobj, version=version, usage=config.getSynopsis(), dash_s_do='setsingle')
     server.parse(errex=1)
     server.main()
 
@@ -534,6 +721,16 @@ def fingerprint(uri):
         return None
     return base64.b32encode(sha.new(uri).digest()).lower()[:6]
 
+stat_fields = [ 'st_mode', 'st_ino', 'st_dev', 'st_nlink', 'st_uid', 'st_gid', 'st_size',
+                'st_atime', 'st_mtime', 'st_ctime', ]
+def stat_to_dict(statobj, fields=None):
+    if fields is None:
+        fields = stat_fields
+    d = {}
+    for f in fields:
+        d[f] = getattr(statobj, f, None)
+    return d
+
 class TStat(fuse.Stat):
     # in fuse 0.2, these are set by fuse.Stat.__init__
     # in fuse 0.2-pre3 (hardy) they are not. badness unsues if they're missing
@@ -563,10 +760,7 @@ class TStat(fuse.Stat):
         fuse.Stat.__init__(self, **kwargs)
 
     def __repr__(self):
-        d = {}
-        for f in self.fields:
-            d[f] = getattr(self, f, None)
-        return "<Stat%r>" % (d,)
+        return "<Stat%r>" % (stat_to_dict(self),)
 
 class Directory(object):
     def __init__(self, tfs, ro_uri, rw_uri):
@@ -594,7 +788,6 @@ class Directory(object):
 
     def load(self, name=None):
         now = time.time()
-        print 'loading', name or self
         log('%s.loading(%s)' % (self, name))
         url = self.tfs.compose_url("uri/%s?t=json", self.get_uri())
         data = urllib.urlopen(url).read()
@@ -916,18 +1109,259 @@ _tfs = None # to appease pyflakes; is set in main()
 def print_tree():
     log('tree:\n' + _tfs.pprint())
 
+
+def unmarshal(obj):
+    if obj is None or isinstance(obj, int) or isinstance(obj, long) or isinstance(obj, float):
+        return obj
+    elif isinstance(obj, unicode):
+        #log('unmarshal(%r)' % (obj,))
+        return base64.b64decode(obj)
+    elif isinstance(obj, list):
+        return map(unmarshal, obj)
+    elif isinstance(obj, dict):
+        return dict([ (k,unmarshal(v)) for k,v in obj.items() ])
+    else:
+        raise ValueError('object type not int,str,list,dict,none (%s) (%r)' % (type(obj), obj))
+
+def marshal(obj):
+    if obj is None or isinstance(obj, int) or isinstance(obj, long) or isinstance(obj, float):
+        return obj
+    elif isinstance(obj, str):
+        return base64.b64encode(obj)
+    elif isinstance(obj, list) or isinstance(obj, tuple):
+        return map(marshal, obj)
+    elif isinstance(obj, dict):
+        return dict([ (k,marshal(v)) for k,v in obj.items() ])
+    else:
+        raise ValueError('object type not int,str,list,dict,none (%s)' % type(obj))
+
+
+class TRPCProtocol(Protocol):
+    compute_response_sha1 = True
+    log_all_requests = False
+
+    def connectionMade(self):
+        self.buf = []
+
+    def dataReceived(self, data):
+        if data == 'keepalive\n':
+            log('keepalive connection on %r' % (self.transport,))
+            self.keepalive = True
+            return
+
+        if not data.endswith('\n'):
+            self.buf.append(data)
+            return
+        if self.buf:
+            self.buf.append(data)
+            reqstr = ''.join(self.buf)
+            self.buf = []
+            self.dispatch_request(reqstr)
+        else:
+            self.dispatch_request(data)
+
+    def dispatch_request(self, reqstr):
+        try:
+            req = simplejson.loads(reqstr)
+        except ValueError, ve:
+            log(ve)
+            return
+
+        d = defer.maybeDeferred(self.handle_request, req)
+        d.addCallback(self.send_response)
+        d.addErrback(self.send_error)
+
+    def send_error(self, failure):
+        log('failure: %s' % (failure,))
+        if failure.check(TFSIOError):
+            e = failure.value
+            self.send_response(['error', 'errno', e.args[0], e.args[1]])
+        else:
+            self.send_response(['error', 'failure', str(failure)])
+
+    def send_response(self, result):
+        response = simplejson.dumps(result)
+        header = { 'len': len(response), }
+        if self.compute_response_sha1:
+            header['sha1'] = base64.b64encode(sha.new(response).digest())
+        hdr = simplejson.dumps(header)
+        self.transport.write(hdr)
+        self.transport.write('\n')
+        self.transport.write(response)
+        self.transport.loseConnection()
+
+    def connectionLost(self, reason):
+        if hasattr(self, 'keepalive'):
+            log('keepalive connection %r lost, shutting down' % (self.transport,))
+            reactor.callLater(0, reactor.stop)
+
+    def handle_request(self, req):
+        if type(req) is not list or not req or len(req) < 1:
+            return ['error', 'malformed request']
+        if req[0] == 'call':
+            if len(req) < 3:
+                return ['error', 'malformed request']
+            methname = req[1]
+            try:
+                args = unmarshal(req[2])
+            except ValueError, ve:
+                return ['error', 'malformed arguments', str(ve)]
+
+            try:
+                meth = getattr(self.factory.server, methname)
+            except AttributeError, ae:
+                return ['error', 'no such method', str(ae)]
+
+            if self.log_all_requests:
+                log('call %s(%s)' % (methname, ', '.join(map(repr, args))))
+            try:
+                result = meth(*args)
+            except TFSIOError, e:
+                log('errno: %s; %s' % e.args)
+                return ['error', 'errno', e.args[0], e.args[1]]
+            except Exception, e:
+                log('exception: ' + traceback.format_exc())
+                return ['error', 'exception', str(e)]
+            d = defer.succeed(None)
+            d.addCallback(lambda junk: result) # result may be Deferred
+            d.addCallback(lambda res: ['result', marshal(res)]) # only applies if not errback
+            return d
+
+class TFSServer(object):
+    def __init__(self, socket_path, server=None):
+        self.socket_path = socket_path
+        log('TFSServer init socket: %s' % (socket_path,))
+
+        self.factory = Factory()
+        self.factory.protocol = TRPCProtocol
+        if server:
+            self.factory.server = server
+        else:
+            self.factory.server = self
+
+    def get_service(self):
+        if not hasattr(self, 'svc'):
+            from twisted.application import strports
+            self.svc = strports.service('unix:'+self.socket_path, self.factory)
+        return self.svc
+
+    def run(self):
+        svc = self.get_service()
+        def ss():
+            try:
+                svc.startService()
+            except:
+                reactor.callLater(0, reactor.stop)
+                raise
+        reactor.callLater(0, ss)
+        reactor.run()
+
+    def hello(self):
+        return 'pleased to meet you'
+
+    def echo(self, arg):
+        return arg
+
+    def failex(self):
+        raise ValueError('expected')
+
+    def fail(self):
+        return defer.maybeDeferred(self.failex)
+
+class RPCError(RuntimeError):
+    pass
+
+class TRPC(object):
+    def __init__(self, socket_fname):
+        self.socket_fname = socket_fname
+        self.keepalive = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+        self.keepalive.connect(self.socket_fname)
+        self.keepalive.send('keepalive\n')
+        log('requested keepalive on %s' % (self.keepalive,))
+
+    def req(self, req):
+        # open conenction to trpc server
+        s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+        s.connect(self.socket_fname)
+        # send request
+        s.send(simplejson.dumps(req))
+        s.send('\n')
+        # read response header
+        hdr_data = s.recv(8192)
+        first_newline = hdr_data.index('\n')
+        header = hdr_data[:first_newline]
+        data = hdr_data[first_newline+1:]
+        hdr = simplejson.loads(header)
+        hdr_len = hdr['len']
+        if hdr.has_key('sha1'):
+            hdr_sha1 = base64.b64decode(hdr['sha1'])
+            spool = [data]
+            spool_sha = sha.new(data)
+            # spool response
+            while True:
+                data = s.recv(8192)
+                if data:
+                    spool.append(data)
+                    spool_sha.update(data)
+                else:
+                    break
+        else:
+            spool = [data]
+            # spool response
+            while True:
+                data = s.recv(8192)
+                if data:
+                    spool.append(data)
+                else:
+                    break
+        s.close()
+        # decode response
+        resp = ''.join(spool)
+        spool = None
+        assert hdr_len == len(resp), str((hdr_len, len(resp), repr(resp)))
+        if hdr.has_key('sha1'):
+            data_sha1 = spool_sha.digest()
+            spool = spool_sha = None
+            assert hdr_sha1 == data_sha1, str((base32.b2a(hdr_sha1), base32.b2a(data_sha1)))
+        #else:
+            #print 'warning, server provided no sha1 to check'
+        return resp
+
+    def call(self, methodname, *args):
+        res = self.req(['call', methodname, marshal(args)])
+
+        result = simplejson.loads(res)
+        if not result or len(result) < 2:
+            raise TypeError('malformed response %r' % (result,))
+        if result[0] == 'error':
+            if result[1] == 'errno':
+                raise TFSIOError(result[2], result[3])
+            else:
+                raise RPCError(*(result[1:])) # error, exception / error, failure
+        elif result[0] == 'result':
+            return unmarshal(result[1])
+        else:
+            raise TypeError('unknown response type %r' % (result[0],))
+
+    def shutdown(self):
+        log('shutdown() closing keepalive %s' % (self.keepalive,))
+        self.keepalive.close()
+
 def main(argv):
-    log("\n\nmain(%s)" % (argv,))
+    log("main(%s)" % (argv,))
 
+    # check for version or help options (no args == help)
     if not argv:
         argv = ['--help']
-    if len(argv) == 1 and argv[0] in ['-h', '--help', '--version']:
+    if len(argv) == 1 and argv[0] in ['-h', '--help']:
         config = TahoeFuseOptions()
         print >> sys.stderr, config
         print >> sys.stderr, 'fuse usage follows:'
-        launch_tahoe_fuse(None, argv)
+    if len(argv) == 1 and argv[0] in ['-h', '--help', '--version']:
+        launch_tahoe_fuse(TahoeFuseLocal, None, argv)
         return -2
 
+    # parse command line options
     config = TahoeFuseOptions()
     try:
         #print 'parsing', argv
@@ -937,6 +1371,7 @@ def main(argv):
         print e
         return -1
 
+    # check for which alias or uri is specified
     if config['alias']:
         alias = config['alias']
         #print 'looking for aliases in', config['node-directory']
@@ -944,9 +1379,10 @@ def main(argv):
         if alias not in aliases:
             raise usage.error('Alias %r not found' % (alias,))
         root_uri = aliases[alias]
+        root_name = alias
     elif config['root-uri']:
         root_uri = config['root-uri']
-        alias = 'root-uri'
+        root_name = 'uri_' + base32.b2a(tagged_hash('root_name', root_uri))[:12]
         # test the uri for structural validity:
         try:
             NewDirectoryURI.init_from_string(root_uri)
@@ -955,33 +1391,109 @@ def main(argv):
     else:
         raise usage.error('At least one of --alias or --root-uri must be specified')
 
-
     nodedir = config['node-directory']
     nodeurl = config['node-url']
     if not nodeurl:
         nodeurl = getnodeurl(nodedir)
 
-    # switch to named log file.
-    global logfile
-    fname = 'tfuse.%s.log' % (alias,)
-    log('switching to %s' % (fname,))
-    logfile.close()
-    logfile = file(fname, 'ab')
-    log('\n'+(24*'_')+'init'+(24*'_')+'\n')
+    # allocate socket
+    socket_dir = os.path.join(os.path.expanduser(nodedir), "tfuse.sockets")
+    socket_path = os.path.join(socket_dir, root_name)
+    if len(socket_path) > 103:
+        # try googling AF_UNIX and sun_len for some taste of why this oddity exists.
+        raise OSError(errno.ENAMETOOLONG, 'socket path too long (%s)' % (socket_path,))
+
+    fileutil.make_dirs(socket_dir, 0700)
+    if os.path.exists(socket_path):
+        log('socket exists')
+        if config['server-shutdown']:
+            log('calling shutdown')
+            trpc = TRPC(socket_path)
+            result = trpc.shutdown()
+            log('result: %r' % (result,))
+            log('called shutdown')
+            return
+        else:
+            raise OSError(errno.EEXIST, 'fuse already running (%r exists)' % (socket_path,))
+    elif config['server-shutdown']:
+        raise OSError(errno.ENOTCONN, '--server-shutdown specified, but server not running')
 
     if not os.path.exists(config.mountpoint):
-        raise OSError(2, 'No such file or directory: "%s"' % (config.mountpoint,))
-
-    cache_timeout = float(config['cache-timeout'])
-    tfs = TFS(nodedir, nodeurl, root_uri, cache_timeout)
-    print tfs.pprint()
+        raise OSError(errno.ENOENT, 'No such file or directory: "%s"' % (config.mountpoint,))
 
-    # make tfs instance accesible to print_tree() for dbg
     global _tfs
-    _tfs = tfs
-
-    args = [ '-o'+opt for opt in config.fuse_options ] + [config.mountpoint]
-    launch_tahoe_fuse(tfs, args)
+    #
+    # Standalone ("no-split")
+    #
+    if config['no-split']:
+        reopen_logfile('tfuse.%s.unsplit.log' % (root_name,))
+        log('\n'+(24*'_')+'init (unsplit)'+(24*'_')+'\n')
+
+        cache_timeout = float(config['cache-timeout'])
+        tfs = TFS(nodedir, nodeurl, root_uri, cache_timeout)
+        #print tfs.pprint()
+
+        # make tfs instance accesible to print_tree() for dbg
+        _tfs = tfs
+
+        args = [ '-o'+opt for opt in config.fuse_options ] + [config.mountpoint]
+        launch_tahoe_fuse(TahoeFuseLocal, tfs, args)
+
+    #
+    # Server
+    #
+    elif config['server']:
+        reopen_logfile('tfuse.%s.server.log' % (root_name,))
+        log('\n'+(24*'_')+'init (server)'+(24*'_')+'\n')
+
+        log('daemonizing')
+        from twisted.scripts._twistd_unix import daemonize
+        daemonize()
+
+        cache_timeout = float(config['cache-timeout'])
+        tfs = TFS(nodedir, nodeurl, root_uri, cache_timeout)
+        #print tfs.pprint()
+
+        # make tfs instance accesible to print_tree() for dbg
+        _tfs = tfs
+
+        log('launching tfs server')
+        tfuse = TahoeFuseBase(tfs)
+        tfs_server = TFSServer(socket_path, tfuse)
+        tfs_server.run()
+        log('tfs server ran, exiting')
+
+    #
+    # Client
+    #
+    else:
+        reopen_logfile('tfuse.%s.client.log' % (root_name,))
+        log('\n'+(24*'_')+'init (client)'+(24*'_')+'\n')
+
+        server_args = [sys.executable, sys.argv[0], '--server'] + argv
+        if 'Allmydata.app/Contents/MacOS' in sys.executable:
+            # in this case blackmatch is the 'fuse' subcommand of the 'tahoe' executable
+            # otherwise we assume blackmatch is being run from source
+            server_args.insert(2, 'fuse')
+        #print 'launching server:', server_args
+        server = subprocess.Popen(server_args)
+        waiting_since = time.time()
+        wait_at_most = 8
+        while not os.path.exists(socket_path):
+            log('waiting for appearance of %r' % (socket_path,))
+            print 'waiting...'
+            time.sleep(1)
+            if time.time() - waiting_since > wait_at_most:
+                log('%r did not appear within %ss' % (socket_path, wait_at_most))
+                raise IOError(2, 'no socket %s' % (socket_path,))
+            print 'running...'
+        #print 'launched server'
+        trpc = TRPC(socket_path)
+
+
+        args = [ '-o'+opt for opt in config.fuse_options ] + [config.mountpoint]
+        launch_tahoe_fuse(TahoeFuseShim, trpc, args)
 
+        
 if __name__ == '__main__':
     sys.exit(main(sys.argv[1:]))
index b08d52be05285be111a0fe52b02f4c277d9e327c..23de5cfc770c7c3a80f9e4175f71f28c4896f8ee 100644 (file)
@@ -68,6 +68,12 @@ implementations = {
                                '--node-directory', '%(nodedir)s', '%(mountpath)s', ],
                    mount_wait=True,
                    suites=['read', 'write', ]),
+    'impl_c_no_split': dict(module=impl_c,
+                   mount_args=['--cache-timeout', '0', '--root-uri', '%(root-uri)s',
+                               '--no-split',
+                               '--node-directory', '%(nodedir)s', '%(mountpath)s', ],
+                   mount_wait=True,
+                   suites=['read', 'write', ]),
     }
 
 if sys.platform == 'darwin':