3 #-----------------------------------------------------------------------------------------------
4 from allmydata.uri import CHKFileURI, DirectoryURI, LiteralFileURI, is_literal_file_uri
5 from allmydata.scripts.common_http import do_http as do_http_req
6 from allmydata.util.hashutil import tagged_hash
7 from allmydata.util.assertutil import precondition
8 from allmydata.util import base32, fileutil, observer
9 from allmydata.scripts.common import get_aliases
11 from twisted.python import usage
12 from twisted.python.failure import Failure
13 from twisted.internet.protocol import Factory, Protocol
14 from twisted.internet import reactor, defer, task
15 from twisted.web import client
29 # one needs either python-fuse to have been installed in sys.path, or
30 # suitable affordances to be made in the build or runtime environment
40 USAGE = 'usage: tahoe fuse [dir_cap_name] [fuse_options] mountpoint'
41 DEFAULT_DIRECTORY_VALIDITY=26
43 if not hasattr(fuse, '__version__'):
45 "your fuse-py doesn't know of fuse.__version__, probably it's too old."
47 fuse.fuse_python_api = (0, 2)
48 fuse.feature_assert('stateful_files', 'has_init')
50 class TahoeFuseOptions(usage.Options):
52 ["node-directory", None, "~/.tahoe",
53 "Look here to find out which Tahoe node should be used for all "
54 "operations. The directory should either contain a full Tahoe node, "
55 "or a file named node.url which points to some other Tahoe node. "
56 "It should also contain a file named private/aliases which contains "
57 "the mapping from alias name to root dirnode URI."
59 ["node-url", None, None,
60 "URL of the tahoe node to use, a URL like \"http://127.0.0.1:3456\". "
61 "This overrides the URL found in the --node-directory ."],
63 "Which alias should be mounted."],
64 ["root-uri", None, None,
65 "Which root directory uri should be mounted."],
66 ["cache-timeout", None, 20,
67 "Time, in seconds, to cache directory data."],
71 'run stand-alone; no splitting into client and server'],
73 'server mode (should not be used by end users)'],
74 ['server-shutdown', None,
75 'shutdown server (should not be used by end users)'],
79 usage.Options.__init__(self)
80 self.fuse_options = []
81 self.mountpoint = None
83 def opt_option(self, fuse_option):
85 Pass mount options directly to fuse. See below.
87 self.fuse_options.append(fuse_option)
91 def parseArgs(self, mountpoint=''):
92 self.mountpoint = mountpoint
94 def getSynopsis(self):
95 return "%s [options] mountpoint" % (os.path.basename(sys.argv[0]),)
97 logfile = file('tfuse.log', 'ab')
99 def reopen_logfile(fname):
101 log('switching to %s' % (fname,))
103 logfile = file(fname, 'ab')
106 logfile.write("%s: %s\n" % (time.asctime(), msg))
def unicode_to_utf8_or_str(u):
    """Encode a unicode object as UTF-8 bytes; insist anything else is str.

    NOTE(review): the tail of this function is elided from this excerpt --
    presumably it returns ``u`` unchanged in the plain-str case; confirm.
    """
    if isinstance(u, unicode):
        return u.encode('utf-8')
    # not unicode: must already be a byte string
    precondition(isinstance(u, str), repr(u))
def do_http(method, url, body=''):
    """Issue an HTTP request to the tahoe node and return the response body.

    @param method: HTTP verb, e.g. 'GET', 'PUT', 'DELETE'
    @param url: full webapi URL
    @param body: optional request body (string or file-like)
    @raises RuntimeError: on any status other than 200 or 201

    Bug fix: the response body is now returned; callers such as
    Directory.add_child() use the returned text (the child cap echoed
    back by the node) and previously got None.
    """
    resp = do_http_req(method, url, body)
    log('do_http(%s, %s) -> %s, %s' % (method, url, resp.status, resp.reason))
    if resp.status not in (200, 201):
        raise RuntimeError('http response (%s, %s)' % (resp.status, resp.reason))
    return resp.read()
def flag2mode(flags):
    """Map os.open()-style flags to a stdio fopen() mode string.

    O_RDONLY -> 'rb', O_WRONLY -> 'wb', O_RDWR -> 'w+b'; O_APPEND
    rewrites the 'w' to 'a'.  Binary modes are used so file contents are
    never text-translated.

    Bug fix: the computed mode string is now returned -- the visible
    original computed ``m`` but never returned it, although callers
    (see TahoeFuseFile.__init__) use the result to open files.
    """
    log('flag2mode(%r)' % (flags,))
    md = {os.O_RDONLY: 'rb', os.O_WRONLY: 'wb', os.O_RDWR: 'w+b'}
    m = md[flags & (os.O_RDONLY | os.O_WRONLY | os.O_RDWR)]

    if flags & os.O_APPEND:
        m = m.replace('w', 'a', 1)

    return m
class TFSIOError(IOError):
    """Base class for errno-carrying IOErrors raised out to FUSE.

    NOTE(review): the body of this class is elided from this excerpt.
    """

class ENOENT(TFSIOError):
    """'No such file or directory'."""
    def __init__(self, msg):
        TFSIOError.__init__(self, errno.ENOENT, msg)

class EINVAL(TFSIOError):
    """'Invalid argument'."""
    def __init__(self, msg):
        TFSIOError.__init__(self, errno.EINVAL, msg)
class EACCESS(TFSIOError):
    """'Permission denied'.

    Bug fix: the errno module has no attribute EACCESS -- the POSIX
    constant is spelled EACCES.  The old code raised AttributeError at
    the moment it tried to report a permission error.  (The class keeps
    its historical name so existing callers still work.)
    """
    def __init__(self, msg):
        TFSIOError.__init__(self, errno.EACCES, msg)
class EEXIST(TFSIOError):
    """'File exists'."""
    def __init__(self, msg):
        super(EEXIST, self).__init__(errno.EEXIST, msg)

class EIO(TFSIOError):
    """'Input/output error'."""
    def __init__(self, msg):
        super(EIO, self).__init__(errno.EIO, msg)
161 def logargsretexc(meth):
162 def inner_logargsretexc(self, *args, **kwargs):
163 log("%s(%r, %r)" % (meth, args, kwargs))
165 ret = meth(self, *args, **kwargs)
167 log('exception:\n%s' % (traceback.format_exc(),))
169 log("ret: %r" % (ret, ))
171 inner_logargsretexc.__name__ = '<logwrap(%s)>' % (meth,)
172 return inner_logargsretexc
175 def inner_logexc(self, *args, **kwargs):
177 ret = meth(self, *args, **kwargs)
178 except TFSIOError, tie:
179 log('error: %s' % (tie,))
182 log('exception:\n%s' % (traceback.format_exc(),))
185 inner_logexc.__name__ = '<logwrap(%s)>' % (meth,)
189 log('exception:\n%s' % (traceback.format_exc(),))
191 def repr_mode(mode=None):
194 fields = ['S_ENFMT', 'S_IFBLK', 'S_IFCHR', 'S_IFDIR', 'S_IFIFO', 'S_IFLNK', 'S_IFREG', 'S_IFSOCK', 'S_IRGRP', 'S_IROTH', 'S_IRUSR', 'S_IRWXG', 'S_IRWXO', 'S_IRWXU', 'S_ISGID', 'S_ISUID', 'S_ISVTX', 'S_IWGRP', 'S_IWOTH', 'S_IWUSR', 'S_IXGRP', 'S_IXOTH', 'S_IXUSR']
197 fval = getattr(stat, field)
198 if (mode & fval) == fval:
202 def repr_flags(flags=None):
205 fields = [ 'O_APPEND', 'O_CREAT', 'O_DIRECT', 'O_DIRECTORY', 'O_EXCL', 'O_EXLOCK',
206 'O_LARGEFILE', 'O_NDELAY', 'O_NOCTTY', 'O_NOFOLLOW', 'O_NONBLOCK', 'O_RDWR',
207 'O_SHLOCK', 'O_SYNC', 'O_TRUNC', 'O_WRONLY', ]
210 fval = getattr(os, field, None)
211 if fval is not None and (flags & fval) == fval:
217 class DownloaderWithReadQueue(object):
220 self.dest_file_name = None
222 self.done_observer = observer.OneShotObserverList()
225 name = self.dest_file_name is None and '<none>' or os.path.basename(self.dest_file_name)
226 return "<DWRQ(%s)> q(%s)" % (name, len(self.read_heap or []))
229 log("%r: %s" % (self, msg))
232 def start(self, url, dest_file_name, target_size, interval=0.5):
233 self.log('start(%s, %s, %s)' % (url, dest_file_name, target_size, ))
234 self.dest_file_name = dest_file_name
235 file(self.dest_file_name, 'wb').close() # touch
236 self.target_size = target_size
238 self.loop = task.LoopingCall(self._check_file_size)
239 self.loop.start(interval)
241 d = client.downloadPage(url, self.dest_file_name)
242 d.addCallbacks(self.done, self.fail)
246 return self.done_observer.when_fired()
249 if os.path.exists(self.dest_file_name):
250 return os.path.getsize(self.dest_file_name)
255 def _read(self, posn, size):
256 #self.log('_read(%s, %s)' % (posn, size))
257 f = file(self.dest_file_name, 'rb')
264 def read(self, posn, size):
265 self.log('read(%s, %s)' % (posn, size))
266 if self.read_heap is None:
267 raise ValueError('read() called when already shut down')
268 if posn+size > self.target_size:
269 size -= self.target_size - posn
270 fsize = self.get_size()
271 if posn+size < fsize:
272 return defer.succeed(self._read(posn, size))
275 dread = (posn+size, posn, d)
276 heapq.heappush(self.read_heap, dread)
280 def _check_file_size(self):
281 #self.log('_check_file_size()')
284 size = self.get_size()
285 while self.read_heap and self.read_heap[0][0] <= size:
286 end, start, d = heapq.heappop(self.read_heap)
287 data = self._read(start, end-start)
294 def fail(self, failure):
295 self.log('fail(%s)' % (failure,))
297 if self.loop.running:
299 # fail any reads still pending
300 for end, start, d in self.read_heap:
301 reactor.callLater(0, d.errback, failure)
302 self.read_heap = None
303 self.done_observer.fire_if_not_fired(failure)
307 def done(self, result):
310 if self.loop.running:
312 precondition(self.get_size() == self.target_size, self.get_size(), self.target_size)
313 self._check_file_size() # process anything left pending in heap
314 precondition(not self.read_heap, self.read_heap, self.target_size, self.get_size())
315 self.read_heap = None
316 self.done_observer.fire_if_not_fired(self)
320 class TahoeFuseFile(object):
322 #def __init__(self, path, flags, *mode):
323 def __init__(self, tfs, path, flags, *mode):
324 log("TFF: __init__(%r, %r:%s, %r:%s)" % (path, flags, repr_flags(flags), mode, repr_mode(*mode)))
326 self.downloader = None
328 self._path = path # for tahoe put
330 self.parent, self.name, self.fnode = self.tfs.get_parent_name_and_child(path)
332 log('TFF: flags2(mode) -> %s' % (m,))
335 self.fname = self.tfs.cache.tmp_file(os.urandom(20))
336 if self.fnode is None:
337 log('TFF: [%s] open() for write: no file node, creating new File %s' % (self.name, self.fname, ))
338 self.fnode = File(0, LiteralFileURI.BASE_STRING)
339 self.fnode.tmp_fname = self.fname # XXX kill this
340 self.parent.add_child(self.name, self.fnode, {})
341 elif hasattr(self.fnode, 'tmp_fname'):
342 self.fname = self.fnode.tmp_fname
343 log('TFF: [%s] open() for write: existing file node lists %s' % (self.name, self.fname, ))
345 log('TFF: [%s] open() for write: existing file node lists no tmp_file, using new %s' % (self.name, self.fname, ))
347 log('TFF: [%s] changing mode %s(%s) to 0600' % (self.name, repr_mode(*mode), mode))
349 log('TFF: [%s] opening(%s) with flags %s(%s), mode %s(%s)' % (self.name, self.fname, repr_flags(flags|os.O_CREAT), flags|os.O_CREAT, repr_mode(*mode), mode))
350 #self.file = os.fdopen(os.open(self.fname, flags|os.O_CREAT, *mode), m)
351 self.file = os.fdopen(os.open(self.fname, flags|os.O_CREAT, *mode), m)
352 self.fd = self.file.fileno()
353 log('TFF: opened(%s) for write' % self.fname)
354 self.open_for_write = True
357 assert self.fnode is not None
358 uri = self.fnode.get_uri()
360 # XXX make this go away
361 if hasattr(self.fnode, 'tmp_fname'):
362 self.fname = self.fnode.tmp_fname
363 log('TFF: reopening(%s) for reading' % self.fname)
365 if is_literal_file_uri(uri) or not self.tfs.async:
366 log('TFF: synchronously fetching file from cache for reading')
367 self.fname = self.tfs.cache.get_file(uri)
369 log('TFF: asynchronously fetching file from cache for reading')
370 self.fname, self.downloader = self.tfs.cache.async_get_file(uri)
371 # downloader is None if the cache already contains the file
372 if self.downloader is not None:
373 d = self.downloader.when_done()
374 def download_complete(junk):
375 # once the download is complete, revert to non-async behaviour
376 self.downloader = None
377 d.addCallback(download_complete)
379 self.file = os.fdopen(os.open(self.fname, flags, *mode), m)
380 self.fd = self.file.fileno()
381 self.open_for_write = False
382 log('TFF: opened(%s) for read' % self.fname)
388 log("<TFF(%s:%s)> %s" % (os.path.basename(self.fname), self.name, msg))
391 def read(self, size, offset):
392 self.log('read(%r, %r)' % (size, offset, ))
394 # then we're busy doing an async download
395 # (and hence implicitly, we're in an environment that supports twisted)
396 #self.log('passing read() to %s' % (self.downloader, ))
397 d = self.downloader.read(offset, size)
399 raise EIO(str(failure))
403 self.log('servicing read() from %s' % (self.file, ))
404 self.file.seek(offset)
405 return self.file.read(size)
408 def write(self, buf, offset):
409 self.log("write(-%s-, %r)" % (len(buf), offset))
410 if not self.open_for_write:
412 self.file.seek(offset)
417 def release(self, flags):
418 self.log("release(%r)" % (flags,))
420 if self.open_for_write:
421 size = os.path.getsize(self.fname)
422 self.fnode.size = size
423 file_cap = self.tfs.upload(self.fname)
424 self.fnode.ro_uri = file_cap
425 # XXX [ ] TODO: set metadata
426 # write new uri into parent dir entry
427 self.parent.add_child(self.name, self.fnode, {})
428 self.log("uploaded: %s" % (file_cap,))
434 if 'w' in self.file.mode or 'a' in self.file.mode:
438 def fsync(self, isfsyncfile):
439 self.log("fsync(%r)" % (isfsyncfile,))
441 if isfsyncfile and hasattr(os, 'fdatasync'):
442 os.fdatasync(self.fd)
450 # cf. xmp_flush() in fusexmp_fh.c
451 os.close(os.dup(self.fd))
455 self.log("fgetattr()")
456 s = os.fstat(self.fd)
459 size = self.downloader.target_size
460 self.log("fgetattr() during async download, cache file: %s, size=%s" % (s, size))
462 self.log("fgetattr() -> %r" % (d,))
466 def ftruncate(self, len):
467 self.log("ftruncate(%r)" % (len,))
468 self.file.truncate(len)
470 class TahoeFuseBase(object):
472 def __init__(self, tfs):
473 log("TFB: __init__()")
478 log("<TFB> %s" % (msg, ))
481 def readlink(self, path):
482 self.log("readlink(%r)" % (path,))
483 node = self.tfs.get_path(path)
485 raise EINVAL('Not a symlink') # nothing in tahoe is a symlink
487 raise ENOENT('Invalid argument')
    def unlink(self, path):
        """Remove the file entry at *path*."""
        self.log("unlink(%r)" % (path,))
        self.tfs.unlink(path)

    def rmdir(self, path):
        """Remove the directory at *path* (same TFS operation as unlink)."""
        self.log("rmdir(%r)" % (path,))
        self.tfs.unlink(path)

    def symlink(self, path, path1):
        """Create *path1* as a link to *path*; tahoe has no real symlinks."""
        self.log("symlink(%r, %r)" % (path, path1))
        self.tfs.link(path, path1)

    def rename(self, path, path1):
        """Move the entry at *path* to *path1*."""
        self.log("rename(%r, %r)" % (path, path1))
        self.tfs.rename(path, path1)

    def link(self, path, path1):
        """Link the node at *path* into the tree as *path1*."""
        self.log("link(%r, %r)" % (path, path1))
        self.tfs.link(path, path1)
515 def chmod(self, path, mode):
516 self.log("XX chmod(%r, %r)" % (path, mode))
517 #return -errno.EOPNOTSUPP
520 def chown(self, path, user, group):
521 self.log("XX chown(%r, %r, %r)" % (path, user, group))
522 #return -errno.EOPNOTSUPP
525 def truncate(self, path, len):
526 self.log("XX truncate(%r, %r)" % (path, len))
527 #return -errno.EOPNOTSUPP
530 def utime(self, path, times):
531 self.log("XX utime(%r, %r)" % (path, times))
532 #return -errno.EOPNOTSUPP
538 Should return an object with statvfs attributes (f_bsize, f_frsize...).
539 Eg., the return value of os.statvfs() is such a thing (since py 2.2).
540 If you are not reusing an existing statvfs object, start with
541 fuse.StatVFS(), and define the attributes.
543 To provide usable information (ie., you want sensible df(1)
544 output, you are suggested to specify the following attributes:
546 - f_bsize - preferred size of file blocks, in bytes
547 - f_frsize - fundamental size of file blcoks, in bytes
548 [if you have no idea, use the same as blocksize]
549 - f_blocks - total number of blocks in the filesystem
550 - f_bfree - number of free blocks
551 - f_files - total number of file inodes
552 - f_ffree - nunber of free file inodes
555 block_size = 4096 # 4k
556 preferred_block_size = 131072 # 128k, c.f. seg_size
557 fs_size = 8*2**40 # 8Tb
558 fs_free = 2*2**40 # 2Tb
560 #s = fuse.StatVfs(f_bsize = preferred_block_size,
561 s = dict(f_bsize = preferred_block_size,
562 f_frsize = block_size,
563 f_blocks = fs_size / block_size,
564 f_bfree = fs_free / block_size,
565 f_bavail = fs_free / block_size,
566 f_files = 2**30, # total files
567 f_ffree = 2**20, # available files
568 f_favail = 2**20, # available files (root)
569 f_flag = 2, # no suid
570 f_namemax = 255) # max name length
571 #self.log('statfs(): %r' % (s,))
577 ##################################################################
580 def readdir(self, path, offset):
581 self.log('readdir(%r, %r)' % (path, offset))
582 node = self.tfs.get_path(path)
585 dirlist = ['.', '..'] + node.children.keys()
586 self.log('dirlist = %r' % (dirlist,))
587 #return [fuse.Direntry(d) for d in dirlist]
    def getattr(self, path):
        """Build a stat-dict for *path*; the root directory is special-cased.

        NOTE(review): the conditional selecting the root-directory branch
        (and a couple of surrounding lines) is elided from this excerpt,
        which is why the two branches appear to run back-to-back here.
        """
        self.log('getattr(%r)' % (path,))

        # we don't have any metadata for the root (no edge leading to it)
        # NOTE(review): 755 here is *decimal*; Directory.get_stat uses octal
        # 0755 for the same purpose -- the leading zero looks missing.
        mode = (stat.S_IFDIR | 755)
        mtime = self.tfs.root.mtime
        s = TStat({}, st_mode=mode, st_nlink=1, st_mtime=mtime)
        self.log('getattr(%r) -> %r' % (path, s))
        return stat_to_dict(s)

        parent, name, child = self.tfs.get_parent_name_and_child(path)
        if not child: # implicitly 'or not parent'
            raise ENOENT('No such file or directory')
        return stat_to_dict(parent.get_stat(name))
609 def access(self, path, mode):
610 self.log("access(%r, %r)" % (path, mode))
611 node = self.tfs.get_path(path)
614 accmode = os.O_RDONLY | os.O_WRONLY | os.O_RDWR
616 if not node.writable():
617 log('write access denied for %s (req:%o)' % (path, mode, ))
620 #log('access granted for %s' % (path, ))
623 def mkdir(self, path, mode):
624 self.log("mkdir(%r, %r)" % (path, mode))
627 ##################################################################
630 def open(self, path, flags):
631 self.log('open(%r, %r)' % (path, flags, ))
632 if path in self.files:
633 # XXX todo [ ] should consider concurrent open files of differing modes
636 tffobj = TahoeFuseFile(self.tfs, path, flags)
637 self.files[path] = tffobj
639 def create(self, path, flags, mode):
640 self.log('create(%r, %r, %r)' % (path, flags, mode))
641 if path in self.files:
642 # XXX todo [ ] should consider concurrent open files of differing modes
645 tffobj = TahoeFuseFile(self.tfs, path, flags, mode)
646 self.files[path] = tffobj
648 def _get_file(self, path):
649 if not path in self.files:
650 raise ENOENT('No such file or directory: %s' % (path,))
651 return self.files[path]
    def read(self, path, size, offset):
        """Delegate a read to the open file object for *path*."""
        self.log('read(%r, %r, %r)' % (path, size, offset, ))
        return self._get_file(path).read(size, offset)

    def write(self, path, buf, offset):
        """Delegate a write to the open file object for *path*."""
        self.log("write(%r, -%s-, %r)" % (path, len(buf), offset))
        return self._get_file(path).write(buf, offset)

    def release(self, path, flags):
        """Close the open file at *path* (triggers upload if written)."""
        self.log("release(%r, %r)" % (path, flags,))
        self._get_file(path).release(flags)

    def fsync(self, path, isfsyncfile):
        """Flush the open file's data to stable storage."""
        self.log("fsync(%r, %r)" % (path, isfsyncfile,))
        return self._get_file(path).fsync(isfsyncfile)

    def flush(self, path):
        """Flush buffered data for the open file at *path*."""
        self.log("flush(%r)" % (path,))
        return self._get_file(path).flush()

    def fgetattr(self, path):
        """Stat the open file at *path*."""
        self.log("fgetattr(%r)" % (path,))
        return self._get_file(path).fgetattr()

    def ftruncate(self, path, len):
        """Truncate the open file at *path* to *len* bytes."""
        self.log("ftruncate(%r, %r)" % (path, len,))
        return self._get_file(path).ftruncate(len)
690 class TahoeFuseLocal(TahoeFuseBase, fuse.Fuse):
691 def __init__(self, tfs, *args, **kw):
692 log("TFL: __init__(%r, %r)" % (args, kw))
693 TahoeFuseBase.__init__(self, tfs)
694 fuse.Fuse.__init__(self, *args, **kw)
697 log("<TFL> %s" % (msg, ))
699 def main(self, *a, **kw):
700 self.log("main(%r, %r)" % (a, kw))
701 return fuse.Fuse.main(self, *a, **kw)
703 # overrides for those methods which return objects not marshalled
704 def fgetattr(self, path):
705 return TStat({}, **(TahoeFuseBase.fgetattr(self, path)))
707 def getattr(self, path):
708 return TStat({}, **(TahoeFuseBase.getattr(self, path)))
711 return fuse.StatVfs(**(TahoeFuseBase.statfs(self)))
712 #self.log('statfs()')
713 #ret = fuse.StatVfs(**(TahoeFuseBase.statfs(self)))
714 #self.log('statfs(): %r' % (ret,))
718 def readdir(self, path, offset):
719 return [ fuse.Direntry(d) for d in TahoeFuseBase.readdir(self, path, offset) ]
721 class TahoeFuseShim(fuse.Fuse):
722 def __init__(self, trpc, *args, **kw):
723 log("TF: __init__(%r, %r)" % (args, kw))
725 fuse.Fuse.__init__(self, *args, **kw)
728 log("<TFs> %s" % (msg, ))
    def readlink(self, path):
        """Forward readlink to the server process over RPC."""
        self.log("readlink(%r)" % (path,))
        return self.trpc.call('readlink', path)

    def unlink(self, path):
        """Forward unlink to the server process."""
        self.log("unlink(%r)" % (path,))
        return self.trpc.call('unlink', path)

    def rmdir(self, path):
        """Forward rmdir; the server implements it as 'unlink'."""
        self.log("rmdir(%r)" % (path,))
        return self.trpc.call('unlink', path)

    def symlink(self, path, path1):
        """Forward symlink; the server implements it as 'link'."""
        self.log("symlink(%r, %r)" % (path, path1))
        return self.trpc.call('link', path, path1)

    def rename(self, path, path1):
        """Forward rename to the server process."""
        self.log("rename(%r, %r)" % (path, path1))
        return self.trpc.call('rename', path, path1)

    def link(self, path, path1):
        """Forward link to the server process."""
        self.log("link(%r, %r)" % (path, path1))
        return self.trpc.call('link', path, path1)

    def chmod(self, path, mode):
        """Forward chmod to the server process."""
        self.log("XX chmod(%r, %r)" % (path, mode))
        return self.trpc.call('chmod', path, mode)

    def chown(self, path, user, group):
        """Forward chown to the server process."""
        self.log("XX chown(%r, %r, %r)" % (path, user, group))
        return self.trpc.call('chown', path, user, group)

    def truncate(self, path, len):
        """Forward truncate to the server process."""
        self.log("XX truncate(%r, %r)" % (path, len))
        return self.trpc.call('truncate', path, len)

    def utime(self, path, times):
        """Forward utime to the server process."""
        self.log("XX utime(%r, %r)" % (path, times))
        return self.trpc.call('utime', path, times)
783 response = self.trpc.call('statfs')
784 #self.log("statfs(): %r" % (response,))
785 kwargs = dict([ (str(k),v) for k,v in response.items() ])
786 return fuse.StatVfs(**kwargs)
791 def main(self, *a, **kw):
792 self.log("main(%r, %r)" % (a, kw))
794 return fuse.Fuse.main(self, *a, **kw)
796 ##################################################################
    def readdir(self, path, offset):
        """Fetch directory entries over RPC and wrap them for python-fuse."""
        self.log('readdir(%r, %r)' % (path, offset))
        return [ fuse.Direntry(d) for d in self.trpc.call('readdir', path, offset) ]

    def getattr(self, path):
        """Fetch a stat-dict over RPC and rebuild a TStat from it.

        NOTE(review): the tail of this method is elided from this
        excerpt -- presumably it returns ``s``; confirm.
        """
        self.log('getattr(%r)' % (path,))
        response = self.trpc.call('getattr', path)
        # RPC layer may deliver unicode keys; kwargs need plain str
        kwargs = dict([ (str(k),v) for k,v in response.items() ])
        s = TStat({}, **kwargs)
        self.log('getattr(%r) -> %r' % (path, s))

    def access(self, path, mode):
        """Forward access check to the server process."""
        self.log("access(%r, %r)" % (path, mode))
        return self.trpc.call('access', path, mode)

    def mkdir(self, path, mode):
        """Forward mkdir to the server process."""
        self.log("mkdir(%r, %r)" % (path, mode))
        return self.trpc.call('mkdir', path, mode)
822 ##################################################################
    def open(self, path, flags):
        """Ask the server to open *path* with *flags*."""
        self.log('open(%r, %r)' % (path, flags, ))
        return self.trpc.call('open', path, flags)

    def create(self, path, flags, mode):
        """Ask the server to create *path*."""
        self.log('create(%r, %r, %r)' % (path, flags, mode))
        return self.trpc.call('create', path, flags, mode)

    def read(self, path, size, offset):
        """Forward a read to the server process."""
        self.log('read(%r, %r, %r)' % (path, size, offset, ))
        return self.trpc.call('read', path, size, offset)

    def write(self, path, buf, offset):
        """Forward a write (data travels through the RPC layer)."""
        self.log("write(%r, -%s-, %r)" % (path, len(buf), offset))
        return self.trpc.call('write', path, buf, offset)

    def release(self, path, flags):
        """Forward release (close) to the server process."""
        self.log("release(%r, %r)" % (path, flags,))
        return self.trpc.call('release', path, flags)

    def fsync(self, path, isfsyncfile):
        """Forward fsync to the server process."""
        self.log("fsync(%r, %r)" % (path, isfsyncfile,))
        return self.trpc.call('fsync', path, isfsyncfile)

    def flush(self, path):
        """Forward flush to the server process."""
        self.log("flush(%r)" % (path,))
        return self.trpc.call('flush', path)
    def fgetattr(self, path):
        """Fetch an open-file stat-dict over RPC and rebuild a TStat.

        NOTE(review): the log label below says 'getattr' although this is
        fgetattr (copy-paste); the tail of the method (presumably
        ``return s``) is elided from this excerpt.
        """
        self.log("fgetattr(%r)" % (path,))
        #return self.trpc.call('fgetattr', path)
        response = self.trpc.call('fgetattr', path)
        # RPC layer may deliver unicode keys; kwargs need plain str
        kwargs = dict([ (str(k),v) for k,v in response.items() ])
        s = TStat({}, **kwargs)
        self.log('getattr(%r) -> %r' % (path, s))

    def ftruncate(self, path, len):
        """Forward ftruncate to the server process."""
        self.log("ftruncate(%r, %r)" % (path, len,))
        return self.trpc.call('ftruncate', path, len)
def launch_tahoe_fuse(tf_class, tobj, argv):
    """Instantiate *tf_class* and hand control to the python-fuse machinery.

    sys.argv is rewritten because fuse.Fuse parses its options from it.
    NOTE(review): the tail of this function (presumably ``server.main()``)
    is elided from this excerpt.
    """
    sys.argv = ['tahoe fuse'] + list(argv)
    log('setting sys.argv=%r' % (sys.argv,))
    config = TahoeFuseOptions()
    version = "%prog " +VERSIONSTR+", fuse "+ fuse.__version__
    server = tf_class(tobj, version=version, usage=config.getSynopsis(), dash_s_do='setsingle')
    server.parse(errex=1)
884 def getnodeurl(nodedir):
885 f = file(os.path.expanduser(os.path.join(nodedir, "node.url")), 'rb')
886 nu = f.read().strip()
892 def fingerprint(uri):
895 return base64.b32encode(sha.new(uri).digest()).lower()[:6]
897 stat_fields = [ 'st_mode', 'st_ino', 'st_dev', 'st_nlink', 'st_uid', 'st_gid', 'st_size',
898 'st_atime', 'st_mtime', 'st_ctime', ]
899 def stat_to_dict(statobj, fields=None):
904 d[f] = getattr(statobj, f, None)
907 class TStat(fuse.Stat):
908 # in fuse 0.2, these are set by fuse.Stat.__init__
909 # in fuse 0.2-pre3 (hardy) they are not. badness ensues if they're missing
921 fields = [ 'st_mode', 'st_ino', 'st_dev', 'st_nlink', 'st_uid', 'st_gid', 'st_size',
922 'st_atime', 'st_mtime', 'st_ctime', ]
923 def __init__(self, metadata, **kwargs):
924 # first load any stat fields present in 'metadata'
925 for st in [ 'mtime', 'ctime' ]:
927 setattr(self, "st_%s" % st, metadata[st])
928 for st in self.fields:
930 setattr(self, st, metadata[st])
932 # then set any values passed in as kwargs
933 fuse.Stat.__init__(self, **kwargs)
936 return "<Stat%r>" % (stat_to_dict(self),)
938 class Directory(object):
939 def __init__(self, tfs, ro_uri, rw_uri):
943 assert (rw_uri or ro_uri)
945 self.last_load = None
946 self.last_data = None
950 return "<Directory %s>" % (fingerprint(self.get_uri()),)
952 def maybe_refresh(self, name=None):
954 if the previously cached data was retrieved within the cache
955 validity period, does nothing. otherwise refetches the data
956 for this directory and reloads itself
959 if self.last_load is None or (now - self.last_load) > self.tfs.cache_validity:
962 def load(self, name=None):
964 log('%s.loading(%s)' % (self, name))
965 url = self.tfs.compose_url("uri/%s?t=json", self.get_uri())
966 data = urllib.urlopen(url).read()
967 h = tagged_hash('cache_hash', data)
968 if h == self.last_data:
970 log('%s.load() : no change h(data)=%s' % (self, base32.b2a(h), ))
973 parsed = simplejson.loads(data)
975 log('%s.load(): unable to parse json data for dir:\n%r' % (self, data))
978 assert nodetype == 'dirnode'
979 self.children.clear()
980 for cname,details in d['children'].items():
981 cname = unicode_to_utf8_or_str(cname)
982 ctype, cattrs = details
983 metadata = cattrs.get('metadata', {})
984 if ctype == 'dirnode':
985 cobj = self.tfs.dir_for(cname, cattrs.get('ro_uri'), cattrs.get('rw_uri'))
987 assert ctype == "filenode"
988 cobj = File(cattrs.get('size'), cattrs.get('ro_uri'))
989 self.children[cname] = cobj, metadata
993 log('%s.load() loaded: \n%s' % (self, self.pprint(),))
995 def get_children(self):
996 return self.children.keys()
998 def get_child(self, name):
999 return self.children[name][0]
    def add_child(self, name, child, metadata):
        """Attach *child* under *name* locally, then PUT the edge to the node.

        The webapi echoes back the child's cap, which is asserted to match.
        NOTE(review): the local children map is updated *before* the HTTP
        call; if the PUT fails, local and remote state diverge.
        """
        log('%s.add_child(%r, %r, %r)' % (self, name, child, metadata, ))
        self.children[name] = child, metadata
        url = self.tfs.compose_url("uri/%s/%s?t=uri", self.get_uri(), name)
        child_cap = do_http('PUT', url, child.get_uri())
        # XXX [ ] TODO: push metadata to tahoe node
        assert child_cap == child.get_uri()
        self.mtime = time.time()
        log('added child %r with %r to %r' % (name, child_cap, self))
    def remove_child(self, name):
        """Drop *name* from the local map, then DELETE it on the tahoe node.

        NOTE(review): as with add_child, the local deletion happens before
        the HTTP call, so a failed DELETE leaves the states inconsistent.
        """
        log('%s.remove_child(%r)' % (self, name, ))
        del self.children[name]
        url = self.tfs.compose_url("uri/%s/%s", self.get_uri(), name)
        resp = do_http('DELETE', url)
        self.mtime = time.time()
        log('child (%s) removal yielded %r' % (name, resp,))
1020 return self.rw_uri or self.ro_uri
1022 # TODO: rename to 'is_writeable', or switch sense to 'is_readonly', for consistency with Tahoe code
1024 return self.rw_uri and self.rw_uri != self.ro_uri
1026 def pprint(self, prefix='', printed=None, suffix=''):
1030 writable = self.writable() and '+' or ' '
1032 ret.append(" %s/%s ... <%s> : %s" % (prefix, writable, fingerprint(self.get_uri()), suffix, ))
1034 ret.append("[%s] %s/%s : %s" % (fingerprint(self.get_uri()), prefix, writable, suffix, ))
1036 for name,(child,metadata) in sorted(self.children.items()):
1037 ret.append(child.pprint(' ' * (len(prefix)+1)+name, printed, repr(metadata)))
1038 return '\n'.join(ret)
1040 def get_metadata(self, name):
1041 return self.children[name][1]
1043 def get_stat(self, name):
1044 child,metadata = self.children[name]
1045 log("%s.get_stat(%s) md: %r" % (self, name, metadata))
1047 if isinstance(child, Directory):
1048 child.maybe_refresh(name)
1049 mode = metadata.get('st_mode') or (stat.S_IFDIR | 0755)
1050 s = TStat(metadata, st_mode=mode, st_nlink=1, st_mtime=child.mtime)
1052 if hasattr(child, 'tmp_fname'):
1053 s = os.stat(child.tmp_fname)
1054 log("%s.get_stat(%s) returning local stat of tmp file" % (self, name, ))
1058 st_size = child.size,
1059 st_mode = metadata.get('st_mode') or (stat.S_IFREG | 0444),
1060 st_mtime = metadata.get('mtime') or self.mtime,
1064 log("%s.get_stat(%s)->%s" % (self, name, s))
1068 def __init__(self, size, ro_uri):
1071 ro_uri = str(ro_uri)
1072 self.ro_uri = ro_uri
1075 return "<File %s>" % (fingerprint(self.ro_uri) or [self.tmp_fname],)
1077 def pprint(self, prefix='', printed=None, suffix=''):
1078 return " %s (%s) : %s" % (prefix, self.size, suffix, )
1087 def __init__(self, nodedir, nodeurl, root_uri,
1088 cache_validity_period=DEFAULT_DIRECTORY_VALIDITY, async=False):
1089 self.cache_validity = cache_validity_period
1090 self.nodeurl = nodeurl
1091 self.root_uri = root_uri
1095 cachedir = os.path.expanduser(os.path.join(nodedir, '_cache'))
1096 self.cache = FileCache(nodeurl, cachedir)
1097 ro_uri = DirectoryURI.init_from_string(self.root_uri).get_readonly()
1098 self.root = Directory(self, ro_uri, self.root_uri)
1099 self.root.maybe_refresh('<root>')
1102 log("<TFS> %s" % (msg, ))
1105 return self.root.pprint()
1107 def compose_url(self, fmt, *args):
1108 return self.nodeurl + (fmt % tuple(map(urllib.quote, args)))
1110 def get_parent_name_and_child(self, path):
1112 find the parent dir node, name of child relative to that parent, and
1113 child node within the TFS object space.
1114 @returns: (parent, name, child) if the child is found
1115 (parent, name, None) if the child is missing from the parent
1116 (None, name, None) if the parent is not found
1120 dirname, name = os.path.split(path)
1121 parent = self.get_path(dirname)
1124 child = parent.get_child(name)
1125 return parent, name, child
1127 return parent, name, None
1129 return None, name, None
1131 def get_path(self, path):
1132 comps = path.strip('/').split('/')
1138 if not isinstance(cursor, Directory):
1139 self.log('path "%s" is not a dir' % (path,))
1141 cursor.maybe_refresh(c_name)
1143 cursor = cursor.get_child(comp)
1146 self.log('path "%s" not found' % (path,))
1148 if isinstance(cursor, Directory):
1149 cursor.maybe_refresh(c_name)
1152 def dir_for(self, name, ro_uri, rw_uri):
1153 #self.log('dir_for(%s) [%s/%s]' % (name, fingerprint(ro_uri), fingerprint(rw_uri)))
1155 ro_uri = str(ro_uri)
1157 rw_uri = str(rw_uri)
1158 uri = rw_uri or ro_uri
1160 dirobj = self.dirs.get(uri)
1162 self.log('dir_for(%s) creating new Directory' % (name, ))
1163 dirobj = Directory(self, ro_uri, rw_uri)
1164 self.dirs[uri] = dirobj
1167 def upload(self, fname):
1168 self.log('upload(%r)' % (fname,))
1169 fh = file(fname, 'rb')
1170 url = self.compose_url("uri")
1171 file_cap = do_http('PUT', url, fh)
1172 self.log('uploaded to: %r' % (file_cap,))
1175 def mkdir(self, path):
1176 self.log('mkdir(%r)' % (path,))
1177 parent, name, child = self.get_parent_name_and_child(path)
1180 raise EEXIST('File exists: %s' % (name,))
1182 raise ENOENT('No such file or directory: %s' % (path,))
1184 url = self.compose_url("uri?t=mkdir")
1185 new_dir_cap = do_http('PUT', url)
1187 ro_uri = DirectoryURI.init_from_string(new_dir_cap).get_readonly()
1188 child = Directory(self, ro_uri, new_dir_cap)
1189 parent.add_child(name, child, {})
1191 def rename(self, path, path1):
1192 self.log('rename(%s, %s)' % (path, path1))
1193 src_parent, src_name, src_child = self.get_parent_name_and_child(path)
1194 dst_parent, dst_name, dst_child = self.get_parent_name_and_child(path1)
1196 if not src_child or not dst_parent:
1197 raise ENOENT('No such file or directory')
1199 dst_parent.add_child(dst_name, src_child, {})
1200 src_parent.remove_child(src_name)
    def unlink(self, path):
        """Remove the entry at *path* from its parent directory.

        @raises ENOENT: if the path (or its parent) does not exist
        @raises EACCESS: if the parent directory is read-only
        """
        parent, name, child = self.get_parent_name_and_child(path)

        if child is None: # parent or child is missing
            raise ENOENT('No such file or directory')
        if not parent.writable():
            raise EACCESS('Permission denied')

        parent.remove_child(name)
1212 def link(self, path, path1):
1213 src = self.get_path(path)
1214 dst_parent, dst_name, dst_child = self.get_parent_name_and_child(path1)
1217 raise ENOENT('No such file or directory')
1218 if dst_parent is None:
1219 raise ENOENT('No such file or directory')
1220 if not dst_parent.writable():
1221 raise EACCESS('Permission denied')
1223 dst_parent.add_child(dst_name, src, {})
class FileCache(object):
    """On-disk cache of downloaded file contents under *cachedir*."""
    def __init__(self, nodeurl, cachedir):
        self.nodeurl = nodeurl
        self.cachedir = cachedir
        if not os.path.exists(self.cachedir):
            os.makedirs(self.cachedir)
        # tmp/ holds scratch files for writes not yet uploaded
        self.tmpdir = os.path.join(self.cachedir, 'tmp')
        if not os.path.exists(self.tmpdir):
            os.makedirs(self.tmpdir)
        # weak map fname -> downloader, so finished ones can be collected
        self.downloaders = weakref.WeakValueDictionary()
1237 log("<FC> %s" % (msg, ))
1239 def get_file(self, uri):
1240 self.log('get_file(%s)' % (uri,))
1241 if is_literal_file_uri(uri):
1242 return self.get_literal(uri)
1244 return self.get_chk(uri, async=False)
1246 def async_get_file(self, uri):
1247 self.log('get_file(%s)' % (uri,))
1248 return self.get_chk(uri, async=True)
1250 def get_literal(self, uri):
1251 h = sha.new(uri).digest()
1252 u = LiteralFileURI.init_from_string(uri)
1253 fname = os.path.join(self.cachedir, '__'+base64.b32encode(h).lower())
1255 self.log('writing literal file %s (%s)' % (fname, size, ))
1256 fh = open(fname, 'wb')
1261 def get_chk(self, uri, async=False):
1262 u = CHKFileURI.init_from_string(str(uri))
1263 storage_index = u.storage_index
1265 fname = os.path.join(self.cachedir, base64.b32encode(storage_index).lower())
1266 if os.path.exists(fname):
1267 fsize = os.path.getsize(fname)
1274 self.log('warning file "%s" is too short %s < %s' % (fname, fsize, size))
1275 self.log('downloading file %s (%s)' % (fname, size, ))
1276 url = "%suri/%s" % (self.nodeurl, uri)
1278 if fname in self.downloaders and self.downloaders[fname].running:
1279 downloader = self.downloaders[fname]
1281 downloader = DownloaderWithReadQueue()
1282 self.downloaders[fname] = downloader
1283 d = downloader.start(url, fname, target_size=u.size)
1284 def clear_downloader(result, fname):
1285 self.log('clearing %s from downloaders: %r' % (fname, result))
1286 self.downloaders.pop(fname, None)
1287 d.addBoth(clear_downloader, fname)
1288 return fname, downloader
1290 fh = open(fname, 'wb')
1291 download = urllib.urlopen(url)
1293 chunk = download.read(4096)
1300 def tmp_file(self, id):
1301 fname = os.path.join(self.tmpdir, base64.b32encode(id).lower())
# Module-level handle to the TFS instance, used by the debug
# tree-printing helper below.
1304 _tfs = None # to appease pyflakes; is set in main()
1306 log('tree:\n' + _tfs.pprint())
# unmarshal: decode a value received over the RPC wire. Scalars pass
# through; strings are base64-decoded (the wire format encodes all
# byte strings); lists and dicts are decoded recursively.
1310 if obj is None or isinstance(obj, int) or isinstance(obj, long) or isinstance(obj, float):
1312 elif isinstance(obj, unicode) or isinstance(obj, str):
1313 #log('unmarshal(%r)' % (obj,))
1314 return base64.b64decode(obj)
1315 elif isinstance(obj, list):
1316 return map(unmarshal, obj)
1317 elif isinstance(obj, dict):
1318 return dict([ (k,unmarshal(v)) for k,v in obj.items() ])
# Anything else is not representable in the wire format.
1320 raise ValueError('object type not int,str,list,dict,none (%s) (%r)' % (type(obj), obj))
# marshal: encode a value for the RPC wire, the inverse of unmarshal.
# Scalars pass through; byte strings are base64-encoded (keeps the
# JSON payload 7-bit safe); lists/tuples and dicts recurse. Note that
# dict *keys* are not encoded -- only values.
1323 if obj is None or isinstance(obj, int) or isinstance(obj, long) or isinstance(obj, float):
1325 elif isinstance(obj, str):
1326 return base64.b64encode(obj)
1327 elif isinstance(obj, list) or isinstance(obj, tuple):
1328 return map(marshal, obj)
1329 elif isinstance(obj, dict):
1330 return dict([ (k,marshal(v)) for k,v in obj.items() ])
# Anything else is not representable in the wire format.
1332 raise ValueError('object type not int,str,list,dict,none (%s)' % type(obj))
1335 class TRPCProtocol(Protocol):
# Server side of the tiny RPC protocol spoken over a unix socket.
# A request is a newline-terminated JSON document; the response is a
# JSON header (length + optional sha1), a newline, then the JSON body,
# after which the connection is closed (one request per connection).
# A connection that sends the literal line 'keepalive\n' instead is
# held open; when it drops, the server shuts down.
1336 compute_response_sha1 = True
1337 log_all_requests = False
1339 def connectionMade(self):
# Accumulate request data until a newline terminator arrives.
1342 def dataReceived(self, data):
1343 if data == 'keepalive\n':
1344 log('keepalive connection on %r' % (self.transport,))
1345 self.keepalive = True
# Partial request: buffer and wait for more data.
1348 if not data.endswith('\n'):
1349 self.buf.append(data)
# Final chunk of a buffered request: reassemble and dispatch.
1352 self.buf.append(data)
1353 reqstr = ''.join(self.buf)
1355 self.dispatch_request(reqstr)
# Whole request arrived in a single chunk.
1357 self.dispatch_request(data)
# Parse the JSON request and run the handler; success and failure both
# flow back to the client through send_response / send_error.
1359 def dispatch_request(self, reqstr):
1361 req = simplejson.loads(reqstr)
1362 except ValueError, ve:
1366 d = defer.maybeDeferred(self.handle_request, req)
1367 d.addCallback(self.send_response)
1368 d.addErrback(self.send_error)
# Translate a failure into an error response: errno-style errors keep
# their (errno, message) pair; anything else is sent as a string.
1370 def send_error(self, failure):
1371 log('failure: %s' % (failure,))
1372 if failure.check(TFSIOError):
1374 self.send_response(['error', 'errno', e.args[0], e.args[1]])
1376 self.send_response(['error', 'failure', str(failure)])
# Serialize the result, write header + '\n' + body, then close the
# connection -- the protocol is strictly one request per connection.
1378 def send_response(self, result):
1379 response = simplejson.dumps(result)
1380 header = { 'len': len(response), }
1381 if self.compute_response_sha1:
# Integrity digest so the client can verify the body it reassembles.
1382 header['sha1'] = base64.b64encode(sha.new(response).digest())
1383 hdr = simplejson.dumps(header)
1384 self.transport.write(hdr)
1385 self.transport.write('\n')
1386 self.transport.write(response)
1387 self.transport.loseConnection()
# Losing the keepalive connection means the client went away: stop the
# reactor (and thus the whole server process).
1389 def connectionLost(self, reason):
1390 if hasattr(self, 'keepalive'):
1391 log('keepalive connection %r lost, shutting down' % (self.transport,))
1392 reactor.callLater(0, reactor.stop)
# Validate and execute one request of the form
# ['call', methodname, marshalled_args]; returns the response list.
1394 def handle_request(self, req):
1395 if type(req) is not list or not req or len(req) < 1:
1396 return ['error', 'malformed request']
1397 if req[0] == 'call':
1399 return ['error', 'malformed request']
1402 args = unmarshal(req[2])
1403 except ValueError, ve:
1404 return ['error', 'malformed arguments', str(ve)]
# Look the method up on the configured server object.
1407 meth = getattr(self.factory.server, methname)
1408 except AttributeError, ae:
1409 return ['error', 'no such method', str(ae)]
1411 if self.log_all_requests:
1412 log('call %s(%s)' % (methname, ', '.join(map(repr, args))))
1414 result = meth(*args)
# errno-style errors are distinguished so the client can re-raise
# them as TFSIOError; other exceptions become generic errors.
1415 except TFSIOError, e:
1416 log('errno: %s; %s' % e.args)
1417 return ['error', 'errno', e.args[0], e.args[1]]
1418 except Exception, e:
1419 log('exception: ' + traceback.format_exc())
1420 return ['error', 'exception', str(e)]
# Result may itself be a Deferred; marshal once it resolves.
1421 d = defer.succeed(None)
1422 d.addCallback(lambda junk: result) # result may be Deferred
1423 d.addCallback(lambda res: ['result', marshal(res)]) # only applies if not errback
1426 class TFSServer(object):
# RPC server that exposes a server object (normally a TahoeFuseBase)
# over a unix-domain socket using TRPCProtocol.
1427 def __init__(self, socket_path, server=None):
# socket_path: filesystem path of the unix socket to listen on.
# server: object whose methods are callable via RPC; defaults to this
# TFSServer instance itself (useful for the demo methods below).
1428 self.socket_path = socket_path
1429 log('TFSServer init socket: %s' % (socket_path,))
1431 self.factory = Factory()
1432 self.factory.protocol = TRPCProtocol
1434 self.factory.server = server
1436 self.factory.server = self
# Lazily build (and cache) the twisted service listening on the
# unix socket.
1438 def get_service(self):
1439 if not hasattr(self, 'svc'):
1440 from twisted.application import strports
1441 self.svc = strports.service('unix:'+self.socket_path, self.factory)
1445 svc = self.get_service()
1450 reactor.callLater(0, reactor.stop)
1452 reactor.callLater(0, ss)
# Demo/diagnostic RPC methods (hello / echo / deliberate failure),
# reachable when the factory's server is this instance itself.
1456 return 'pleased to meet you'
1458 def echo(self, arg):
1462 raise ValueError('expected')
1465 return defer.maybeDeferred(self.failex)
1467 class RPCError(RuntimeError):
1471 def __init__(self, socket_fname):
# RPC client. Immediately opens a dedicated 'keepalive' connection to
# the server; the server shuts itself down when this connection drops
# (see TRPCProtocol.connectionLost), tying its lifetime to ours.
1472 self.socket_fname = socket_fname
1473 self.keepalive = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
1474 self.keepalive.connect(self.socket_fname)
1475 self.keepalive.send('keepalive\n')
1476 log('requested keepalive on %s' % (self.keepalive,))
# Send one request and read back the full response body.
# Wire format (mirrors TRPCProtocol.send_response): a JSON header
# containing 'len' and optionally a base64 'sha1' digest, a newline,
# then 'len' bytes of JSON body. One connection per request.
# open connection to trpc server
1480 s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
1481 s.connect(self.socket_fname)
1483 s.send(simplejson.dumps(req))
1485 # read response header
# NOTE(review): assumes the complete header line (and its trailing
# newline) arrives in the first recv(8192) -- confirm this holds for
# the unix-socket transport in practice.
1486 hdr_data = s.recv(8192)
1487 first_newline = hdr_data.index('\n')
1488 header = hdr_data[:first_newline]
1489 data = hdr_data[first_newline+1:]
1490 hdr = simplejson.loads(header)
1491 hdr_len = hdr['len']
1492 if hdr.has_key('sha1'):
1493 hdr_sha1 = base64.b64decode(hdr['sha1'])
# Running digest starts with whatever body bytes rode along with the
# header; subsequent chunks are folded in as they are spooled.
1495 spool_sha = sha.new(data)
1501 spool_sha.update(data)
1515 resp = ''.join(spool)
# Sanity-check length, then (if provided) verify the sha1 digest.
1517 assert hdr_len == len(resp), str((hdr_len, len(resp), repr(resp)))
1518 if hdr.has_key('sha1'):
1519 data_sha1 = spool_sha.digest()
1520 spool = spool_sha = None
1521 assert hdr_sha1 == data_sha1, str((base32.b2a(hdr_sha1), base32.b2a(data_sha1)))
1523 #print 'warning, server provided no sha1 to check'
# Invoke a remote method: marshal the args, send a ['call', name,
# args] request, and decode the ['result', ...] / ['error', ...]
# response. errno-style errors re-raise as TFSIOError so FUSE sees
# proper error codes; other errors raise RPCError.
1526 def call(self, methodname, *args):
1527 res = self.req(['call', methodname, marshal(args)])
1529 result = simplejson.loads(res)
1530 if not result or len(result) < 2:
1531 raise TypeError('malformed response %r' % (result,))
1532 if result[0] == 'error':
1533 if result[1] == 'errno':
1534 raise TFSIOError(result[2], result[3])
1536 raise RPCError(*(result[1:])) # error, exception / error, failure
1537 elif result[0] == 'result':
1538 return unmarshal(result[1])
1540 raise TypeError('unknown response type %r' % (result[0],))
# Closing the keepalive socket signals the server to stop its reactor
# and exit (see TRPCProtocol.connectionLost).
1543 log('shutdown() closing keepalive %s' % (self.keepalive,))
1544 self.keepalive.close()
# (cut-n-pasted here due to an ImportError / some py2app linkage issues)
#from twisted.scripts._twistd_unix import daemonize
# See http://www.erlenstar.demon.co.uk/unix/faq_toc.html#TOC16
# Classic double-fork daemonization: the first fork detaches from the
# parent, the second (after setsid, elided here) ensures the daemon
# can never reacquire a controlling terminal.
1550 if os.fork(): # launch child and...
1551 os._exit(0) # kill off parent
1553 if os.fork(): # launch child and...
1554 os._exit(0) # kill off parent again.
# Point stdio at /dev/null; EBADF while duplicating is tolerated
# (the target descriptor may already be closed).
1556 null=os.open('/dev/null', os.O_RDWR)
1561 if e.errno != errno.EBADF:
1566 log("main(%s)" % (argv,))
1568 # check for version or help options (no args == help)
# With only -h/--help, print our own usage first, then fall through to
# python-fuse so its option help is shown as well.
1571 if len(argv) == 1 and argv[0] in ['-h', '--help']:
1572 config = TahoeFuseOptions()
1573 print >> sys.stderr, config
1574 print >> sys.stderr, 'fuse usage follows:'
1575 if len(argv) == 1 and argv[0] in ['-h', '--help', '--version']:
1576 launch_tahoe_fuse(TahoeFuseLocal, None, argv)
1579 # parse command line options
1580 config = TahoeFuseOptions()
1582 #print 'parsing', argv
1583 config.parseOptions(argv)
1584 except usage.error, e:
1589 # check for which alias or uri is specified
# The root dirnode is chosen either by alias (looked up in the node's
# private/aliases file) or by an explicit --root-uri.
1591 alias = config['alias']
1592 #print 'looking for aliases in', config['node-directory']
1593 aliases = get_aliases(os.path.expanduser(config['node-directory']))
1594 if alias not in aliases:
1595 raise usage.error('Alias %r not found' % (alias,))
1596 root_uri = aliases[alias]
1598 elif config['root-uri']:
1599 root_uri = config['root-uri']
# root_name: a stable, filename-safe identifier for this root, used to
# name the log file and the control socket.
1600 root_name = 'uri_' + base32.b2a(tagged_hash('root_name', root_uri))[:12]
1601 # test the uri for structural validity:
1603 DirectoryURI.init_from_string(root_uri)
1605 raise usage.error('root-uri must be a valid directory uri (not %r)' % (root_uri,))
1607 raise usage.error('At least one of --alias or --root-uri must be specified')
# Determine which Tahoe node to talk to: explicit --node-url wins,
# otherwise read node.url from the node directory.
1609 nodedir = config['node-directory']
1610 nodeurl = config['node-url']
1612 nodeurl = getnodeurl(nodedir)
# Control socket used by the client/server split mode lives under the
# node directory, named after root_name.
1615 socket_dir = os.path.join(os.path.expanduser(nodedir), "tfuse.sockets")
1616 socket_path = os.path.join(socket_dir, root_name)
1617 if len(socket_path) > 103:
1618 # try googling AF_UNIX and sun_len for some taste of why this oddity exists.
1619 raise OSError(errno.ENAMETOOLONG, 'socket path too long (%s)' % (socket_path,))
1621 fileutil.make_dirs(socket_dir, 0700)
# An existing socket means a server is (or was) running for this root:
# either perform the requested shutdown or refuse to double-mount.
1622 if os.path.exists(socket_path):
1623 log('socket exists')
1624 if config['server-shutdown']:
1625 log('calling shutdown')
1626 trpc = TRPC(socket_path)
1627 result = trpc.shutdown()
1628 log('result: %r' % (result,))
1629 log('called shutdown')
1632 raise OSError(errno.EEXIST, 'fuse already running (%r exists)' % (socket_path,))
1633 elif config['server-shutdown']:
1634 raise OSError(errno.ENOTCONN, '--server-shutdown specified, but server not running')
1636 if not os.path.exists(config.mountpoint):
1637 raise OSError(errno.ENOENT, 'No such file or directory: "%s"' % (config.mountpoint,))
1641 # Standalone ("no-split")
# Mode 1: --no-split runs filesystem and fuse glue in this process.
1643 if config['no-split']:
1644 reopen_logfile('tfuse.%s.unsplit.log' % (root_name,))
1645 log('\n'+(24*'_')+'init (unsplit)'+(24*'_')+'\n')
1647 cache_timeout = float(config['cache-timeout'])
1648 tfs = TFS(nodedir, nodeurl, root_uri, cache_timeout, async=False)
1651 # make tfs instance accesible to print_tree() for dbg
1654 args = [ '-o'+opt for opt in config.fuse_options ] + [config.mountpoint]
1655 launch_tahoe_fuse(TahoeFuseLocal, tfs, args)
# Mode 2: --server runs only the TFS RPC server side (normally spawned
# by the client mode below, not by end users).
1660 elif config['server']:
1661 reopen_logfile('tfuse.%s.server.log' % (root_name,))
1662 log('\n'+(24*'_')+'init (server)'+(24*'_')+'\n')
1668 cache_timeout = float(config['cache-timeout'])
1669 tfs = TFS(nodedir, nodeurl, root_uri, cache_timeout, async=True)
1672 # make tfs instance accesible to print_tree() for dbg
1675 log('launching tfs server')
1676 tfuse = TahoeFuseBase(tfs)
1677 tfs_server = TFSServer(socket_path, tfuse)
1679 log('tfs server ran, exiting')
1681 log('exception: ' + traceback.format_exc())
# Mode 3 (default): client mode. Spawn a --server subprocess, wait for
# its control socket to appear, then mount via the RPC shim.
1687 reopen_logfile('tfuse.%s.client.log' % (root_name,))
1688 log('\n'+(24*'_')+'init (client)'+(24*'_')+'\n')
1690 server_args = [sys.executable, sys.argv[0], '--server'] + argv
1691 if 'Allmydata.app/Contents/MacOS' in sys.executable:
1692 # in this case blackmatch is the 'fuse' subcommand of the 'tahoe' executable
1693 # otherwise we assume blackmatch is being run from source
1694 server_args.insert(2, 'fuse')
1695 #print 'launching server:', server_args
1696 server = subprocess.Popen(server_args)
1697 waiting_since = time.time()
# Poll for the server's unix socket; give up after wait_at_most secs.
1699 while not os.path.exists(socket_path):
1700 log('waiting for appearance of %r' % (socket_path,))
1702 if time.time() - waiting_since > wait_at_most:
1703 log('%r did not appear within %ss' % (socket_path, wait_at_most))
1704 raise IOError(2, 'no socket %s' % (socket_path,))
1705 #print 'launched server'
1706 trpc = TRPC(socket_path)
1709 args = [ '-o'+opt for opt in config.fuse_options ] + [config.mountpoint]
1710 launch_tahoe_fuse(TahoeFuseShim, trpc, args)
# Script entry point: run main() on the CLI args and exit with its status.
1713 if __name__ == '__main__':
1714 sys.exit(main(sys.argv[1:]))