]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - contrib/fuse/impl_c/blackmatch.py
add Protovis.js-based download-status timeline visualization
[tahoe-lafs/tahoe-lafs.git] / contrib / fuse / impl_c / blackmatch.py
1 #!/usr/bin/env python
2
3 #-----------------------------------------------------------------------------------------------
4 from allmydata.uri import CHKFileURI, DirectoryURI, LiteralFileURI, is_literal_file_uri
5 from allmydata.scripts.common_http import do_http as do_http_req
6 from allmydata.util.hashutil import tagged_hash
7 from allmydata.util.assertutil import precondition
8 from allmydata.util import base32, fileutil, observer
9 from allmydata.scripts.common import get_aliases
10
11 from twisted.python import usage
12 from twisted.python.failure import Failure
13 from twisted.internet.protocol import Factory, Protocol
14 from twisted.internet import reactor, defer, task
15 from twisted.web import client
16
17 import base64
18 import errno
19 import heapq
20 import sha
21 import socket
22 import stat
23 import subprocess
24 import sys
25 import os
26 import weakref
27 #import pprint
28
29 # one needs either python-fuse to have been installed in sys.path, or
30 # suitable affordances to be made in the build or runtime environment
31 import fuse
32
33 import time
34 import traceback
35 import simplejson
36 import urllib
37
38 VERSIONSTR="0.7"
39
40 USAGE = 'usage: tahoe fuse [dir_cap_name] [fuse_options] mountpoint'
41 DEFAULT_DIRECTORY_VALIDITY=26
42
43 if not hasattr(fuse, '__version__'):
44     raise RuntimeError, \
45         "your fuse-py doesn't know of fuse.__version__, probably it's too old."
46
47 fuse.fuse_python_api = (0, 2)
48 fuse.feature_assert('stateful_files', 'has_init')
49
50 class TahoeFuseOptions(usage.Options):
51     optParameters = [
52         ["node-directory", None, "~/.tahoe",
53          "Look here to find out which Tahoe node should be used for all "
54          "operations. The directory should either contain a full Tahoe node, "
55          "or a file named node.url which points to some other Tahoe node. "
56          "It should also contain a file named private/aliases which contains "
57          "the mapping from alias name to root dirnode URI."
58          ],
59         ["node-url", None, None,
60          "URL of the tahoe node to use, a URL like \"http://127.0.0.1:3456\". "
61          "This overrides the URL found in the --node-directory ."],
62         ["alias", None, None,
63          "Which alias should be mounted."],
64         ["root-uri", None, None,
65          "Which root directory uri should be mounted."],
66         ["cache-timeout", None, 20,
67          "Time, in seconds, to cache directory data."],
68         ]
69     optFlags = [
70         ['no-split', None,
71          'run stand-alone; no splitting into client and server'],
72         ['server', None,
73          'server mode (should not be used by end users)'],
74         ['server-shutdown', None,
75          'shutdown server (should not be used by end users)'],
76          ]
77
78     def __init__(self):
79         usage.Options.__init__(self)
80         self.fuse_options = []
81         self.mountpoint = None
82
83     def opt_option(self, fuse_option):
84         """
85         Pass mount options directly to fuse.  See below.
86         """
87         self.fuse_options.append(fuse_option)
88         
89     opt_o = opt_option
90
91     def parseArgs(self, mountpoint=''):
92         self.mountpoint = mountpoint
93
94     def getSynopsis(self):
95         return "%s [options] mountpoint" % (os.path.basename(sys.argv[0]),)
96
97 logfile = file('tfuse.log', 'ab')
98
99 def reopen_logfile(fname):
100     global logfile
101     log('switching to %s' % (fname,))
102     logfile.close()
103     logfile = file(fname, 'ab')
104
105 def log(msg):
106     logfile.write("%s: %s\n" % (time.asctime(), msg))
107     #time.sleep(0.1)
108     logfile.flush()
109
110 fuse.flog = log
111
112 def unicode_to_utf8_or_str(u):
113     if isinstance(u, unicode):
114         return u.encode('utf-8')
115     else:
116         precondition(isinstance(u, str), repr(u))
117         return u
118
119 def do_http(method, url, body=''):
120     resp = do_http_req(method, url, body)
121     log('do_http(%s, %s) -> %s, %s' % (method, url, resp.status, resp.reason))
122     if resp.status not in (200, 201):
123         raise RuntimeError('http response (%s, %s)' % (resp.status, resp.reason))
124     else:
125         return resp.read()
126
127 def flag2mode(flags):
128     log('flag2mode(%r)' % (flags,))
129     #md = {os.O_RDONLY: 'r', os.O_WRONLY: 'w', os.O_RDWR: 'w+'}
130     md = {os.O_RDONLY: 'rb', os.O_WRONLY: 'wb', os.O_RDWR: 'w+b'}
131     m = md[flags & (os.O_RDONLY | os.O_WRONLY | os.O_RDWR)]
132
133     if flags & os.O_APPEND:
134         m = m.replace('w', 'a', 1)
135
136     return m
137
138 class TFSIOError(IOError):
139     pass
140
141 class ENOENT(TFSIOError):
142     def __init__(self, msg):
143         TFSIOError.__init__(self, errno.ENOENT, msg)
144
145 class EINVAL(TFSIOError):
146     def __init__(self, msg):
147         TFSIOError.__init__(self, errno.EINVAL, msg)
148
149 class EACCESS(TFSIOError):
150     def __init__(self, msg):
151         TFSIOError.__init__(self, errno.EACCESS, msg)
152
153 class EEXIST(TFSIOError):
154     def __init__(self, msg):
155         TFSIOError.__init__(self, errno.EEXIST, msg)
156
157 class EIO(TFSIOError):
158     def __init__(self, msg):
159         TFSIOError.__init__(self, errno.EIO, msg)
160
161 def logargsretexc(meth):
162     def inner_logargsretexc(self, *args, **kwargs):
163         log("%s(%r, %r)" % (meth, args, kwargs))
164         try:
165             ret = meth(self, *args, **kwargs)
166         except:
167             log('exception:\n%s' % (traceback.format_exc(),))
168             raise
169         log("ret: %r" % (ret, ))
170         return ret
171     inner_logargsretexc.__name__ = '<logwrap(%s)>' % (meth,)
172     return inner_logargsretexc
173
174 def logexc(meth):
175     def inner_logexc(self, *args, **kwargs):
176         try:
177             ret = meth(self, *args, **kwargs)
178         except TFSIOError, tie:
179             log('error: %s' % (tie,))
180             raise
181         except:
182             log('exception:\n%s' % (traceback.format_exc(),))
183             raise
184         return ret
185     inner_logexc.__name__ = '<logwrap(%s)>' % (meth,)
186     return inner_logexc
187
188 def log_exc():
189     log('exception:\n%s' % (traceback.format_exc(),))
190
191 def repr_mode(mode=None):
192     if mode is None:
193         return 'none'
194     fields = ['S_ENFMT', 'S_IFBLK', 'S_IFCHR', 'S_IFDIR', 'S_IFIFO', 'S_IFLNK', 'S_IFREG', 'S_IFSOCK', 'S_IRGRP', 'S_IROTH', 'S_IRUSR', 'S_IRWXG', 'S_IRWXO', 'S_IRWXU', 'S_ISGID', 'S_ISUID', 'S_ISVTX', 'S_IWGRP', 'S_IWOTH', 'S_IWUSR', 'S_IXGRP', 'S_IXOTH', 'S_IXUSR']
195     ret = []
196     for field in fields:
197         fval = getattr(stat, field)
198         if (mode & fval) == fval:
199             ret.append(field)
200     return '|'.join(ret)
201
202 def repr_flags(flags=None):
203     if flags is None:
204         return 'none'
205     fields = [ 'O_APPEND', 'O_CREAT', 'O_DIRECT', 'O_DIRECTORY', 'O_EXCL', 'O_EXLOCK',
206                'O_LARGEFILE', 'O_NDELAY', 'O_NOCTTY', 'O_NOFOLLOW', 'O_NONBLOCK', 'O_RDWR',
207                'O_SHLOCK', 'O_SYNC', 'O_TRUNC', 'O_WRONLY', ]
208     ret = []
209     for field in fields:
210         fval = getattr(os, field, None)
211         if fval is not None and (flags & fval) == fval:
212             ret.append(field)
213     if not ret:
214         ret = ['O_RDONLY']
215     return '|'.join(ret)
216
217 class DownloaderWithReadQueue(object):
218     def __init__(self):
219         self.read_heap = []
220         self.dest_file_name = None
221         self.running = False
222         self.done_observer = observer.OneShotObserverList()
223
224     def __repr__(self):
225         name = self.dest_file_name is None and '<none>' or os.path.basename(self.dest_file_name)
226         return "<DWRQ(%s)> q(%s)" % (name, len(self.read_heap or []))
227
228     def log(self, msg):
229         log("%r: %s" % (self, msg))
230
231     @logexc
232     def start(self, url, dest_file_name, target_size, interval=0.5):
233         self.log('start(%s, %s, %s)' % (url, dest_file_name, target_size, ))
234         self.dest_file_name = dest_file_name
235         file(self.dest_file_name, 'wb').close() # touch
236         self.target_size = target_size
237         self.log('start()')
238         self.loop = task.LoopingCall(self._check_file_size)
239         self.loop.start(interval)
240         self.running = True
241         d = client.downloadPage(url, self.dest_file_name)
242         d.addCallbacks(self.done, self.fail)
243         return d
244
245     def when_done(self):
246         return self.done_observer.when_fired()
247
248     def get_size(self):
249         if os.path.exists(self.dest_file_name):
250             return os.path.getsize(self.dest_file_name)
251         else:
252             return 0
253
254     @logexc
255     def _read(self, posn, size):
256         #self.log('_read(%s, %s)' % (posn, size))
257         f = file(self.dest_file_name, 'rb')
258         f.seek(posn)
259         data = f.read(size)
260         f.close()
261         return data
262
263     @logexc
264     def read(self, posn, size):
265         self.log('read(%s, %s)' % (posn, size))
266         if self.read_heap is None:
267             raise ValueError('read() called when already shut down')
268         if posn+size > self.target_size:
269             size -= self.target_size - posn
270         fsize = self.get_size()
271         if posn+size < fsize:
272             return defer.succeed(self._read(posn, size))
273         else:
274             d = defer.Deferred()
275             dread = (posn+size, posn, d)
276             heapq.heappush(self.read_heap, dread)
277         return d
278
279     @logexc
280     def _check_file_size(self):
281         #self.log('_check_file_size()')
282         if self.read_heap:
283             try:
284                 size = self.get_size()
285                 while self.read_heap and self.read_heap[0][0] <= size:
286                     end, start, d = heapq.heappop(self.read_heap)
287                     data = self._read(start, end-start)
288                     d.callback(data)
289             except Exception, e:
290                 log_exc()
291                 failure = Failure()
292
293     @logexc
294     def fail(self, failure):
295         self.log('fail(%s)' % (failure,))
296         self.running = False
297         if self.loop.running:
298             self.loop.stop()
299         # fail any reads still pending
300         for end, start, d in self.read_heap:
301             reactor.callLater(0, d.errback, failure)
302         self.read_heap = None
303         self.done_observer.fire_if_not_fired(failure)
304         return failure
305
306     @logexc
307     def done(self, result):
308         self.log('done()')
309         self.running = False
310         if self.loop.running:
311             self.loop.stop()
312         precondition(self.get_size() == self.target_size, self.get_size(), self.target_size)
313         self._check_file_size() # process anything left pending in heap
314         precondition(not self.read_heap, self.read_heap, self.target_size, self.get_size())
315         self.read_heap = None
316         self.done_observer.fire_if_not_fired(self)
317         return result
318
319
320 class TahoeFuseFile(object):
321
322     #def __init__(self, path, flags, *mode):
323     def __init__(self, tfs, path, flags, *mode):
324         log("TFF: __init__(%r, %r:%s, %r:%s)" % (path, flags, repr_flags(flags), mode, repr_mode(*mode)))
325         self.tfs = tfs
326         self.downloader = None
327
328         self._path = path # for tahoe put
329         try:
330             self.parent, self.name, self.fnode = self.tfs.get_parent_name_and_child(path)
331             m = flag2mode(flags)
332             log('TFF: flags2(mode) -> %s' % (m,))
333             if m[0] in 'wa':
334                 # write
335                 self.fname = self.tfs.cache.tmp_file(os.urandom(20))
336                 if self.fnode is None:
337                     log('TFF: [%s] open() for write: no file node, creating new File %s' % (self.name, self.fname, ))
338                     self.fnode = File(0, LiteralFileURI.BASE_STRING)
339                     self.fnode.tmp_fname = self.fname # XXX kill this
340                     self.parent.add_child(self.name, self.fnode, {})
341                 elif hasattr(self.fnode, 'tmp_fname'):
342                     self.fname = self.fnode.tmp_fname
343                     log('TFF: [%s] open() for write: existing file node lists %s' % (self.name, self.fname, ))
344                 else:
345                     log('TFF: [%s] open() for write: existing file node lists no tmp_file, using new %s' % (self.name, self.fname, ))
346                 if mode != (0600,):
347                     log('TFF: [%s] changing mode %s(%s) to 0600' % (self.name, repr_mode(*mode), mode))
348                     mode = (0600,)
349                 log('TFF: [%s] opening(%s) with flags %s(%s), mode %s(%s)' % (self.name, self.fname, repr_flags(flags|os.O_CREAT), flags|os.O_CREAT, repr_mode(*mode), mode))
350                 #self.file = os.fdopen(os.open(self.fname, flags|os.O_CREAT, *mode), m)
351                 self.file = os.fdopen(os.open(self.fname, flags|os.O_CREAT, *mode), m)
352                 self.fd = self.file.fileno()
353                 log('TFF: opened(%s) for write' % self.fname)
354                 self.open_for_write = True
355             else:
356                 # read
357                 assert self.fnode is not None
358                 uri = self.fnode.get_uri()
359
360                 # XXX make this go away
361                 if hasattr(self.fnode, 'tmp_fname'):
362                     self.fname = self.fnode.tmp_fname
363                     log('TFF: reopening(%s) for reading' % self.fname)
364                 else:
365                     if is_literal_file_uri(uri) or not self.tfs.async:
366                         log('TFF: synchronously fetching file from cache for reading')
367                         self.fname = self.tfs.cache.get_file(uri)
368                     else:
369                         log('TFF: asynchronously fetching file from cache for reading')
370                         self.fname, self.downloader = self.tfs.cache.async_get_file(uri)
371                         # downloader is None if the cache already contains the file
372                         if self.downloader is not None:
373                             d = self.downloader.when_done()
374                             def download_complete(junk):
375                                 # once the download is complete, revert to non-async behaviour
376                                 self.downloader = None
377                             d.addCallback(download_complete)
378
379                 self.file = os.fdopen(os.open(self.fname, flags, *mode), m)
380                 self.fd = self.file.fileno()
381                 self.open_for_write = False
382                 log('TFF: opened(%s) for read' % self.fname)
383         except:
384             log_exc()
385             raise
386
387     def log(self, msg):
388         log("<TFF(%s:%s)> %s" % (os.path.basename(self.fname), self.name, msg))
389
390     @logexc
391     def read(self, size, offset):
392         self.log('read(%r, %r)' % (size, offset, ))
393         if self.downloader:
394             # then we're busy doing an async download
395             # (and hence implicitly, we're in an environment that supports twisted)
396             #self.log('passing read() to %s' % (self.downloader, ))
397             d = self.downloader.read(offset, size)
398             def thunk(failure):
399                 raise EIO(str(failure))
400             d.addErrback(thunk)
401             return d
402         else:
403             self.log('servicing read() from %s' % (self.file, ))
404             self.file.seek(offset)
405             return self.file.read(size)
406
407     @logexc
408     def write(self, buf, offset):
409         self.log("write(-%s-, %r)" % (len(buf), offset))
410         if not self.open_for_write:
411             return -errno.EACCES
412         self.file.seek(offset)
413         self.file.write(buf)
414         return len(buf)
415
416     @logexc
417     def release(self, flags):
418         self.log("release(%r)" % (flags,))
419         self.file.close()
420         if self.open_for_write:
421             size = os.path.getsize(self.fname)
422             self.fnode.size = size
423             file_cap = self.tfs.upload(self.fname)
424             self.fnode.ro_uri = file_cap
425             # XXX [ ] TODO: set metadata
426             # write new uri into parent dir entry
427             self.parent.add_child(self.name, self.fnode, {})
428             self.log("uploaded: %s" % (file_cap,))
429
430         # dbg
431         print_tree()
432
433     def _fflush(self):
434         if 'w' in self.file.mode or 'a' in self.file.mode:
435             self.file.flush()
436
437     @logexc
438     def fsync(self, isfsyncfile):
439         self.log("fsync(%r)" % (isfsyncfile,))
440         self._fflush()
441         if isfsyncfile and hasattr(os, 'fdatasync'):
442             os.fdatasync(self.fd)
443         else:
444             os.fsync(self.fd)
445
446     @logexc
447     def flush(self):
448         self.log("flush()")
449         self._fflush()
450         # cf. xmp_flush() in fusexmp_fh.c
451         os.close(os.dup(self.fd))
452
453     @logexc
454     def fgetattr(self):
455         self.log("fgetattr()")
456         s = os.fstat(self.fd)
457         d = stat_to_dict(s)
458         if self.downloader:
459             size = self.downloader.target_size
460             self.log("fgetattr() during async download, cache file: %s, size=%s" % (s, size))
461             d['st_size'] = size
462         self.log("fgetattr() -> %r" % (d,))
463         return d
464
465     @logexc
466     def ftruncate(self, len):
467         self.log("ftruncate(%r)" % (len,))
468         self.file.truncate(len)
469
470 class TahoeFuseBase(object):
471
472     def __init__(self, tfs):
473         log("TFB: __init__()")
474         self.tfs = tfs
475         self.files = {}
476
477     def log(self, msg):
478         log("<TFB> %s" % (msg, ))
479
480     @logexc
481     def readlink(self, path):
482         self.log("readlink(%r)" % (path,))
483         node = self.tfs.get_path(path)
484         if node:
485             raise EINVAL('Not a symlink') # nothing in tahoe is a symlink
486         else:
487             raise ENOENT('Invalid argument')
488
489     @logexc
490     def unlink(self, path):
491         self.log("unlink(%r)" % (path,))
492         self.tfs.unlink(path)
493
494     @logexc
495     def rmdir(self, path):
496         self.log("rmdir(%r)" % (path,))
497         self.tfs.unlink(path)
498
499     @logexc
500     def symlink(self, path, path1):
501         self.log("symlink(%r, %r)" % (path, path1))
502         self.tfs.link(path, path1)
503
504     @logexc
505     def rename(self, path, path1):
506         self.log("rename(%r, %r)" % (path, path1))
507         self.tfs.rename(path, path1)
508
509     @logexc
510     def link(self, path, path1):
511         self.log("link(%r, %r)" % (path, path1))
512         self.tfs.link(path, path1)
513
514     @logexc
515     def chmod(self, path, mode):
516         self.log("XX chmod(%r, %r)" % (path, mode))
517         #return -errno.EOPNOTSUPP
518
519     @logexc
520     def chown(self, path, user, group):
521         self.log("XX chown(%r, %r, %r)" % (path, user, group))
522         #return -errno.EOPNOTSUPP
523
524     @logexc
525     def truncate(self, path, len):
526         self.log("XX truncate(%r, %r)" % (path, len))
527         #return -errno.EOPNOTSUPP
528
529     @logexc
530     def utime(self, path, times):
531         self.log("XX utime(%r, %r)" % (path, times))
532         #return -errno.EOPNOTSUPP
533
534     @logexc
535     def statfs(self):
536         self.log("statfs()")
537         """
538         Should return an object with statvfs attributes (f_bsize, f_frsize...).
539         Eg., the return value of os.statvfs() is such a thing (since py 2.2).
540         If you are not reusing an existing statvfs object, start with
541         fuse.StatVFS(), and define the attributes.
542
543         To provide usable information (ie., you want sensible df(1)
544         output, you are suggested to specify the following attributes:
545
546             - f_bsize - preferred size of file blocks, in bytes
547             - f_frsize - fundamental size of file blcoks, in bytes
548                 [if you have no idea, use the same as blocksize]
549             - f_blocks - total number of blocks in the filesystem
550             - f_bfree - number of free blocks
551             - f_files - total number of file inodes
552             - f_ffree - nunber of free file inodes
553         """
554
555         block_size = 4096 # 4k
556         preferred_block_size = 131072 # 128k, c.f. seg_size
557         fs_size = 8*2**40 # 8Tb
558         fs_free = 2*2**40 # 2Tb
559
560         #s = fuse.StatVfs(f_bsize = preferred_block_size,
561         s = dict(f_bsize = preferred_block_size,
562                          f_frsize = block_size,
563                          f_blocks = fs_size / block_size,
564                          f_bfree = fs_free / block_size,
565                          f_bavail = fs_free / block_size,
566                          f_files = 2**30, # total files
567                          f_ffree = 2**20, # available files
568                          f_favail = 2**20, # available files (root)
569                          f_flag = 2, # no suid
570                          f_namemax = 255) # max name length
571         #self.log('statfs(): %r' % (s,))
572         return s
573
574     def fsinit(self):
575         self.log("fsinit()")
576
577     ##################################################################
578
579     @logexc
580     def readdir(self, path, offset):
581         self.log('readdir(%r, %r)' % (path, offset))
582         node = self.tfs.get_path(path)
583         if node is None:
584             return -errno.ENOENT
585         dirlist = ['.', '..'] + node.children.keys()
586         self.log('dirlist = %r' % (dirlist,))
587         #return [fuse.Direntry(d) for d in dirlist]
588         return dirlist
589
590     @logexc
591     def getattr(self, path):
592         self.log('getattr(%r)' % (path,))
593
594         if path == '/':
595             # we don't have any metadata for the root (no edge leading to it)
596             mode = (stat.S_IFDIR | 755)
597             mtime = self.tfs.root.mtime
598             s = TStat({}, st_mode=mode, st_nlink=1, st_mtime=mtime)
599             self.log('getattr(%r) -> %r' % (path, s))
600             #return s
601             return stat_to_dict(s)
602             
603         parent, name, child = self.tfs.get_parent_name_and_child(path)
604         if not child: # implicitly 'or not parent'
605             raise ENOENT('No such file or directory')
606         return stat_to_dict(parent.get_stat(name))
607
608     @logexc
609     def access(self, path, mode):
610         self.log("access(%r, %r)" % (path, mode))
611         node = self.tfs.get_path(path)
612         if not node:
613             return -errno.ENOENT
614         accmode = os.O_RDONLY | os.O_WRONLY | os.O_RDWR
615         if (mode & 0222):
616             if not node.writable():
617                 log('write access denied for %s (req:%o)' % (path, mode, ))
618                 return -errno.EACCES
619         #else:
620             #log('access granted for %s' % (path, ))
621
622     @logexc
623     def mkdir(self, path, mode):
624         self.log("mkdir(%r, %r)" % (path, mode))
625         self.tfs.mkdir(path)
626
627     ##################################################################
628     # file methods
629
630     def open(self, path, flags):
631         self.log('open(%r, %r)' % (path, flags, ))
632         if path in self.files:
633             # XXX todo [ ] should consider concurrent open files of differing modes
634             return
635         else:
636             tffobj = TahoeFuseFile(self.tfs, path, flags)
637             self.files[path] = tffobj
638
639     def create(self, path, flags, mode):
640         self.log('create(%r, %r, %r)' % (path, flags, mode))
641         if path in self.files:
642             # XXX todo [ ] should consider concurrent open files of differing modes
643             return
644         else:
645             tffobj = TahoeFuseFile(self.tfs, path, flags, mode)
646             self.files[path] = tffobj
647
648     def _get_file(self, path):
649         if not path in self.files:
650             raise ENOENT('No such file or directory: %s' % (path,))
651         return self.files[path]
652
653     ##
654
655     def read(self, path, size, offset):
656         self.log('read(%r, %r, %r)' % (path, size, offset, ))
657         return self._get_file(path).read(size, offset)
658
659     @logexc
660     def write(self, path, buf, offset):
661         self.log("write(%r, -%s-, %r)" % (path, len(buf), offset))
662         return self._get_file(path).write(buf, offset)
663
664     @logexc
665     def release(self, path, flags):
666         self.log("release(%r, %r)" % (path, flags,))
667         self._get_file(path).release(flags)
668         del self.files[path]
669
670     @logexc
671     def fsync(self, path, isfsyncfile):
672         self.log("fsync(%r, %r)" % (path, isfsyncfile,))
673         return self._get_file(path).fsync(isfsyncfile)
674
675     @logexc
676     def flush(self, path):
677         self.log("flush(%r)" % (path,))
678         return self._get_file(path).flush()
679
680     @logexc
681     def fgetattr(self, path):
682         self.log("fgetattr(%r)" % (path,))
683         return self._get_file(path).fgetattr()
684
685     @logexc
686     def ftruncate(self, path, len):
687         self.log("ftruncate(%r, %r)" % (path, len,))
688         return self._get_file(path).ftruncate(len)
689
690 class TahoeFuseLocal(TahoeFuseBase, fuse.Fuse):
691     def __init__(self, tfs, *args, **kw):
692         log("TFL: __init__(%r, %r)" % (args, kw))
693         TahoeFuseBase.__init__(self, tfs)
694         fuse.Fuse.__init__(self, *args, **kw)
695
696     def log(self, msg):
697         log("<TFL> %s" % (msg, ))
698
699     def main(self, *a, **kw):
700         self.log("main(%r, %r)" % (a, kw))
701         return fuse.Fuse.main(self, *a, **kw)
702
703     # overrides for those methods which return objects not marshalled
704     def fgetattr(self, path):
705         return TStat({}, **(TahoeFuseBase.fgetattr(self, path)))
706
707     def getattr(self, path):
708         return TStat({}, **(TahoeFuseBase.getattr(self, path)))
709
710     def statfs(self):
711         return fuse.StatVfs(**(TahoeFuseBase.statfs(self)))
712         #self.log('statfs()')
713         #ret = fuse.StatVfs(**(TahoeFuseBase.statfs(self)))
714         #self.log('statfs(): %r' % (ret,))
715         #return ret
716
717     @logexc
718     def readdir(self, path, offset):
719         return [ fuse.Direntry(d) for d in TahoeFuseBase.readdir(self, path, offset) ]
720
721 class TahoeFuseShim(fuse.Fuse):
722     def __init__(self, trpc, *args, **kw):
723         log("TF: __init__(%r, %r)" % (args, kw))
724         self.trpc = trpc
725         fuse.Fuse.__init__(self, *args, **kw)
726
727     def log(self, msg):
728         log("<TFs> %s" % (msg, ))
729
730     @logexc
731     def readlink(self, path):
732         self.log("readlink(%r)" % (path,))
733         return self.trpc.call('readlink', path)
734
735     @logexc
736     def unlink(self, path):
737         self.log("unlink(%r)" % (path,))
738         return self.trpc.call('unlink', path)
739
740     @logexc
741     def rmdir(self, path):
742         self.log("rmdir(%r)" % (path,))
743         return self.trpc.call('unlink', path)
744
745     @logexc
746     def symlink(self, path, path1):
747         self.log("symlink(%r, %r)" % (path, path1))
748         return self.trpc.call('link', path, path1)
749
750     @logexc
751     def rename(self, path, path1):
752         self.log("rename(%r, %r)" % (path, path1))
753         return self.trpc.call('rename', path, path1)
754
755     @logexc
756     def link(self, path, path1):
757         self.log("link(%r, %r)" % (path, path1))
758         return self.trpc.call('link', path, path1)
759
760     @logexc
761     def chmod(self, path, mode):
762         self.log("XX chmod(%r, %r)" % (path, mode))
763         return self.trpc.call('chmod', path, mode)
764
765     @logexc
766     def chown(self, path, user, group):
767         self.log("XX chown(%r, %r, %r)" % (path, user, group))
768         return self.trpc.call('chown', path, user, group)
769
770     @logexc
771     def truncate(self, path, len):
772         self.log("XX truncate(%r, %r)" % (path, len))
773         return self.trpc.call('truncate', path, len)
774
775     @logexc
776     def utime(self, path, times):
777         self.log("XX utime(%r, %r)" % (path, times))
778         return self.trpc.call('utime', path, times)
779
780     @logexc
781     def statfs(self):
782         self.log("statfs()")
783         response = self.trpc.call('statfs')
784         #self.log("statfs(): %r" % (response,))
785         kwargs = dict([ (str(k),v) for k,v in response.items() ])
786         return fuse.StatVfs(**kwargs)
787
788     def fsinit(self):
789         self.log("fsinit()")
790
791     def main(self, *a, **kw):
792         self.log("main(%r, %r)" % (a, kw))
793
794         return fuse.Fuse.main(self, *a, **kw)
795
796     ##################################################################
797
798     @logexc
799     def readdir(self, path, offset):
800         self.log('readdir(%r, %r)' % (path, offset))
801         return [ fuse.Direntry(d) for d in self.trpc.call('readdir', path, offset) ]
802
803     @logexc
804     def getattr(self, path):
805         self.log('getattr(%r)' % (path,))
806         response = self.trpc.call('getattr', path)
807         kwargs = dict([ (str(k),v) for k,v in response.items() ])
808         s = TStat({}, **kwargs)
809         self.log('getattr(%r) -> %r' % (path, s))
810         return s
811
812     @logexc
813     def access(self, path, mode):
814         self.log("access(%r, %r)" % (path, mode))
815         return self.trpc.call('access', path, mode)
816
817     @logexc
818     def mkdir(self, path, mode):
819         self.log("mkdir(%r, %r)" % (path, mode))
820         return self.trpc.call('mkdir', path, mode)
821
822     ##################################################################
823     # file methods
824
825     def open(self, path, flags):
826         self.log('open(%r, %r)' % (path, flags, ))
827         return self.trpc.call('open', path, flags)
828
829     def create(self, path, flags, mode):
830         self.log('create(%r, %r, %r)' % (path, flags, mode))
831         return self.trpc.call('create', path, flags, mode)
832
833     ##
834
835     def read(self, path, size, offset):
836         self.log('read(%r, %r, %r)' % (path, size, offset, ))
837         return self.trpc.call('read', path, size, offset)
838
839     @logexc
840     def write(self, path, buf, offset):
841         self.log("write(%r, -%s-, %r)" % (path, len(buf), offset))
842         return self.trpc.call('write', path, buf, offset)
843
844     @logexc
845     def release(self, path, flags):
846         self.log("release(%r, %r)" % (path, flags,))
847         return self.trpc.call('release', path, flags)
848
849     @logexc
850     def fsync(self, path, isfsyncfile):
851         self.log("fsync(%r, %r)" % (path, isfsyncfile,))
852         return self.trpc.call('fsync', path, isfsyncfile)
853
854     @logexc
855     def flush(self, path):
856         self.log("flush(%r)" % (path,))
857         return self.trpc.call('flush', path)
858
859     @logexc
860     def fgetattr(self, path):
861         self.log("fgetattr(%r)" % (path,))
862         #return self.trpc.call('fgetattr', path)
863         response = self.trpc.call('fgetattr', path)
864         kwargs = dict([ (str(k),v) for k,v in response.items() ])
865         s = TStat({}, **kwargs)
866         self.log('getattr(%r) -> %r' % (path, s))
867         return s
868
869     @logexc
870     def ftruncate(self, path, len):
871         self.log("ftruncate(%r, %r)" % (path, len,))
872         return self.trpc.call('ftruncate', path, len)
873
874
875 def launch_tahoe_fuse(tf_class, tobj, argv):
876     sys.argv = ['tahoe fuse'] + list(argv)
877     log('setting sys.argv=%r' % (sys.argv,))
878     config = TahoeFuseOptions()
879     version = "%prog " +VERSIONSTR+", fuse "+ fuse.__version__
880     server = tf_class(tobj, version=version, usage=config.getSynopsis(), dash_s_do='setsingle')
881     server.parse(errex=1)
882     server.main()
883
884 def getnodeurl(nodedir):
885     f = file(os.path.expanduser(os.path.join(nodedir, "node.url")), 'rb')
886     nu = f.read().strip()
887     f.close()
888     if nu[-1] != "/":
889         nu += "/"
890     return nu
891
892 def fingerprint(uri):
893     if uri is None:
894         return None
895     return base64.b32encode(sha.new(uri).digest()).lower()[:6]
896
897 stat_fields = [ 'st_mode', 'st_ino', 'st_dev', 'st_nlink', 'st_uid', 'st_gid', 'st_size',
898                 'st_atime', 'st_mtime', 'st_ctime', ]
899 def stat_to_dict(statobj, fields=None):
900     if fields is None:
901         fields = stat_fields
902     d = {}
903     for f in fields:
904         d[f] = getattr(statobj, f, None)
905     return d
906
907 class TStat(fuse.Stat):
908     # in fuse 0.2, these are set by fuse.Stat.__init__
909     # in fuse 0.2-pre3 (hardy) they are not. badness ensues if they're missing
910     st_mode  = None
911     st_ino   = 0
912     st_dev   = 0
913     st_nlink = None
914     st_uid   = 0
915     st_gid   = 0
916     st_size  = 0
917     st_atime = 0
918     st_mtime = 0
919     st_ctime = 0
920
921     fields = [ 'st_mode', 'st_ino', 'st_dev', 'st_nlink', 'st_uid', 'st_gid', 'st_size',
922                'st_atime', 'st_mtime', 'st_ctime', ]
923     def __init__(self, metadata, **kwargs):
924         # first load any stat fields present in 'metadata'
925         for st in [ 'mtime', 'ctime' ]:
926             if st in metadata:
927                 setattr(self, "st_%s" % st, metadata[st])
928         for st in self.fields:
929             if st in metadata:
930                 setattr(self, st, metadata[st])
931
932         # then set any values passed in as kwargs
933         fuse.Stat.__init__(self, **kwargs)
934
935     def __repr__(self):
936         return "<Stat%r>" % (stat_to_dict(self),)
937
938 class Directory(object):
939     def __init__(self, tfs, ro_uri, rw_uri):
940         self.tfs = tfs
941         self.ro_uri = ro_uri
942         self.rw_uri = rw_uri
943         assert (rw_uri or ro_uri)
944         self.children = {}
945         self.last_load = None
946         self.last_data = None
947         self.mtime = 0
948
949     def __repr__(self):
950         return "<Directory %s>" % (fingerprint(self.get_uri()),)
951
952     def maybe_refresh(self, name=None):
953         """
954         if the previously cached data was retrieved within the cache
955         validity period, does nothing. otherwise refetches the data
956         for this directory and reloads itself
957         """
958         now = time.time()
959         if self.last_load is None or (now - self.last_load) > self.tfs.cache_validity:
960             self.load(name)
961
962     def load(self, name=None):
963         now = time.time()
964         log('%s.loading(%s)' % (self, name))
965         url = self.tfs.compose_url("uri/%s?t=json", self.get_uri())
966         data = urllib.urlopen(url).read()
967         h = tagged_hash('cache_hash', data)
968         if h == self.last_data:
969             self.last_load = now
970             log('%s.load() : no change h(data)=%s' % (self, base32.b2a(h), ))
971             return
972         try:
973             parsed = simplejson.loads(data)
974         except ValueError:
975             log('%s.load(): unable to parse json data for dir:\n%r' % (self, data))
976             return
977         nodetype, d = parsed
978         assert nodetype == 'dirnode'
979         self.children.clear()
980         for cname,details in d['children'].items():
981             cname = unicode_to_utf8_or_str(cname)
982             ctype, cattrs = details
983             metadata = cattrs.get('metadata', {})
984             if ctype == 'dirnode':
985                 cobj = self.tfs.dir_for(cname, cattrs.get('ro_uri'), cattrs.get('rw_uri'))
986             else:
987                 assert ctype == "filenode"
988                 cobj = File(cattrs.get('size'), cattrs.get('ro_uri'))
989             self.children[cname] = cobj, metadata
990         self.last_load = now
991         self.last_data = h
992         self.mtime = now
993         log('%s.load() loaded: \n%s' % (self, self.pprint(),))
994
995     def get_children(self):
996         return self.children.keys()
997
998     def get_child(self, name):
999         return self.children[name][0]
1000
1001     def add_child(self, name, child, metadata):
1002         log('%s.add_child(%r, %r, %r)' % (self, name, child, metadata, ))
1003         self.children[name] = child, metadata
1004         url = self.tfs.compose_url("uri/%s/%s?t=uri", self.get_uri(), name)
1005         child_cap = do_http('PUT', url, child.get_uri())
1006         # XXX [ ] TODO: push metadata to tahoe node
1007         assert child_cap == child.get_uri()
1008         self.mtime = time.time()
1009         log('added child %r with %r to %r' % (name, child_cap, self))
1010
1011     def remove_child(self, name):
1012         log('%s.remove_child(%r)' % (self, name, ))
1013         del self.children[name]
1014         url = self.tfs.compose_url("uri/%s/%s", self.get_uri(), name)
1015         resp = do_http('DELETE', url)
1016         self.mtime = time.time()
1017         log('child (%s) removal yielded %r' % (name, resp,))
1018
1019     def get_uri(self):
1020         return self.rw_uri or self.ro_uri
1021
1022     # TODO: rename to 'is_writeable', or switch sense to 'is_readonly', for consistency with Tahoe code
1023     def writable(self):
1024         return self.rw_uri and self.rw_uri != self.ro_uri
1025
1026     def pprint(self, prefix='', printed=None, suffix=''):
1027         ret = []
1028         if printed is None:
1029             printed = set()
1030         writable = self.writable() and '+' or ' '
1031         if self in printed:
1032             ret.append("         %s/%s ... <%s> : %s" % (prefix, writable, fingerprint(self.get_uri()), suffix, ))
1033         else:
1034             ret.append("[%s] %s/%s : %s" % (fingerprint(self.get_uri()), prefix, writable, suffix, ))
1035             printed.add(self)
1036             for name,(child,metadata) in sorted(self.children.items()):
1037                 ret.append(child.pprint(' ' * (len(prefix)+1)+name, printed, repr(metadata)))
1038         return '\n'.join(ret)
1039
1040     def get_metadata(self, name):
1041         return self.children[name][1]
1042
1043     def get_stat(self, name):
1044         child,metadata = self.children[name]
1045         log("%s.get_stat(%s) md: %r" % (self, name, metadata))
1046
1047         if isinstance(child, Directory):
1048             child.maybe_refresh(name)
1049             mode = metadata.get('st_mode') or (stat.S_IFDIR | 0755)
1050             s = TStat(metadata, st_mode=mode, st_nlink=1, st_mtime=child.mtime)
1051         else:
1052             if hasattr(child, 'tmp_fname'):
1053                 s = os.stat(child.tmp_fname)
1054                 log("%s.get_stat(%s) returning local stat of tmp file" % (self, name, ))
1055             else:
1056                 s = TStat(metadata,
1057                           st_nlink = 1,
1058                           st_size = child.size,
1059                           st_mode = metadata.get('st_mode') or (stat.S_IFREG | 0444),
1060                           st_mtime = metadata.get('mtime') or self.mtime,
1061                           )
1062             return s
1063
1064         log("%s.get_stat(%s)->%s" % (self, name, s))
1065         return s
1066
1067 class File(object):
1068     def __init__(self, size, ro_uri):
1069         self.size = size
1070         if ro_uri:
1071             ro_uri = str(ro_uri)
1072         self.ro_uri = ro_uri
1073
1074     def __repr__(self):
1075         return "<File %s>" % (fingerprint(self.ro_uri) or [self.tmp_fname],)
1076
1077     def pprint(self, prefix='', printed=None, suffix=''):
1078         return "         %s (%s) : %s" % (prefix, self.size, suffix, )
1079
1080     def get_uri(self):
1081         return self.ro_uri
1082
1083     def writable(self):
1084         return True
1085
1086 class TFS(object):
1087     def __init__(self, nodedir, nodeurl, root_uri, 
1088                        cache_validity_period=DEFAULT_DIRECTORY_VALIDITY, async=False):
1089         self.cache_validity = cache_validity_period
1090         self.nodeurl = nodeurl
1091         self.root_uri = root_uri
1092         self.async = async
1093         self.dirs = {}
1094
1095         cachedir = os.path.expanduser(os.path.join(nodedir, '_cache'))
1096         self.cache = FileCache(nodeurl, cachedir)
1097         ro_uri = DirectoryURI.init_from_string(self.root_uri).get_readonly()
1098         self.root = Directory(self, ro_uri, self.root_uri)
1099         self.root.maybe_refresh('<root>')
1100
1101     def log(self, msg):
1102         log("<TFS> %s" % (msg, ))
1103
1104     def pprint(self):
1105         return self.root.pprint()
1106
1107     def compose_url(self, fmt, *args):
1108         return self.nodeurl + (fmt % tuple(map(urllib.quote, args)))
1109
1110     def get_parent_name_and_child(self, path):
1111         """
1112         find the parent dir node, name of child relative to that parent, and
1113         child node within the TFS object space.
1114         @returns: (parent, name, child) if the child is found
1115                   (parent, name, None) if the child is missing from the parent
1116                   (None, name, None) if the parent is not found
1117         """
1118         if path == '/':
1119             return 
1120         dirname, name = os.path.split(path)
1121         parent = self.get_path(dirname)
1122         if parent:
1123             try:
1124                 child = parent.get_child(name)
1125                 return parent, name, child
1126             except KeyError:
1127                 return parent, name, None
1128         else:
1129             return None, name, None
1130
1131     def get_path(self, path):
1132         comps = path.strip('/').split('/')
1133         if comps == ['']:
1134             comps = []
1135         cursor = self.root
1136         c_name = '<root>'
1137         for comp in comps:
1138             if not isinstance(cursor, Directory):
1139                 self.log('path "%s" is not a dir' % (path,))
1140                 return None
1141             cursor.maybe_refresh(c_name)
1142             try:
1143                 cursor = cursor.get_child(comp)
1144                 c_name = comp
1145             except KeyError:
1146                 self.log('path "%s" not found' % (path,))
1147                 return None
1148         if isinstance(cursor, Directory):
1149             cursor.maybe_refresh(c_name)
1150         return cursor
1151
1152     def dir_for(self, name, ro_uri, rw_uri):
1153         #self.log('dir_for(%s) [%s/%s]' % (name, fingerprint(ro_uri), fingerprint(rw_uri)))
1154         if ro_uri:
1155             ro_uri = str(ro_uri)
1156         if rw_uri:
1157             rw_uri = str(rw_uri)
1158         uri = rw_uri or ro_uri
1159         assert uri
1160         dirobj = self.dirs.get(uri)
1161         if not dirobj:
1162             self.log('dir_for(%s) creating new Directory' % (name, ))
1163             dirobj = Directory(self, ro_uri, rw_uri)
1164             self.dirs[uri] = dirobj
1165         return dirobj
1166
1167     def upload(self, fname):
1168         self.log('upload(%r)' % (fname,))
1169         fh = file(fname, 'rb')
1170         url = self.compose_url("uri")
1171         file_cap = do_http('PUT', url, fh)
1172         self.log('uploaded to: %r' % (file_cap,))
1173         return file_cap
1174
1175     def mkdir(self, path):
1176         self.log('mkdir(%r)' % (path,))
1177         parent, name, child = self.get_parent_name_and_child(path)
1178
1179         if child:
1180             raise EEXIST('File exists: %s' % (name,))
1181         if not parent:
1182             raise ENOENT('No such file or directory: %s' % (path,))
1183
1184         url = self.compose_url("uri?t=mkdir")
1185         new_dir_cap = do_http('PUT', url)
1186
1187         ro_uri = DirectoryURI.init_from_string(new_dir_cap).get_readonly()
1188         child = Directory(self, ro_uri, new_dir_cap)
1189         parent.add_child(name, child, {})
1190
1191     def rename(self, path, path1):
1192         self.log('rename(%s, %s)' % (path, path1))
1193         src_parent, src_name, src_child = self.get_parent_name_and_child(path)
1194         dst_parent, dst_name, dst_child = self.get_parent_name_and_child(path1)
1195
1196         if not src_child or not dst_parent:
1197             raise ENOENT('No such file or directory')
1198
1199         dst_parent.add_child(dst_name, src_child, {})
1200         src_parent.remove_child(src_name)
1201
1202     def unlink(self, path):
1203         parent, name, child = self.get_parent_name_and_child(path)
1204
1205         if child is None: # parent or child is missing
1206             raise ENOENT('No such file or directory')
1207         if not parent.writable():
1208             raise EACCESS('Permission denied')
1209
1210         parent.remove_child(name)
1211
1212     def link(self, path, path1):
1213         src = self.get_path(path)
1214         dst_parent, dst_name, dst_child = self.get_parent_name_and_child(path1)
1215
1216         if not src:
1217             raise ENOENT('No such file or directory')
1218         if dst_parent is None:
1219             raise ENOENT('No such file or directory')
1220         if not dst_parent.writable():
1221             raise EACCESS('Permission denied')
1222
1223         dst_parent.add_child(dst_name, src, {})
1224
1225 class FileCache(object):
1226     def __init__(self, nodeurl, cachedir):
1227         self.nodeurl = nodeurl
1228         self.cachedir = cachedir
1229         if not os.path.exists(self.cachedir):
1230             os.makedirs(self.cachedir)
1231         self.tmpdir = os.path.join(self.cachedir, 'tmp')
1232         if not os.path.exists(self.tmpdir):
1233             os.makedirs(self.tmpdir)
1234         self.downloaders = weakref.WeakValueDictionary()
1235
1236     def log(self, msg):
1237         log("<FC> %s" % (msg, ))
1238
1239     def get_file(self, uri):
1240         self.log('get_file(%s)' % (uri,))
1241         if is_literal_file_uri(uri):
1242             return self.get_literal(uri)
1243         else:
1244             return self.get_chk(uri, async=False)
1245
1246     def async_get_file(self, uri):
1247         self.log('get_file(%s)' % (uri,))
1248         return self.get_chk(uri, async=True)
1249
1250     def get_literal(self, uri):
1251         h = sha.new(uri).digest()
1252         u = LiteralFileURI.init_from_string(uri)
1253         fname = os.path.join(self.cachedir, '__'+base64.b32encode(h).lower())
1254         size = len(u.data)
1255         self.log('writing literal file %s (%s)' % (fname, size, ))
1256         fh = open(fname, 'wb')
1257         fh.write(u.data)
1258         fh.close()
1259         return fname
1260
1261     def get_chk(self, uri, async=False):
1262         u = CHKFileURI.init_from_string(str(uri))
1263         storage_index = u.storage_index
1264         size = u.size
1265         fname = os.path.join(self.cachedir, base64.b32encode(storage_index).lower())
1266         if os.path.exists(fname):
1267             fsize = os.path.getsize(fname)
1268             if fsize == size:
1269                 if async:
1270                     return fname, None
1271                 else:
1272                     return fname
1273             else:
1274                 self.log('warning file "%s" is too short %s < %s' % (fname, fsize, size))
1275         self.log('downloading file %s (%s)' % (fname, size, ))
1276         url = "%suri/%s" % (self.nodeurl, uri)
1277         if async:
1278             if fname in self.downloaders and self.downloaders[fname].running:
1279                 downloader = self.downloaders[fname]
1280             else:
1281                 downloader = DownloaderWithReadQueue()
1282                 self.downloaders[fname] = downloader
1283                 d = downloader.start(url, fname, target_size=u.size)
1284                 def clear_downloader(result, fname):
1285                     self.log('clearing %s from downloaders: %r' % (fname, result))
1286                     self.downloaders.pop(fname, None)
1287                 d.addBoth(clear_downloader, fname)
1288             return fname, downloader
1289         else:
1290             fh = open(fname, 'wb')
1291             download = urllib.urlopen(url)
1292             while True:
1293                 chunk = download.read(4096)
1294                 if not chunk:
1295                     break
1296                 fh.write(chunk)
1297             fh.close()
1298             return fname
1299
1300     def tmp_file(self, id):
1301         fname = os.path.join(self.tmpdir, base64.b32encode(id).lower())
1302         return fname
1303
1304 _tfs = None # to appease pyflakes; is set in main()
1305 def print_tree():
1306     log('tree:\n' + _tfs.pprint())
1307
1308
1309 def unmarshal(obj):
1310     if obj is None or isinstance(obj, int) or isinstance(obj, long) or isinstance(obj, float):
1311         return obj
1312     elif isinstance(obj, unicode) or isinstance(obj, str):
1313         #log('unmarshal(%r)' % (obj,))
1314         return base64.b64decode(obj)
1315     elif isinstance(obj, list):
1316         return map(unmarshal, obj)
1317     elif isinstance(obj, dict):
1318         return dict([ (k,unmarshal(v)) for k,v in obj.items() ])
1319     else:
1320         raise ValueError('object type not int,str,list,dict,none (%s) (%r)' % (type(obj), obj))
1321
1322 def marshal(obj):
1323     if obj is None or isinstance(obj, int) or isinstance(obj, long) or isinstance(obj, float):
1324         return obj
1325     elif isinstance(obj, str):
1326         return base64.b64encode(obj)
1327     elif isinstance(obj, list) or isinstance(obj, tuple):
1328         return map(marshal, obj)
1329     elif isinstance(obj, dict):
1330         return dict([ (k,marshal(v)) for k,v in obj.items() ])
1331     else:
1332         raise ValueError('object type not int,str,list,dict,none (%s)' % type(obj))
1333
1334
1335 class TRPCProtocol(Protocol):
1336     compute_response_sha1 = True
1337     log_all_requests = False
1338
1339     def connectionMade(self):
1340         self.buf = []
1341
1342     def dataReceived(self, data):
1343         if data == 'keepalive\n':
1344             log('keepalive connection on %r' % (self.transport,))
1345             self.keepalive = True
1346             return
1347
1348         if not data.endswith('\n'):
1349             self.buf.append(data)
1350             return
1351         if self.buf:
1352             self.buf.append(data)
1353             reqstr = ''.join(self.buf)
1354             self.buf = []
1355             self.dispatch_request(reqstr)
1356         else:
1357             self.dispatch_request(data)
1358
1359     def dispatch_request(self, reqstr):
1360         try:
1361             req = simplejson.loads(reqstr)
1362         except ValueError, ve:
1363             log(ve)
1364             return
1365
1366         d = defer.maybeDeferred(self.handle_request, req)
1367         d.addCallback(self.send_response)
1368         d.addErrback(self.send_error)
1369
1370     def send_error(self, failure):
1371         log('failure: %s' % (failure,))
1372         if failure.check(TFSIOError):
1373             e = failure.value
1374             self.send_response(['error', 'errno', e.args[0], e.args[1]])
1375         else:
1376             self.send_response(['error', 'failure', str(failure)])
1377
1378     def send_response(self, result):
1379         response = simplejson.dumps(result)
1380         header = { 'len': len(response), }
1381         if self.compute_response_sha1:
1382             header['sha1'] = base64.b64encode(sha.new(response).digest())
1383         hdr = simplejson.dumps(header)
1384         self.transport.write(hdr)
1385         self.transport.write('\n')
1386         self.transport.write(response)
1387         self.transport.loseConnection()
1388
1389     def connectionLost(self, reason):
1390         if hasattr(self, 'keepalive'):
1391             log('keepalive connection %r lost, shutting down' % (self.transport,))
1392             reactor.callLater(0, reactor.stop)
1393
1394     def handle_request(self, req):
1395         if type(req) is not list or not req or len(req) < 1:
1396             return ['error', 'malformed request']
1397         if req[0] == 'call':
1398             if len(req) < 3:
1399                 return ['error', 'malformed request']
1400             methname = req[1]
1401             try:
1402                 args = unmarshal(req[2])
1403             except ValueError, ve:
1404                 return ['error', 'malformed arguments', str(ve)]
1405
1406             try:
1407                 meth = getattr(self.factory.server, methname)
1408             except AttributeError, ae:
1409                 return ['error', 'no such method', str(ae)]
1410
1411             if self.log_all_requests:
1412                 log('call %s(%s)' % (methname, ', '.join(map(repr, args))))
1413             try:
1414                 result = meth(*args)
1415             except TFSIOError, e:
1416                 log('errno: %s; %s' % e.args)
1417                 return ['error', 'errno', e.args[0], e.args[1]]
1418             except Exception, e:
1419                 log('exception: ' + traceback.format_exc())
1420                 return ['error', 'exception', str(e)]
1421             d = defer.succeed(None)
1422             d.addCallback(lambda junk: result) # result may be Deferred
1423             d.addCallback(lambda res: ['result', marshal(res)]) # only applies if not errback
1424             return d
1425
1426 class TFSServer(object):
1427     def __init__(self, socket_path, server=None):
1428         self.socket_path = socket_path
1429         log('TFSServer init socket: %s' % (socket_path,))
1430
1431         self.factory = Factory()
1432         self.factory.protocol = TRPCProtocol
1433         if server:
1434             self.factory.server = server
1435         else:
1436             self.factory.server = self
1437
1438     def get_service(self):
1439         if not hasattr(self, 'svc'):
1440             from twisted.application import strports
1441             self.svc = strports.service('unix:'+self.socket_path, self.factory)
1442         return self.svc
1443
1444     def run(self):
1445         svc = self.get_service()
1446         def ss():
1447             try:
1448                 svc.startService()
1449             except:
1450                 reactor.callLater(0, reactor.stop)
1451                 raise
1452         reactor.callLater(0, ss)
1453         reactor.run()
1454
1455     def hello(self):
1456         return 'pleased to meet you'
1457
1458     def echo(self, arg):
1459         return arg
1460
1461     def failex(self):
1462         raise ValueError('expected')
1463
1464     def fail(self):
1465         return defer.maybeDeferred(self.failex)
1466
1467 class RPCError(RuntimeError):
1468     pass
1469
1470 class TRPC(object):
1471     def __init__(self, socket_fname):
1472         self.socket_fname = socket_fname
1473         self.keepalive = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
1474         self.keepalive.connect(self.socket_fname)
1475         self.keepalive.send('keepalive\n')
1476         log('requested keepalive on %s' % (self.keepalive,))
1477
1478     def req(self, req):
1479         # open conenction to trpc server
1480         s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
1481         s.connect(self.socket_fname)
1482         # send request
1483         s.send(simplejson.dumps(req))
1484         s.send('\n')
1485         # read response header
1486         hdr_data = s.recv(8192)
1487         first_newline = hdr_data.index('\n')
1488         header = hdr_data[:first_newline]
1489         data = hdr_data[first_newline+1:]
1490         hdr = simplejson.loads(header)
1491         hdr_len = hdr['len']
1492         if hdr.has_key('sha1'):
1493             hdr_sha1 = base64.b64decode(hdr['sha1'])
1494             spool = [data]
1495             spool_sha = sha.new(data)
1496             # spool response
1497             while True:
1498                 data = s.recv(8192)
1499                 if data:
1500                     spool.append(data)
1501                     spool_sha.update(data)
1502                 else:
1503                     break
1504         else:
1505             spool = [data]
1506             # spool response
1507             while True:
1508                 data = s.recv(8192)
1509                 if data:
1510                     spool.append(data)
1511                 else:
1512                     break
1513         s.close()
1514         # decode response
1515         resp = ''.join(spool)
1516         spool = None
1517         assert hdr_len == len(resp), str((hdr_len, len(resp), repr(resp)))
1518         if hdr.has_key('sha1'):
1519             data_sha1 = spool_sha.digest()
1520             spool = spool_sha = None
1521             assert hdr_sha1 == data_sha1, str((base32.b2a(hdr_sha1), base32.b2a(data_sha1)))
1522         #else:
1523             #print 'warning, server provided no sha1 to check'
1524         return resp
1525
1526     def call(self, methodname, *args):
1527         res = self.req(['call', methodname, marshal(args)])
1528
1529         result = simplejson.loads(res)
1530         if not result or len(result) < 2:
1531             raise TypeError('malformed response %r' % (result,))
1532         if result[0] == 'error':
1533             if result[1] == 'errno':
1534                 raise TFSIOError(result[2], result[3])
1535             else:
1536                 raise RPCError(*(result[1:])) # error, exception / error, failure
1537         elif result[0] == 'result':
1538             return unmarshal(result[1])
1539         else:
1540             raise TypeError('unknown response type %r' % (result[0],))
1541
1542     def shutdown(self):
1543         log('shutdown() closing keepalive %s' % (self.keepalive,))
1544         self.keepalive.close()
1545
1546 # (cut-n-pasted here due to an ImportError / some py2app linkage issues)
1547 #from twisted.scripts._twistd_unix import daemonize
1548 def daemonize():
1549     # See http://www.erlenstar.demon.co.uk/unix/faq_toc.html#TOC16
1550     if os.fork():   # launch child and...
1551         os._exit(0) # kill off parent
1552     os.setsid()
1553     if os.fork():   # launch child and...
1554         os._exit(0) # kill off parent again.
1555     os.umask(077)
1556     null=os.open('/dev/null', os.O_RDWR)
1557     for i in range(3):
1558         try:
1559             os.dup2(null, i)
1560         except OSError, e:
1561             if e.errno != errno.EBADF:
1562                 raise
1563     os.close(null)
1564
1565 def main(argv):
1566     log("main(%s)" % (argv,))
1567
1568     # check for version or help options (no args == help)
1569     if not argv:
1570         argv = ['--help']
1571     if len(argv) == 1 and argv[0] in ['-h', '--help']:
1572         config = TahoeFuseOptions()
1573         print >> sys.stderr, config
1574         print >> sys.stderr, 'fuse usage follows:'
1575     if len(argv) == 1 and argv[0] in ['-h', '--help', '--version']:
1576         launch_tahoe_fuse(TahoeFuseLocal, None, argv)
1577         return -2
1578
1579     # parse command line options
1580     config = TahoeFuseOptions()
1581     try:
1582         #print 'parsing', argv
1583         config.parseOptions(argv)
1584     except usage.error, e:
1585         print config
1586         print e
1587         return -1
1588
1589     # check for which alias or uri is specified
1590     if config['alias']:
1591         alias = config['alias']
1592         #print 'looking for aliases in', config['node-directory']
1593         aliases = get_aliases(os.path.expanduser(config['node-directory']))
1594         if alias not in aliases:
1595             raise usage.error('Alias %r not found' % (alias,))
1596         root_uri = aliases[alias]
1597         root_name = alias
1598     elif config['root-uri']:
1599         root_uri = config['root-uri']
1600         root_name = 'uri_' + base32.b2a(tagged_hash('root_name', root_uri))[:12]
1601         # test the uri for structural validity:
1602         try:
1603             DirectoryURI.init_from_string(root_uri)
1604         except:
1605             raise usage.error('root-uri must be a valid directory uri (not %r)' % (root_uri,))
1606     else:
1607         raise usage.error('At least one of --alias or --root-uri must be specified')
1608
1609     nodedir = config['node-directory']
1610     nodeurl = config['node-url']
1611     if not nodeurl:
1612         nodeurl = getnodeurl(nodedir)
1613
1614     # allocate socket
1615     socket_dir = os.path.join(os.path.expanduser(nodedir), "tfuse.sockets")
1616     socket_path = os.path.join(socket_dir, root_name)
1617     if len(socket_path) > 103:
1618         # try googling AF_UNIX and sun_len for some taste of why this oddity exists.
1619         raise OSError(errno.ENAMETOOLONG, 'socket path too long (%s)' % (socket_path,))
1620
1621     fileutil.make_dirs(socket_dir, 0700)
1622     if os.path.exists(socket_path):
1623         log('socket exists')
1624         if config['server-shutdown']:
1625             log('calling shutdown')
1626             trpc = TRPC(socket_path)
1627             result = trpc.shutdown()
1628             log('result: %r' % (result,))
1629             log('called shutdown')
1630             return
1631         else:
1632             raise OSError(errno.EEXIST, 'fuse already running (%r exists)' % (socket_path,))
1633     elif config['server-shutdown']:
1634         raise OSError(errno.ENOTCONN, '--server-shutdown specified, but server not running')
1635
1636     if not os.path.exists(config.mountpoint):
1637         raise OSError(errno.ENOENT, 'No such file or directory: "%s"' % (config.mountpoint,))
1638
1639     global _tfs
1640     #
1641     # Standalone ("no-split")
1642     #
1643     if config['no-split']:
1644         reopen_logfile('tfuse.%s.unsplit.log' % (root_name,))
1645         log('\n'+(24*'_')+'init (unsplit)'+(24*'_')+'\n')
1646
1647         cache_timeout = float(config['cache-timeout'])
1648         tfs = TFS(nodedir, nodeurl, root_uri, cache_timeout, async=False)
1649         #print tfs.pprint()
1650
1651         # make tfs instance accesible to print_tree() for dbg
1652         _tfs = tfs
1653
1654         args = [ '-o'+opt for opt in config.fuse_options ] + [config.mountpoint]
1655         launch_tahoe_fuse(TahoeFuseLocal, tfs, args)
1656
1657     #
1658     # Server
1659     #
1660     elif config['server']:
1661         reopen_logfile('tfuse.%s.server.log' % (root_name,))
1662         log('\n'+(24*'_')+'init (server)'+(24*'_')+'\n')
1663
1664         log('daemonizing')
1665         daemonize()
1666
1667         try:
1668             cache_timeout = float(config['cache-timeout'])
1669             tfs = TFS(nodedir, nodeurl, root_uri, cache_timeout, async=True)
1670             #print tfs.pprint()
1671
1672             # make tfs instance accesible to print_tree() for dbg
1673             _tfs = tfs
1674
1675             log('launching tfs server')
1676             tfuse = TahoeFuseBase(tfs)
1677             tfs_server = TFSServer(socket_path, tfuse)
1678             tfs_server.run()
1679             log('tfs server ran, exiting')
1680         except:
1681             log('exception: ' + traceback.format_exc())
1682
1683     #
1684     # Client
1685     #
1686     else:
1687         reopen_logfile('tfuse.%s.client.log' % (root_name,))
1688         log('\n'+(24*'_')+'init (client)'+(24*'_')+'\n')
1689
1690         server_args = [sys.executable, sys.argv[0], '--server'] + argv
1691         if 'Allmydata.app/Contents/MacOS' in sys.executable:
1692             # in this case blackmatch is the 'fuse' subcommand of the 'tahoe' executable
1693             # otherwise we assume blackmatch is being run from source
1694             server_args.insert(2, 'fuse')
1695         #print 'launching server:', server_args
1696         server = subprocess.Popen(server_args)
1697         waiting_since = time.time()
1698         wait_at_most = 8
1699         while not os.path.exists(socket_path):
1700             log('waiting for appearance of %r' % (socket_path,))
1701             time.sleep(1)
1702             if time.time() - waiting_since > wait_at_most:
1703                 log('%r did not appear within %ss' % (socket_path, wait_at_most))
1704                 raise IOError(2, 'no socket %s' % (socket_path,))
1705         #print 'launched server'
1706         trpc = TRPC(socket_path)
1707
1708
1709         args = [ '-o'+opt for opt in config.fuse_options ] + [config.mountpoint]
1710         launch_tahoe_fuse(TahoeFuseShim, trpc, args)
1711
1712         
1713 if __name__ == '__main__':
1714     sys.exit(main(sys.argv[1:]))