]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - contrib/fuse/impl_c/blackmatch.py
Prevent mutable objects from being retrieved from an immutable directory, and associa...
[tahoe-lafs/tahoe-lafs.git] / contrib / fuse / impl_c / blackmatch.py
1 #!/usr/bin/env python
2
3 #-----------------------------------------------------------------------------------------------
4 from allmydata.uri import CHKFileURI, DirectoryURI, LiteralFileURI, is_literal_file_uri
5 from allmydata.scripts.common_http import do_http as do_http_req
6 from allmydata.util.hashutil import tagged_hash
7 from allmydata.util.assertutil import precondition
8 from allmydata.util import base32, fileutil, observer
9 from allmydata.scripts.common import get_aliases
10
11 from twisted.python import usage
12 from twisted.python.failure import Failure
13 from twisted.internet.protocol import Factory, Protocol
14 from twisted.internet import reactor, defer, task
15 from twisted.web import client
16
17 import base64
18 import errno
19 import heapq
20 import sha
21 import socket
22 import stat
23 import subprocess
24 import sys
25 import os
26 import weakref
27 #import pprint
28
29 # one needs either python-fuse to have been installed in sys.path, or
30 # suitable affordances to be made in the build or runtime environment
31 import fuse
32
33 import time
34 import traceback
35 import simplejson
36 import urllib
37
38 VERSIONSTR="0.7"
39
40 USAGE = 'usage: tahoe fuse [dir_cap_name] [fuse_options] mountpoint'
41 DEFAULT_DIRECTORY_VALIDITY=26
42
43 if not hasattr(fuse, '__version__'):
44     raise RuntimeError, \
45         "your fuse-py doesn't know of fuse.__version__, probably it's too old."
46
47 fuse.fuse_python_api = (0, 2)
48 fuse.feature_assert('stateful_files', 'has_init')
49
50 class TahoeFuseOptions(usage.Options):
51     optParameters = [
52         ["node-directory", None, "~/.tahoe",
53          "Look here to find out which Tahoe node should be used for all "
54          "operations. The directory should either contain a full Tahoe node, "
55          "or a file named node.url which points to some other Tahoe node. "
56          "It should also contain a file named private/aliases which contains "
57          "the mapping from alias name to root dirnode URI."
58          ],
59         ["node-url", None, None,
60          "URL of the tahoe node to use, a URL like \"http://127.0.0.1:3456\". "
61          "This overrides the URL found in the --node-directory ."],
62         ["alias", None, None,
63          "Which alias should be mounted."],
64         ["root-uri", None, None,
65          "Which root directory uri should be mounted."],
66         ["cache-timeout", None, 20,
67          "Time, in seconds, to cache directory data."],
68         ]
69     optFlags = [
70         ['no-split', None,
71          'run stand-alone; no splitting into client and server'],
72         ['server', None,
73          'server mode (should not be used by end users)'],
74         ['server-shutdown', None,
75          'shutdown server (should not be used by end users)'],
76          ]
77
78     def __init__(self):
79         usage.Options.__init__(self)
80         self.fuse_options = []
81         self.mountpoint = None
82
83     def opt_option(self, fuse_option):
84         """
85         Pass mount options directly to fuse.  See below.
86         """
87         self.fuse_options.append(fuse_option)
88         
89     opt_o = opt_option
90
91     def parseArgs(self, mountpoint=''):
92         self.mountpoint = mountpoint
93
94     def getSynopsis(self):
95         return "%s [options] mountpoint" % (os.path.basename(sys.argv[0]),)
96
97 logfile = file('tfuse.log', 'ab')
98
99 def reopen_logfile(fname):
100     global logfile
101     log('switching to %s' % (fname,))
102     logfile.close()
103     logfile = file(fname, 'ab')
104
105 def log(msg):
106     logfile.write("%s: %s\n" % (time.asctime(), msg))
107     #time.sleep(0.1)
108     logfile.flush()
109
110 fuse.flog = log
111
112 def unicode_to_utf8_or_str(u):
113     if isinstance(u, unicode):
114         return u.encode('utf-8')
115     else:
116         precondition(isinstance(u, str), repr(u))
117         return u
118
119 def do_http(method, url, body=''):
120     resp = do_http_req(method, url, body)
121     log('do_http(%s, %s) -> %s, %s' % (method, url, resp.status, resp.reason))
122     if resp.status not in (200, 201):
123         raise RuntimeError('http response (%s, %s)' % (resp.status, resp.reason))
124     else:
125         return resp.read()
126
127 def flag2mode(flags):
128     log('flag2mode(%r)' % (flags,))
129     #md = {os.O_RDONLY: 'r', os.O_WRONLY: 'w', os.O_RDWR: 'w+'}
130     md = {os.O_RDONLY: 'rb', os.O_WRONLY: 'wb', os.O_RDWR: 'w+b'}
131     m = md[flags & (os.O_RDONLY | os.O_WRONLY | os.O_RDWR)]
132
133     if flags & os.O_APPEND:
134         m = m.replace('w', 'a', 1)
135
136     return m
137
138 class TFSIOError(IOError):
139     pass
140
141 class ENOENT(TFSIOError):
142     def __init__(self, msg):
143         TFSIOError.__init__(self, errno.ENOENT, msg)
144
145 class EINVAL(TFSIOError):
146     def __init__(self, msg):
147         TFSIOError.__init__(self, errno.EINVAL, msg)
148
149 class EACCESS(TFSIOError):
150     def __init__(self, msg):
151         TFSIOError.__init__(self, errno.EACCESS, msg)
152
153 class EEXIST(TFSIOError):
154     def __init__(self, msg):
155         TFSIOError.__init__(self, errno.EEXIST, msg)
156
157 class EIO(TFSIOError):
158     def __init__(self, msg):
159         TFSIOError.__init__(self, errno.EIO, msg)
160
161 def logargsretexc(meth):
162     def inner_logargsretexc(self, *args, **kwargs):
163         log("%s(%r, %r)" % (meth, args, kwargs))
164         try:
165             ret = meth(self, *args, **kwargs)
166         except:
167             log('exception:\n%s' % (traceback.format_exc(),))
168             raise
169         log("ret: %r" % (ret, ))
170         return ret
171     inner_logargsretexc.__name__ = '<logwrap(%s)>' % (meth,)
172     return inner_logargsretexc
173
174 def logexc(meth):
175     def inner_logexc(self, *args, **kwargs):
176         try:
177             ret = meth(self, *args, **kwargs)
178         except TFSIOError, tie:
179             log('error: %s' % (tie,))
180             raise
181         except:
182             log('exception:\n%s' % (traceback.format_exc(),))
183             raise
184         return ret
185     inner_logexc.__name__ = '<logwrap(%s)>' % (meth,)
186     return inner_logexc
187
188 def log_exc():
189     log('exception:\n%s' % (traceback.format_exc(),))
190
191 def repr_mode(mode=None):
192     if mode is None:
193         return 'none'
194     fields = ['S_ENFMT', 'S_IFBLK', 'S_IFCHR', 'S_IFDIR', 'S_IFIFO', 'S_IFLNK', 'S_IFREG', 'S_IFSOCK', 'S_IRGRP', 'S_IROTH', 'S_IRUSR', 'S_IRWXG', 'S_IRWXO', 'S_IRWXU', 'S_ISGID', 'S_ISUID', 'S_ISVTX', 'S_IWGRP', 'S_IWOTH', 'S_IWUSR', 'S_IXGRP', 'S_IXOTH', 'S_IXUSR']
195     ret = []
196     for field in fields:
197         fval = getattr(stat, field)
198         if (mode & fval) == fval:
199             ret.append(field)
200     return '|'.join(ret)
201
202 def repr_flags(flags=None):
203     if flags is None:
204         return 'none'
205     fields = [ 'O_APPEND', 'O_CREAT', 'O_DIRECT', 'O_DIRECTORY', 'O_EXCL', 'O_EXLOCK',
206                'O_LARGEFILE', 'O_NDELAY', 'O_NOCTTY', 'O_NOFOLLOW', 'O_NONBLOCK', 'O_RDWR',
207                'O_SHLOCK', 'O_SYNC', 'O_TRUNC', 'O_WRONLY', ]
208     ret = []
209     for field in fields:
210         fval = getattr(os, field, None)
211         if fval is not None and (flags & fval) == fval:
212             ret.append(field)
213     if not ret:
214         ret = ['O_RDONLY']
215     return '|'.join(ret)
216
217 class DownloaderWithReadQueue(object):
218     def __init__(self):
219         self.read_heap = []
220         self.dest_file_name = None
221         self.running = False
222         self.done_observer = observer.OneShotObserverList()
223
224     def __repr__(self):
225         name = self.dest_file_name is None and '<none>' or os.path.basename(self.dest_file_name)
226         return "<DWRQ(%s)> q(%s)" % (name, len(self.read_heap or []))
227
228     def log(self, msg):
229         log("%r: %s" % (self, msg))
230
231     @logexc
232     def start(self, url, dest_file_name, target_size, interval=0.5):
233         self.log('start(%s, %s, %s)' % (url, dest_file_name, target_size, ))
234         self.dest_file_name = dest_file_name
235         file(self.dest_file_name, 'wb').close() # touch
236         self.target_size = target_size
237         self.log('start()')
238         self.loop = task.LoopingCall(self._check_file_size)
239         self.loop.start(interval)
240         self.running = True
241         d = client.downloadPage(url, self.dest_file_name)
242         d.addCallbacks(self.done, self.fail)
243         return d
244
245     def when_done(self):
246         return self.done_observer.when_fired()
247
248     def get_size(self):
249         if os.path.exists(self.dest_file_name):
250             return os.path.getsize(self.dest_file_name)
251         else:
252             return 0
253
254     @logexc
255     def _read(self, posn, size):
256         #self.log('_read(%s, %s)' % (posn, size))
257         f = file(self.dest_file_name, 'rb')
258         f.seek(posn)
259         data = f.read(size)
260         f.close()
261         return data
262
263     @logexc
264     def read(self, posn, size):
265         self.log('read(%s, %s)' % (posn, size))
266         if self.read_heap is None:
267             raise ValueError('read() called when already shut down')
268         if posn+size > self.target_size:
269             size -= self.target_size - posn
270         fsize = self.get_size()
271         if posn+size < fsize:
272             return defer.succeed(self._read(posn, size))
273         else:
274             d = defer.Deferred()
275             dread = (posn+size, posn, d)
276             heapq.heappush(self.read_heap, dread)
277         return d
278
279     @logexc
280     def _check_file_size(self):
281         #self.log('_check_file_size()')
282         if self.read_heap:
283             try:
284                 size = self.get_size()
285                 while self.read_heap and self.read_heap[0][0] <= size:
286                     end, start, d = heapq.heappop(self.read_heap)
287                     data = self._read(start, end-start)
288                     d.callback(data)
289             except Exception, e:
290                 log_exc()
291                 failure = Failure()
292
293     @logexc
294     def fail(self, failure):
295         self.log('fail(%s)' % (failure,))
296         self.running = False
297         if self.loop.running:
298             self.loop.stop()
299         # fail any reads still pending
300         for end, start, d in self.read_heap:
301             reactor.callLater(0, d.errback, failure)
302         self.read_heap = None
303         self.done_observer.fire_if_not_fired(failure)
304         return failure
305
306     @logexc
307     def done(self, result):
308         self.log('done()')
309         self.running = False
310         if self.loop.running:
311             self.loop.stop()
312         precondition(self.get_size() == self.target_size, self.get_size(), self.target_size)
313         self._check_file_size() # process anything left pending in heap
314         precondition(not self.read_heap, self.read_heap, self.target_size, self.get_size())
315         self.read_heap = None
316         self.done_observer.fire_if_not_fired(self)
317         return result
318
319
320 class TahoeFuseFile(object):
321
322     #def __init__(self, path, flags, *mode):
323     def __init__(self, tfs, path, flags, *mode):
324         log("TFF: __init__(%r, %r:%s, %r:%s)" % (path, flags, repr_flags(flags), mode, repr_mode(*mode)))
325         self.tfs = tfs
326         self.downloader = None
327
328         self._path = path # for tahoe put
329         try:
330             self.parent, self.name, self.fnode = self.tfs.get_parent_name_and_child(path)
331             m = flag2mode(flags)
332             log('TFF: flags2(mode) -> %s' % (m,))
333             if m[0] in 'wa':
334                 # write
335                 self.fname = self.tfs.cache.tmp_file(os.urandom(20))
336                 if self.fnode is None:
337                     log('TFF: [%s] open() for write: no file node, creating new File %s' % (self.name, self.fname, ))
338                     self.fnode = File(0, LiteralFileURI.BASE_STRING)
339                     self.fnode.tmp_fname = self.fname # XXX kill this
340                     self.parent.add_child(self.name, self.fnode, {})
341                 elif hasattr(self.fnode, 'tmp_fname'):
342                     self.fname = self.fnode.tmp_fname
343                     log('TFF: [%s] open() for write: existing file node lists %s' % (self.name, self.fname, ))
344                 else:
345                     log('TFF: [%s] open() for write: existing file node lists no tmp_file, using new %s' % (self.name, self.fname, ))
346                 if mode != (0600,):
347                     log('TFF: [%s] changing mode %s(%s) to 0600' % (self.name, repr_mode(*mode), mode))
348                     mode = (0600,)
349                 log('TFF: [%s] opening(%s) with flags %s(%s), mode %s(%s)' % (self.name, self.fname, repr_flags(flags|os.O_CREAT), flags|os.O_CREAT, repr_mode(*mode), mode))
350                 #self.file = os.fdopen(os.open(self.fname, flags|os.O_CREAT, *mode), m)
351                 self.file = os.fdopen(os.open(self.fname, flags|os.O_CREAT, *mode), m)
352                 self.fd = self.file.fileno()
353                 log('TFF: opened(%s) for write' % self.fname)
354                 self.open_for_write = True
355             else:
356                 # read
357                 assert self.fnode is not None
358                 uri = self.fnode.get_uri()
359
360                 # XXX make this go away
361                 if hasattr(self.fnode, 'tmp_fname'):
362                     self.fname = self.fnode.tmp_fname
363                     log('TFF: reopening(%s) for reading' % self.fname)
364                 else:
365                     if is_literal_file_uri(uri) or not self.tfs.async:
366                         log('TFF: synchronously fetching file from cache for reading')
367                         self.fname = self.tfs.cache.get_file(uri)
368                     else:
369                         log('TFF: asynchronously fetching file from cache for reading')
370                         self.fname, self.downloader = self.tfs.cache.async_get_file(uri)
371                         # downloader is None if the cache already contains the file
372                         if self.downloader is not None:
373                             d = self.downloader.when_done()
374                             def download_complete(junk):
375                                 # once the download is complete, revert to non-async behaviour
376                                 self.downloader = None
377                             d.addCallback(download_complete)
378
379                 self.file = os.fdopen(os.open(self.fname, flags, *mode), m)
380                 self.fd = self.file.fileno()
381                 self.open_for_write = False
382                 log('TFF: opened(%s) for read' % self.fname)
383         except:
384             log_exc()
385             raise
386
387     def log(self, msg):
388         log("<TFF(%s:%s)> %s" % (os.path.basename(self.fname), self.name, msg))
389
390     @logexc
391     def read(self, size, offset):
392         self.log('read(%r, %r)' % (size, offset, ))
393         if self.downloader:
394             # then we're busy doing an async download
395             # (and hence implicitly, we're in an environment that supports twisted)
396             #self.log('passing read() to %s' % (self.downloader, ))
397             d = self.downloader.read(offset, size)
398             def thunk(failure):
399                 raise EIO(str(failure))
400             d.addErrback(thunk)
401             return d
402         else:
403             self.log('servicing read() from %s' % (self.file, ))
404             self.file.seek(offset)
405             return self.file.read(size)
406
407     @logexc
408     def write(self, buf, offset):
409         self.log("write(-%s-, %r)" % (len(buf), offset))
410         if not self.open_for_write:
411             return -errno.EACCES
412         self.file.seek(offset)
413         self.file.write(buf)
414         return len(buf)
415
416     @logexc
417     def release(self, flags):
418         self.log("release(%r)" % (flags,))
419         self.file.close()
420         if self.open_for_write:
421             size = os.path.getsize(self.fname)
422             self.fnode.size = size
423             file_cap = self.tfs.upload(self.fname)
424             self.fnode.ro_uri = file_cap
425             # XXX [ ] TODO: set metadata
426             # write new uri into parent dir entry
427             self.parent.add_child(self.name, self.fnode, {})
428             self.log("uploaded: %s" % (file_cap,))
429
430         # dbg
431         print_tree()
432
433     def _fflush(self):
434         if 'w' in self.file.mode or 'a' in self.file.mode:
435             self.file.flush()
436
437     @logexc
438     def fsync(self, isfsyncfile):
439         self.log("fsync(%r)" % (isfsyncfile,))
440         self._fflush()
441         if isfsyncfile and hasattr(os, 'fdatasync'):
442             os.fdatasync(self.fd)
443         else:
444             os.fsync(self.fd)
445
446     @logexc
447     def flush(self):
448         self.log("flush()")
449         self._fflush()
450         # cf. xmp_flush() in fusexmp_fh.c
451         os.close(os.dup(self.fd))
452
453     @logexc
454     def fgetattr(self):
455         self.log("fgetattr()")
456         s = os.fstat(self.fd)
457         d = stat_to_dict(s)
458         if self.downloader:
459             size = self.downloader.target_size
460             self.log("fgetattr() during async download, cache file: %s, size=%s" % (s, size))
461             d['st_size'] = size
462         self.log("fgetattr() -> %r" % (d,))
463         return d
464
465     @logexc
466     def ftruncate(self, len):
467         self.log("ftruncate(%r)" % (len,))
468         self.file.truncate(len)
469
470 class TahoeFuseBase(object):
471
472     def __init__(self, tfs):
473         log("TFB: __init__()")
474         self.tfs = tfs
475         self.files = {}
476
477     def log(self, msg):
478         log("<TFB> %s" % (msg, ))
479
480     @logexc
481     def readlink(self, path):
482         self.log("readlink(%r)" % (path,))
483         node = self.tfs.get_path(path)
484         if node:
485             raise EINVAL('Not a symlink') # nothing in tahoe is a symlink
486         else:
487             raise ENOENT('Invalid argument')
488
489     @logexc
490     def unlink(self, path):
491         self.log("unlink(%r)" % (path,))
492         self.tfs.unlink(path)
493
494     @logexc
495     def rmdir(self, path):
496         self.log("rmdir(%r)" % (path,))
497         self.tfs.unlink(path)
498
499     @logexc
500     def symlink(self, path, path1):
501         self.log("symlink(%r, %r)" % (path, path1))
502         self.tfs.link(path, path1)
503
504     @logexc
505     def rename(self, path, path1):
506         self.log("rename(%r, %r)" % (path, path1))
507         self.tfs.rename(path, path1)
508
509     @logexc
510     def link(self, path, path1):
511         self.log("link(%r, %r)" % (path, path1))
512         self.tfs.link(path, path1)
513
514     @logexc
515     def chmod(self, path, mode):
516         self.log("XX chmod(%r, %r)" % (path, mode))
517         #return -errno.EOPNOTSUPP
518
519     @logexc
520     def chown(self, path, user, group):
521         self.log("XX chown(%r, %r, %r)" % (path, user, group))
522         #return -errno.EOPNOTSUPP
523
524     @logexc
525     def truncate(self, path, len):
526         self.log("XX truncate(%r, %r)" % (path, len))
527         #return -errno.EOPNOTSUPP
528
529     @logexc
530     def utime(self, path, times):
531         self.log("XX utime(%r, %r)" % (path, times))
532         #return -errno.EOPNOTSUPP
533
534     @logexc
535     def statfs(self):
536         self.log("statfs()")
537         """
538         Should return an object with statvfs attributes (f_bsize, f_frsize...).
539         Eg., the return value of os.statvfs() is such a thing (since py 2.2).
540         If you are not reusing an existing statvfs object, start with
541         fuse.StatVFS(), and define the attributes.
542
543         To provide usable information (ie., you want sensible df(1)
544         output, you are suggested to specify the following attributes:
545
546             - f_bsize - preferred size of file blocks, in bytes
547             - f_frsize - fundamental size of file blcoks, in bytes
548                 [if you have no idea, use the same as blocksize]
549             - f_blocks - total number of blocks in the filesystem
550             - f_bfree - number of free blocks
551             - f_files - total number of file inodes
552             - f_ffree - nunber of free file inodes
553         """
554
555         block_size = 4096 # 4k
556         preferred_block_size = 131072 # 128k, c.f. seg_size
557         fs_size = 8*2**40 # 8Tb
558         fs_free = 2*2**40 # 2Tb
559
560         #s = fuse.StatVfs(f_bsize = preferred_block_size,
561         s = dict(f_bsize = preferred_block_size,
562                          f_frsize = block_size,
563                          f_blocks = fs_size / block_size,
564                          f_bfree = fs_free / block_size,
565                          f_bavail = fs_free / block_size,
566                          f_files = 2**30, # total files
567                          f_ffree = 2**20, # available files
568                          f_favail = 2**20, # available files (root)
569                          f_flag = 2, # no suid
570                          f_namemax = 255) # max name length
571         #self.log('statfs(): %r' % (s,))
572         return s
573
574     def fsinit(self):
575         self.log("fsinit()")
576
577     ##################################################################
578
579     @logexc
580     def readdir(self, path, offset):
581         self.log('readdir(%r, %r)' % (path, offset))
582         node = self.tfs.get_path(path)
583         if node is None:
584             return -errno.ENOENT
585         dirlist = ['.', '..'] + node.children.keys()
586         self.log('dirlist = %r' % (dirlist,))
587         #return [fuse.Direntry(d) for d in dirlist]
588         return dirlist
589
590     @logexc
591     def getattr(self, path):
592         self.log('getattr(%r)' % (path,))
593
594         if path == '/':
595             # we don't have any metadata for the root (no edge leading to it)
596             mode = (stat.S_IFDIR | 755)
597             mtime = self.tfs.root.mtime
598             s = TStat({}, st_mode=mode, st_nlink=1, st_mtime=mtime)
599             self.log('getattr(%r) -> %r' % (path, s))
600             #return s
601             return stat_to_dict(s)
602             
603         parent, name, child = self.tfs.get_parent_name_and_child(path)
604         if not child: # implicitly 'or not parent'
605             raise ENOENT('No such file or directory')
606         return stat_to_dict(parent.get_stat(name))
607
608     @logexc
609     def access(self, path, mode):
610         self.log("access(%r, %r)" % (path, mode))
611         node = self.tfs.get_path(path)
612         if not node:
613             return -errno.ENOENT
614         accmode = os.O_RDONLY | os.O_WRONLY | os.O_RDWR
615         if (mode & 0222):
616             if not node.writable():
617                 log('write access denied for %s (req:%o)' % (path, mode, ))
618                 return -errno.EACCES
619         #else:
620             #log('access granted for %s' % (path, ))
621
622     @logexc
623     def mkdir(self, path, mode):
624         self.log("mkdir(%r, %r)" % (path, mode))
625         self.tfs.mkdir(path)
626
627     ##################################################################
628     # file methods
629
630     def open(self, path, flags):
631         self.log('open(%r, %r)' % (path, flags, ))
632         if path in self.files:
633             # XXX todo [ ] should consider concurrent open files of differing modes
634             return
635         else:
636             tffobj = TahoeFuseFile(self.tfs, path, flags)
637             self.files[path] = tffobj
638
639     def create(self, path, flags, mode):
640         self.log('create(%r, %r, %r)' % (path, flags, mode))
641         if path in self.files:
642             # XXX todo [ ] should consider concurrent open files of differing modes
643             return
644         else:
645             tffobj = TahoeFuseFile(self.tfs, path, flags, mode)
646             self.files[path] = tffobj
647
648     def _get_file(self, path):
649         if not path in self.files:
650             raise ENOENT('No such file or directory: %s' % (path,))
651         return self.files[path]
652
653     ##
654
655     def read(self, path, size, offset):
656         self.log('read(%r, %r, %r)' % (path, size, offset, ))
657         return self._get_file(path).read(size, offset)
658
659     @logexc
660     def write(self, path, buf, offset):
661         self.log("write(%r, -%s-, %r)" % (path, len(buf), offset))
662         return self._get_file(path).write(buf, offset)
663
664     @logexc
665     def release(self, path, flags):
666         self.log("release(%r, %r)" % (path, flags,))
667         self._get_file(path).release(flags)
668         del self.files[path]
669
670     @logexc
671     def fsync(self, path, isfsyncfile):
672         self.log("fsync(%r, %r)" % (path, isfsyncfile,))
673         return self._get_file(path).fsync(isfsyncfile)
674
675     @logexc
676     def flush(self, path):
677         self.log("flush(%r)" % (path,))
678         return self._get_file(path).flush()
679
680     @logexc
681     def fgetattr(self, path):
682         self.log("fgetattr(%r)" % (path,))
683         return self._get_file(path).fgetattr()
684
685     @logexc
686     def ftruncate(self, path, len):
687         self.log("ftruncate(%r, %r)" % (path, len,))
688         return self._get_file(path).ftruncate(len)
689
690 class TahoeFuseLocal(TahoeFuseBase, fuse.Fuse):
691     def __init__(self, tfs, *args, **kw):
692         log("TFL: __init__(%r, %r)" % (args, kw))
693         TahoeFuseBase.__init__(self, tfs)
694         fuse.Fuse.__init__(self, *args, **kw)
695
696     def log(self, msg):
697         log("<TFL> %s" % (msg, ))
698
699     def main(self, *a, **kw):
700         self.log("main(%r, %r)" % (a, kw))
701         return fuse.Fuse.main(self, *a, **kw)
702
703     # overrides for those methods which return objects not marshalled
704     def fgetattr(self, path):
705         return TStat({}, **(TahoeFuseBase.fgetattr(self, path)))
706
707     def getattr(self, path):
708         return TStat({}, **(TahoeFuseBase.getattr(self, path)))
709
710     def statfs(self):
711         return fuse.StatVfs(**(TahoeFuseBase.statfs(self)))
712         #self.log('statfs()')
713         #ret = fuse.StatVfs(**(TahoeFuseBase.statfs(self)))
714         #self.log('statfs(): %r' % (ret,))
715         #return ret
716
717     @logexc
718     def readdir(self, path, offset):
719         return [ fuse.Direntry(d) for d in TahoeFuseBase.readdir(self, path, offset) ]
720
721 class TahoeFuseShim(fuse.Fuse):
722     def __init__(self, trpc, *args, **kw):
723         log("TF: __init__(%r, %r)" % (args, kw))
724         self.trpc = trpc
725         fuse.Fuse.__init__(self, *args, **kw)
726
727     def log(self, msg):
728         log("<TFs> %s" % (msg, ))
729
730     @logexc
731     def readlink(self, path):
732         self.log("readlink(%r)" % (path,))
733         return self.trpc.call('readlink', path)
734
735     @logexc
736     def unlink(self, path):
737         self.log("unlink(%r)" % (path,))
738         return self.trpc.call('unlink', path)
739
740     @logexc
741     def rmdir(self, path):
742         self.log("rmdir(%r)" % (path,))
743         return self.trpc.call('unlink', path)
744
745     @logexc
746     def symlink(self, path, path1):
747         self.log("symlink(%r, %r)" % (path, path1))
748         return self.trpc.call('link', path, path1)
749
750     @logexc
751     def rename(self, path, path1):
752         self.log("rename(%r, %r)" % (path, path1))
753         return self.trpc.call('rename', path, path1)
754
755     @logexc
756     def link(self, path, path1):
757         self.log("link(%r, %r)" % (path, path1))
758         return self.trpc.call('link', path, path1)
759
760     @logexc
761     def chmod(self, path, mode):
762         self.log("XX chmod(%r, %r)" % (path, mode))
763         return self.trpc.call('chmod', path, mode)
764
765     @logexc
766     def chown(self, path, user, group):
767         self.log("XX chown(%r, %r, %r)" % (path, user, group))
768         return self.trpc.call('chown', path, user, group)
769
770     @logexc
771     def truncate(self, path, len):
772         self.log("XX truncate(%r, %r)" % (path, len))
773         return self.trpc.call('truncate', path, len)
774
775     @logexc
776     def utime(self, path, times):
777         self.log("XX utime(%r, %r)" % (path, times))
778         return self.trpc.call('utime', path, times)
779
780     @logexc
781     def statfs(self):
782         self.log("statfs()")
783         response = self.trpc.call('statfs')
784         #self.log("statfs(): %r" % (response,))
785         kwargs = dict([ (str(k),v) for k,v in response.items() ])
786         return fuse.StatVfs(**kwargs)
787
788     def fsinit(self):
789         self.log("fsinit()")
790
791     def main(self, *a, **kw):
792         self.log("main(%r, %r)" % (a, kw))
793
794         return fuse.Fuse.main(self, *a, **kw)
795
796     ##################################################################
797
798     @logexc
799     def readdir(self, path, offset):
800         self.log('readdir(%r, %r)' % (path, offset))
801         return [ fuse.Direntry(d) for d in self.trpc.call('readdir', path, offset) ]
802
803     @logexc
804     def getattr(self, path):
805         self.log('getattr(%r)' % (path,))
806         response = self.trpc.call('getattr', path)
807         kwargs = dict([ (str(k),v) for k,v in response.items() ])
808         s = TStat({}, **kwargs)
809         self.log('getattr(%r) -> %r' % (path, s))
810         return s
811
812     @logexc
813     def access(self, path, mode):
814         self.log("access(%r, %r)" % (path, mode))
815         return self.trpc.call('access', path, mode)
816
817     @logexc
818     def mkdir(self, path, mode):
819         self.log("mkdir(%r, %r)" % (path, mode))
820         return self.trpc.call('mkdir', path, mode)
821
822     ##################################################################
823     # file methods
824
825     def open(self, path, flags):
826         self.log('open(%r, %r)' % (path, flags, ))
827         return self.trpc.call('open', path, flags)
828
829     def create(self, path, flags, mode):
830         self.log('create(%r, %r, %r)' % (path, flags, mode))
831         return self.trpc.call('create', path, flags, mode)
832
833     ##
834
835     def read(self, path, size, offset):
836         self.log('read(%r, %r, %r)' % (path, size, offset, ))
837         return self.trpc.call('read', path, size, offset)
838
839     @logexc
840     def write(self, path, buf, offset):
841         self.log("write(%r, -%s-, %r)" % (path, len(buf), offset))
842         return self.trpc.call('write', path, buf, offset)
843
844     @logexc
845     def release(self, path, flags):
846         self.log("release(%r, %r)" % (path, flags,))
847         return self.trpc.call('release', path, flags)
848
849     @logexc
850     def fsync(self, path, isfsyncfile):
851         self.log("fsync(%r, %r)" % (path, isfsyncfile,))
852         return self.trpc.call('fsync', path, isfsyncfile)
853
854     @logexc
855     def flush(self, path):
856         self.log("flush(%r)" % (path,))
857         return self.trpc.call('flush', path)
858
859     @logexc
860     def fgetattr(self, path):
861         self.log("fgetattr(%r)" % (path,))
862         #return self.trpc.call('fgetattr', path)
863         response = self.trpc.call('fgetattr', path)
864         kwargs = dict([ (str(k),v) for k,v in response.items() ])
865         s = TStat({}, **kwargs)
866         self.log('getattr(%r) -> %r' % (path, s))
867         return s
868
869     @logexc
870     def ftruncate(self, path, len):
871         self.log("ftruncate(%r, %r)" % (path, len,))
872         return self.trpc.call('ftruncate', path, len)
873
874
875 def launch_tahoe_fuse(tf_class, tobj, argv):
876     sys.argv = ['tahoe fuse'] + list(argv)
877     log('setting sys.argv=%r' % (sys.argv,))
878     config = TahoeFuseOptions()
879     version = "%prog " +VERSIONSTR+", fuse "+ fuse.__version__
880     server = tf_class(tobj, version=version, usage=config.getSynopsis(), dash_s_do='setsingle')
881     server.parse(errex=1)
882     server.main()
883
884 def getnodeurl(nodedir):
885     f = file(os.path.expanduser(os.path.join(nodedir, "node.url")), 'rb')
886     nu = f.read().strip()
887     f.close()
888     if nu[-1] != "/":
889         nu += "/"
890     return nu
891
892 def fingerprint(uri):
893     if uri is None:
894         return None
895     return base64.b32encode(sha.new(uri).digest()).lower()[:6]
896
897 stat_fields = [ 'st_mode', 'st_ino', 'st_dev', 'st_nlink', 'st_uid', 'st_gid', 'st_size',
898                 'st_atime', 'st_mtime', 'st_ctime', ]
899 def stat_to_dict(statobj, fields=None):
900     if fields is None:
901         fields = stat_fields
902     d = {}
903     for f in fields:
904         d[f] = getattr(statobj, f, None)
905     return d
906
907 class TStat(fuse.Stat):
908     # in fuse 0.2, these are set by fuse.Stat.__init__
909     # in fuse 0.2-pre3 (hardy) they are not. badness unsues if they're missing
910     st_mode  = None
911     st_ino   = 0
912     st_dev   = 0
913     st_nlink = None
914     st_uid   = 0
915     st_gid   = 0
916     st_size  = 0
917     st_atime = 0
918     st_mtime = 0
919     st_ctime = 0
920
921     fields = [ 'st_mode', 'st_ino', 'st_dev', 'st_nlink', 'st_uid', 'st_gid', 'st_size',
922                'st_atime', 'st_mtime', 'st_ctime', ]
923     def __init__(self, metadata, **kwargs):
924         # first load any stat fields present in 'metadata'
925         for st in [ 'mtime', 'ctime' ]:
926             if st in metadata:
927                 setattr(self, "st_%s" % st, metadata[st])
928         for st in self.fields:
929             if st in metadata:
930                 setattr(self, st, metadata[st])
931
932         # then set any values passed in as kwargs
933         fuse.Stat.__init__(self, **kwargs)
934
935     def __repr__(self):
936         return "<Stat%r>" % (stat_to_dict(self),)
937
938 class Directory(object):
939     def __init__(self, tfs, ro_uri, rw_uri):
940         self.tfs = tfs
941         self.ro_uri = ro_uri
942         self.rw_uri = rw_uri
943         assert (rw_uri or ro_uri)
944         self.children = {}
945         self.last_load = None
946         self.last_data = None
947         self.mtime = 0
948
949     def __repr__(self):
950         return "<Directory %s>" % (fingerprint(self.get_uri()),)
951
952     def maybe_refresh(self, name=None):
953         """
954         if the previously cached data was retrieved within the cache
955         validity period, does nothing. otherwise refetches the data
956         for this directory and reloads itself
957         """
958         now = time.time()
959         if self.last_load is None or (now - self.last_load) > self.tfs.cache_validity:
960             self.load(name)
961
962     def load(self, name=None):
963         now = time.time()
964         log('%s.loading(%s)' % (self, name))
965         url = self.tfs.compose_url("uri/%s?t=json", self.get_uri())
966         data = urllib.urlopen(url).read()
967         h = tagged_hash('cache_hash', data)
968         if h == self.last_data:
969             self.last_load = now
970             log('%s.load() : no change h(data)=%s' % (self, base32.b2a(h), ))
971             return
972         try:
973             parsed = simplejson.loads(data)
974         except ValueError:
975             log('%s.load(): unable to parse json data for dir:\n%r' % (self, data))
976             return
977         nodetype, d = parsed
978         assert nodetype == 'dirnode'
979         self.children.clear()
980         for cname,details in d['children'].items():
981             cname = unicode_to_utf8_or_str(cname)
982             ctype, cattrs = details
983             metadata = cattrs.get('metadata', {})
984             if ctype == 'dirnode':
985                 cobj = self.tfs.dir_for(cname, cattrs.get('ro_uri'), cattrs.get('rw_uri'))
986             else:
987                 assert ctype == "filenode"
988                 cobj = File(cattrs.get('size'), cattrs.get('ro_uri'))
989             self.children[cname] = cobj, metadata
990         self.last_load = now
991         self.last_data = h
992         self.mtime = now
993         log('%s.load() loaded: \n%s' % (self, self.pprint(),))
994
995     def get_children(self):
996         return self.children.keys()
997
998     def get_child(self, name):
999         return self.children[name][0]
1000
1001     def add_child(self, name, child, metadata):
1002         log('%s.add_child(%r, %r, %r)' % (self, name, child, metadata, ))
1003         self.children[name] = child, metadata
1004         url = self.tfs.compose_url("uri/%s/%s?t=uri", self.get_uri(), name)
1005         child_cap = do_http('PUT', url, child.get_uri())
1006         # XXX [ ] TODO: push metadata to tahoe node
1007         assert child_cap == child.get_uri()
1008         self.mtime = time.time()
1009         log('added child %r with %r to %r' % (name, child_cap, self))
1010
1011     def remove_child(self, name):
1012         log('%s.remove_child(%r)' % (self, name, ))
1013         del self.children[name]
1014         url = self.tfs.compose_url("uri/%s/%s", self.get_uri(), name)
1015         resp = do_http('DELETE', url)
1016         self.mtime = time.time()
1017         log('child (%s) removal yielded %r' % (name, resp,))
1018
1019     def get_uri(self):
1020         return self.rw_uri or self.ro_uri
1021
1022     def writable(self):
1023         return self.rw_uri and self.rw_uri != self.ro_uri
1024
1025     def pprint(self, prefix='', printed=None, suffix=''):
1026         ret = []
1027         if printed is None:
1028             printed = set()
1029         writable = self.writable() and '+' or ' '
1030         if self in printed:
1031             ret.append("         %s/%s ... <%s> : %s" % (prefix, writable, fingerprint(self.get_uri()), suffix, ))
1032         else:
1033             ret.append("[%s] %s/%s : %s" % (fingerprint(self.get_uri()), prefix, writable, suffix, ))
1034             printed.add(self)
1035             for name,(child,metadata) in sorted(self.children.items()):
1036                 ret.append(child.pprint(' ' * (len(prefix)+1)+name, printed, repr(metadata)))
1037         return '\n'.join(ret)
1038
1039     def get_metadata(self, name):
1040         return self.children[name][1]
1041
1042     def get_stat(self, name):
1043         child,metadata = self.children[name]
1044         log("%s.get_stat(%s) md: %r" % (self, name, metadata))
1045
1046         if isinstance(child, Directory):
1047             child.maybe_refresh(name)
1048             mode = metadata.get('st_mode') or (stat.S_IFDIR | 0755)
1049             s = TStat(metadata, st_mode=mode, st_nlink=1, st_mtime=child.mtime)
1050         else:
1051             if hasattr(child, 'tmp_fname'):
1052                 s = os.stat(child.tmp_fname)
1053                 log("%s.get_stat(%s) returning local stat of tmp file" % (self, name, ))
1054             else:
1055                 s = TStat(metadata,
1056                           st_nlink = 1,
1057                           st_size = child.size,
1058                           st_mode = metadata.get('st_mode') or (stat.S_IFREG | 0444),
1059                           st_mtime = metadata.get('mtime') or self.mtime,
1060                           )
1061             return s
1062
1063         log("%s.get_stat(%s)->%s" % (self, name, s))
1064         return s
1065
1066 class File(object):
1067     def __init__(self, size, ro_uri):
1068         self.size = size
1069         if ro_uri:
1070             ro_uri = str(ro_uri)
1071         self.ro_uri = ro_uri
1072
1073     def __repr__(self):
1074         return "<File %s>" % (fingerprint(self.ro_uri) or [self.tmp_fname],)
1075
1076     def pprint(self, prefix='', printed=None, suffix=''):
1077         return "         %s (%s) : %s" % (prefix, self.size, suffix, )
1078
1079     def get_uri(self):
1080         return self.ro_uri
1081
1082     def writable(self):
1083         return True
1084
1085 class TFS(object):
1086     def __init__(self, nodedir, nodeurl, root_uri, 
1087                        cache_validity_period=DEFAULT_DIRECTORY_VALIDITY, async=False):
1088         self.cache_validity = cache_validity_period
1089         self.nodeurl = nodeurl
1090         self.root_uri = root_uri
1091         self.async = async
1092         self.dirs = {}
1093
1094         cachedir = os.path.expanduser(os.path.join(nodedir, '_cache'))
1095         self.cache = FileCache(nodeurl, cachedir)
1096         ro_uri = DirectoryURI.init_from_string(self.root_uri).get_readonly()
1097         self.root = Directory(self, ro_uri, self.root_uri)
1098         self.root.maybe_refresh('<root>')
1099
1100     def log(self, msg):
1101         log("<TFS> %s" % (msg, ))
1102
1103     def pprint(self):
1104         return self.root.pprint()
1105
1106     def compose_url(self, fmt, *args):
1107         return self.nodeurl + (fmt % tuple(map(urllib.quote, args)))
1108
1109     def get_parent_name_and_child(self, path):
1110         """
1111         find the parent dir node, name of child relative to that parent, and
1112         child node within the TFS object space.
1113         @returns: (parent, name, child) if the child is found
1114                   (parent, name, None) if the child is missing from the parent
1115                   (None, name, None) if the parent is not found
1116         """
1117         if path == '/':
1118             return 
1119         dirname, name = os.path.split(path)
1120         parent = self.get_path(dirname)
1121         if parent:
1122             try:
1123                 child = parent.get_child(name)
1124                 return parent, name, child
1125             except KeyError:
1126                 return parent, name, None
1127         else:
1128             return None, name, None
1129
1130     def get_path(self, path):
1131         comps = path.strip('/').split('/')
1132         if comps == ['']:
1133             comps = []
1134         cursor = self.root
1135         c_name = '<root>'
1136         for comp in comps:
1137             if not isinstance(cursor, Directory):
1138                 self.log('path "%s" is not a dir' % (path,))
1139                 return None
1140             cursor.maybe_refresh(c_name)
1141             try:
1142                 cursor = cursor.get_child(comp)
1143                 c_name = comp
1144             except KeyError:
1145                 self.log('path "%s" not found' % (path,))
1146                 return None
1147         if isinstance(cursor, Directory):
1148             cursor.maybe_refresh(c_name)
1149         return cursor
1150
1151     def dir_for(self, name, ro_uri, rw_uri):
1152         #self.log('dir_for(%s) [%s/%s]' % (name, fingerprint(ro_uri), fingerprint(rw_uri)))
1153         if ro_uri:
1154             ro_uri = str(ro_uri)
1155         if rw_uri:
1156             rw_uri = str(rw_uri)
1157         uri = rw_uri or ro_uri
1158         assert uri
1159         dirobj = self.dirs.get(uri)
1160         if not dirobj:
1161             self.log('dir_for(%s) creating new Directory' % (name, ))
1162             dirobj = Directory(self, ro_uri, rw_uri)
1163             self.dirs[uri] = dirobj
1164         return dirobj
1165
1166     def upload(self, fname):
1167         self.log('upload(%r)' % (fname,))
1168         fh = file(fname, 'rb')
1169         url = self.compose_url("uri")
1170         file_cap = do_http('PUT', url, fh)
1171         self.log('uploaded to: %r' % (file_cap,))
1172         return file_cap
1173
1174     def mkdir(self, path):
1175         self.log('mkdir(%r)' % (path,))
1176         parent, name, child = self.get_parent_name_and_child(path)
1177
1178         if child:
1179             raise EEXIST('File exists: %s' % (name,))
1180         if not parent:
1181             raise ENOENT('No such file or directory: %s' % (path,))
1182
1183         url = self.compose_url("uri?t=mkdir")
1184         new_dir_cap = do_http('PUT', url)
1185
1186         ro_uri = DirectoryURI.init_from_string(new_dir_cap).get_readonly()
1187         child = Directory(self, ro_uri, new_dir_cap)
1188         parent.add_child(name, child, {})
1189
1190     def rename(self, path, path1):
1191         self.log('rename(%s, %s)' % (path, path1))
1192         src_parent, src_name, src_child = self.get_parent_name_and_child(path)
1193         dst_parent, dst_name, dst_child = self.get_parent_name_and_child(path1)
1194
1195         if not src_child or not dst_parent:
1196             raise ENOENT('No such file or directory')
1197
1198         dst_parent.add_child(dst_name, src_child, {})
1199         src_parent.remove_child(src_name)
1200
1201     def unlink(self, path):
1202         parent, name, child = self.get_parent_name_and_child(path)
1203
1204         if child is None: # parent or child is missing
1205             raise ENOENT('No such file or directory')
1206         if not parent.writable():
1207             raise EACCESS('Permission denied')
1208
1209         parent.remove_child(name)
1210
1211     def link(self, path, path1):
1212         src = self.get_path(path)
1213         dst_parent, dst_name, dst_child = self.get_parent_name_and_child(path1)
1214
1215         if not src:
1216             raise ENOENT('No such file or directory')
1217         if dst_parent is None:
1218             raise ENOENT('No such file or directory')
1219         if not dst_parent.writable():
1220             raise EACCESS('Permission denied')
1221
1222         dst_parent.add_child(dst_name, src, {})
1223
1224 class FileCache(object):
1225     def __init__(self, nodeurl, cachedir):
1226         self.nodeurl = nodeurl
1227         self.cachedir = cachedir
1228         if not os.path.exists(self.cachedir):
1229             os.makedirs(self.cachedir)
1230         self.tmpdir = os.path.join(self.cachedir, 'tmp')
1231         if not os.path.exists(self.tmpdir):
1232             os.makedirs(self.tmpdir)
1233         self.downloaders = weakref.WeakValueDictionary()
1234
1235     def log(self, msg):
1236         log("<FC> %s" % (msg, ))
1237
1238     def get_file(self, uri):
1239         self.log('get_file(%s)' % (uri,))
1240         if is_literal_file_uri(uri):
1241             return self.get_literal(uri)
1242         else:
1243             return self.get_chk(uri, async=False)
1244
1245     def async_get_file(self, uri):
1246         self.log('get_file(%s)' % (uri,))
1247         return self.get_chk(uri, async=True)
1248
1249     def get_literal(self, uri):
1250         h = sha.new(uri).digest()
1251         u = LiteralFileURI.init_from_string(uri)
1252         fname = os.path.join(self.cachedir, '__'+base64.b32encode(h).lower())
1253         size = len(u.data)
1254         self.log('writing literal file %s (%s)' % (fname, size, ))
1255         fh = open(fname, 'wb')
1256         fh.write(u.data)
1257         fh.close()
1258         return fname
1259
1260     def get_chk(self, uri, async=False):
1261         u = CHKFileURI.init_from_string(str(uri))
1262         storage_index = u.storage_index
1263         size = u.size
1264         fname = os.path.join(self.cachedir, base64.b32encode(storage_index).lower())
1265         if os.path.exists(fname):
1266             fsize = os.path.getsize(fname)
1267             if fsize == size:
1268                 if async:
1269                     return fname, None
1270                 else:
1271                     return fname
1272             else:
1273                 self.log('warning file "%s" is too short %s < %s' % (fname, fsize, size))
1274         self.log('downloading file %s (%s)' % (fname, size, ))
1275         url = "%suri/%s" % (self.nodeurl, uri)
1276         if async:
1277             if fname in self.downloaders and self.downloaders[fname].running:
1278                 downloader = self.downloaders[fname]
1279             else:
1280                 downloader = DownloaderWithReadQueue()
1281                 self.downloaders[fname] = downloader
1282                 d = downloader.start(url, fname, target_size=u.size)
1283                 def clear_downloader(result, fname):
1284                     self.log('clearing %s from downloaders: %r' % (fname, result))
1285                     self.downloaders.pop(fname, None)
1286                 d.addBoth(clear_downloader, fname)
1287             return fname, downloader
1288         else:
1289             fh = open(fname, 'wb')
1290             download = urllib.urlopen(url)
1291             while True:
1292                 chunk = download.read(4096)
1293                 if not chunk:
1294                     break
1295                 fh.write(chunk)
1296             fh.close()
1297             return fname
1298
1299     def tmp_file(self, id):
1300         fname = os.path.join(self.tmpdir, base64.b32encode(id).lower())
1301         return fname
1302
1303 _tfs = None # to appease pyflakes; is set in main()
1304 def print_tree():
1305     log('tree:\n' + _tfs.pprint())
1306
1307
1308 def unmarshal(obj):
1309     if obj is None or isinstance(obj, int) or isinstance(obj, long) or isinstance(obj, float):
1310         return obj
1311     elif isinstance(obj, unicode) or isinstance(obj, str):
1312         #log('unmarshal(%r)' % (obj,))
1313         return base64.b64decode(obj)
1314     elif isinstance(obj, list):
1315         return map(unmarshal, obj)
1316     elif isinstance(obj, dict):
1317         return dict([ (k,unmarshal(v)) for k,v in obj.items() ])
1318     else:
1319         raise ValueError('object type not int,str,list,dict,none (%s) (%r)' % (type(obj), obj))
1320
1321 def marshal(obj):
1322     if obj is None or isinstance(obj, int) or isinstance(obj, long) or isinstance(obj, float):
1323         return obj
1324     elif isinstance(obj, str):
1325         return base64.b64encode(obj)
1326     elif isinstance(obj, list) or isinstance(obj, tuple):
1327         return map(marshal, obj)
1328     elif isinstance(obj, dict):
1329         return dict([ (k,marshal(v)) for k,v in obj.items() ])
1330     else:
1331         raise ValueError('object type not int,str,list,dict,none (%s)' % type(obj))
1332
1333
1334 class TRPCProtocol(Protocol):
1335     compute_response_sha1 = True
1336     log_all_requests = False
1337
1338     def connectionMade(self):
1339         self.buf = []
1340
1341     def dataReceived(self, data):
1342         if data == 'keepalive\n':
1343             log('keepalive connection on %r' % (self.transport,))
1344             self.keepalive = True
1345             return
1346
1347         if not data.endswith('\n'):
1348             self.buf.append(data)
1349             return
1350         if self.buf:
1351             self.buf.append(data)
1352             reqstr = ''.join(self.buf)
1353             self.buf = []
1354             self.dispatch_request(reqstr)
1355         else:
1356             self.dispatch_request(data)
1357
1358     def dispatch_request(self, reqstr):
1359         try:
1360             req = simplejson.loads(reqstr)
1361         except ValueError, ve:
1362             log(ve)
1363             return
1364
1365         d = defer.maybeDeferred(self.handle_request, req)
1366         d.addCallback(self.send_response)
1367         d.addErrback(self.send_error)
1368
1369     def send_error(self, failure):
1370         log('failure: %s' % (failure,))
1371         if failure.check(TFSIOError):
1372             e = failure.value
1373             self.send_response(['error', 'errno', e.args[0], e.args[1]])
1374         else:
1375             self.send_response(['error', 'failure', str(failure)])
1376
1377     def send_response(self, result):
1378         response = simplejson.dumps(result)
1379         header = { 'len': len(response), }
1380         if self.compute_response_sha1:
1381             header['sha1'] = base64.b64encode(sha.new(response).digest())
1382         hdr = simplejson.dumps(header)
1383         self.transport.write(hdr)
1384         self.transport.write('\n')
1385         self.transport.write(response)
1386         self.transport.loseConnection()
1387
1388     def connectionLost(self, reason):
1389         if hasattr(self, 'keepalive'):
1390             log('keepalive connection %r lost, shutting down' % (self.transport,))
1391             reactor.callLater(0, reactor.stop)
1392
1393     def handle_request(self, req):
1394         if type(req) is not list or not req or len(req) < 1:
1395             return ['error', 'malformed request']
1396         if req[0] == 'call':
1397             if len(req) < 3:
1398                 return ['error', 'malformed request']
1399             methname = req[1]
1400             try:
1401                 args = unmarshal(req[2])
1402             except ValueError, ve:
1403                 return ['error', 'malformed arguments', str(ve)]
1404
1405             try:
1406                 meth = getattr(self.factory.server, methname)
1407             except AttributeError, ae:
1408                 return ['error', 'no such method', str(ae)]
1409
1410             if self.log_all_requests:
1411                 log('call %s(%s)' % (methname, ', '.join(map(repr, args))))
1412             try:
1413                 result = meth(*args)
1414             except TFSIOError, e:
1415                 log('errno: %s; %s' % e.args)
1416                 return ['error', 'errno', e.args[0], e.args[1]]
1417             except Exception, e:
1418                 log('exception: ' + traceback.format_exc())
1419                 return ['error', 'exception', str(e)]
1420             d = defer.succeed(None)
1421             d.addCallback(lambda junk: result) # result may be Deferred
1422             d.addCallback(lambda res: ['result', marshal(res)]) # only applies if not errback
1423             return d
1424
1425 class TFSServer(object):
1426     def __init__(self, socket_path, server=None):
1427         self.socket_path = socket_path
1428         log('TFSServer init socket: %s' % (socket_path,))
1429
1430         self.factory = Factory()
1431         self.factory.protocol = TRPCProtocol
1432         if server:
1433             self.factory.server = server
1434         else:
1435             self.factory.server = self
1436
1437     def get_service(self):
1438         if not hasattr(self, 'svc'):
1439             from twisted.application import strports
1440             self.svc = strports.service('unix:'+self.socket_path, self.factory)
1441         return self.svc
1442
1443     def run(self):
1444         svc = self.get_service()
1445         def ss():
1446             try:
1447                 svc.startService()
1448             except:
1449                 reactor.callLater(0, reactor.stop)
1450                 raise
1451         reactor.callLater(0, ss)
1452         reactor.run()
1453
1454     def hello(self):
1455         return 'pleased to meet you'
1456
1457     def echo(self, arg):
1458         return arg
1459
1460     def failex(self):
1461         raise ValueError('expected')
1462
1463     def fail(self):
1464         return defer.maybeDeferred(self.failex)
1465
1466 class RPCError(RuntimeError):
1467     pass
1468
1469 class TRPC(object):
1470     def __init__(self, socket_fname):
1471         self.socket_fname = socket_fname
1472         self.keepalive = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
1473         self.keepalive.connect(self.socket_fname)
1474         self.keepalive.send('keepalive\n')
1475         log('requested keepalive on %s' % (self.keepalive,))
1476
1477     def req(self, req):
1478         # open conenction to trpc server
1479         s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
1480         s.connect(self.socket_fname)
1481         # send request
1482         s.send(simplejson.dumps(req))
1483         s.send('\n')
1484         # read response header
1485         hdr_data = s.recv(8192)
1486         first_newline = hdr_data.index('\n')
1487         header = hdr_data[:first_newline]
1488         data = hdr_data[first_newline+1:]
1489         hdr = simplejson.loads(header)
1490         hdr_len = hdr['len']
1491         if hdr.has_key('sha1'):
1492             hdr_sha1 = base64.b64decode(hdr['sha1'])
1493             spool = [data]
1494             spool_sha = sha.new(data)
1495             # spool response
1496             while True:
1497                 data = s.recv(8192)
1498                 if data:
1499                     spool.append(data)
1500                     spool_sha.update(data)
1501                 else:
1502                     break
1503         else:
1504             spool = [data]
1505             # spool response
1506             while True:
1507                 data = s.recv(8192)
1508                 if data:
1509                     spool.append(data)
1510                 else:
1511                     break
1512         s.close()
1513         # decode response
1514         resp = ''.join(spool)
1515         spool = None
1516         assert hdr_len == len(resp), str((hdr_len, len(resp), repr(resp)))
1517         if hdr.has_key('sha1'):
1518             data_sha1 = spool_sha.digest()
1519             spool = spool_sha = None
1520             assert hdr_sha1 == data_sha1, str((base32.b2a(hdr_sha1), base32.b2a(data_sha1)))
1521         #else:
1522             #print 'warning, server provided no sha1 to check'
1523         return resp
1524
1525     def call(self, methodname, *args):
1526         res = self.req(['call', methodname, marshal(args)])
1527
1528         result = simplejson.loads(res)
1529         if not result or len(result) < 2:
1530             raise TypeError('malformed response %r' % (result,))
1531         if result[0] == 'error':
1532             if result[1] == 'errno':
1533                 raise TFSIOError(result[2], result[3])
1534             else:
1535                 raise RPCError(*(result[1:])) # error, exception / error, failure
1536         elif result[0] == 'result':
1537             return unmarshal(result[1])
1538         else:
1539             raise TypeError('unknown response type %r' % (result[0],))
1540
1541     def shutdown(self):
1542         log('shutdown() closing keepalive %s' % (self.keepalive,))
1543         self.keepalive.close()
1544
1545 # (cut-n-pasted here due to an ImportError / some py2app linkage issues)
1546 #from twisted.scripts._twistd_unix import daemonize
1547 def daemonize():
1548     # See http://www.erlenstar.demon.co.uk/unix/faq_toc.html#TOC16
1549     if os.fork():   # launch child and...
1550         os._exit(0) # kill off parent
1551     os.setsid()
1552     if os.fork():   # launch child and...
1553         os._exit(0) # kill off parent again.
1554     os.umask(077)
1555     null=os.open('/dev/null', os.O_RDWR)
1556     for i in range(3):
1557         try:
1558             os.dup2(null, i)
1559         except OSError, e:
1560             if e.errno != errno.EBADF:
1561                 raise
1562     os.close(null)
1563
1564 def main(argv):
1565     log("main(%s)" % (argv,))
1566
1567     # check for version or help options (no args == help)
1568     if not argv:
1569         argv = ['--help']
1570     if len(argv) == 1 and argv[0] in ['-h', '--help']:
1571         config = TahoeFuseOptions()
1572         print >> sys.stderr, config
1573         print >> sys.stderr, 'fuse usage follows:'
1574     if len(argv) == 1 and argv[0] in ['-h', '--help', '--version']:
1575         launch_tahoe_fuse(TahoeFuseLocal, None, argv)
1576         return -2
1577
1578     # parse command line options
1579     config = TahoeFuseOptions()
1580     try:
1581         #print 'parsing', argv
1582         config.parseOptions(argv)
1583     except usage.error, e:
1584         print config
1585         print e
1586         return -1
1587
1588     # check for which alias or uri is specified
1589     if config['alias']:
1590         alias = config['alias']
1591         #print 'looking for aliases in', config['node-directory']
1592         aliases = get_aliases(os.path.expanduser(config['node-directory']))
1593         if alias not in aliases:
1594             raise usage.error('Alias %r not found' % (alias,))
1595         root_uri = aliases[alias]
1596         root_name = alias
1597     elif config['root-uri']:
1598         root_uri = config['root-uri']
1599         root_name = 'uri_' + base32.b2a(tagged_hash('root_name', root_uri))[:12]
1600         # test the uri for structural validity:
1601         try:
1602             DirectoryURI.init_from_string(root_uri)
1603         except:
1604             raise usage.error('root-uri must be a valid directory uri (not %r)' % (root_uri,))
1605     else:
1606         raise usage.error('At least one of --alias or --root-uri must be specified')
1607
1608     nodedir = config['node-directory']
1609     nodeurl = config['node-url']
1610     if not nodeurl:
1611         nodeurl = getnodeurl(nodedir)
1612
1613     # allocate socket
1614     socket_dir = os.path.join(os.path.expanduser(nodedir), "tfuse.sockets")
1615     socket_path = os.path.join(socket_dir, root_name)
1616     if len(socket_path) > 103:
1617         # try googling AF_UNIX and sun_len for some taste of why this oddity exists.
1618         raise OSError(errno.ENAMETOOLONG, 'socket path too long (%s)' % (socket_path,))
1619
1620     fileutil.make_dirs(socket_dir, 0700)
1621     if os.path.exists(socket_path):
1622         log('socket exists')
1623         if config['server-shutdown']:
1624             log('calling shutdown')
1625             trpc = TRPC(socket_path)
1626             result = trpc.shutdown()
1627             log('result: %r' % (result,))
1628             log('called shutdown')
1629             return
1630         else:
1631             raise OSError(errno.EEXIST, 'fuse already running (%r exists)' % (socket_path,))
1632     elif config['server-shutdown']:
1633         raise OSError(errno.ENOTCONN, '--server-shutdown specified, but server not running')
1634
1635     if not os.path.exists(config.mountpoint):
1636         raise OSError(errno.ENOENT, 'No such file or directory: "%s"' % (config.mountpoint,))
1637
1638     global _tfs
1639     #
1640     # Standalone ("no-split")
1641     #
1642     if config['no-split']:
1643         reopen_logfile('tfuse.%s.unsplit.log' % (root_name,))
1644         log('\n'+(24*'_')+'init (unsplit)'+(24*'_')+'\n')
1645
1646         cache_timeout = float(config['cache-timeout'])
1647         tfs = TFS(nodedir, nodeurl, root_uri, cache_timeout, async=False)
1648         #print tfs.pprint()
1649
1650         # make tfs instance accesible to print_tree() for dbg
1651         _tfs = tfs
1652
1653         args = [ '-o'+opt for opt in config.fuse_options ] + [config.mountpoint]
1654         launch_tahoe_fuse(TahoeFuseLocal, tfs, args)
1655
1656     #
1657     # Server
1658     #
1659     elif config['server']:
1660         reopen_logfile('tfuse.%s.server.log' % (root_name,))
1661         log('\n'+(24*'_')+'init (server)'+(24*'_')+'\n')
1662
1663         log('daemonizing')
1664         daemonize()
1665
1666         try:
1667             cache_timeout = float(config['cache-timeout'])
1668             tfs = TFS(nodedir, nodeurl, root_uri, cache_timeout, async=True)
1669             #print tfs.pprint()
1670
1671             # make tfs instance accesible to print_tree() for dbg
1672             _tfs = tfs
1673
1674             log('launching tfs server')
1675             tfuse = TahoeFuseBase(tfs)
1676             tfs_server = TFSServer(socket_path, tfuse)
1677             tfs_server.run()
1678             log('tfs server ran, exiting')
1679         except:
1680             log('exception: ' + traceback.format_exc())
1681
1682     #
1683     # Client
1684     #
1685     else:
1686         reopen_logfile('tfuse.%s.client.log' % (root_name,))
1687         log('\n'+(24*'_')+'init (client)'+(24*'_')+'\n')
1688
1689         server_args = [sys.executable, sys.argv[0], '--server'] + argv
1690         if 'Allmydata.app/Contents/MacOS' in sys.executable:
1691             # in this case blackmatch is the 'fuse' subcommand of the 'tahoe' executable
1692             # otherwise we assume blackmatch is being run from source
1693             server_args.insert(2, 'fuse')
1694         #print 'launching server:', server_args
1695         server = subprocess.Popen(server_args)
1696         waiting_since = time.time()
1697         wait_at_most = 8
1698         while not os.path.exists(socket_path):
1699             log('waiting for appearance of %r' % (socket_path,))
1700             time.sleep(1)
1701             if time.time() - waiting_since > wait_at_most:
1702                 log('%r did not appear within %ss' % (socket_path, wait_at_most))
1703                 raise IOError(2, 'no socket %s' % (socket_path,))
1704         #print 'launched server'
1705         trpc = TRPC(socket_path)
1706
1707
1708         args = [ '-o'+opt for opt in config.fuse_options ] + [config.mountpoint]
1709         launch_tahoe_fuse(TahoeFuseShim, trpc, args)
1710
1711         
1712 if __name__ == '__main__':
1713     sys.exit(main(sys.argv[1:]))