]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - contrib/fuse/impl_b/pyfuse/greenhandler.py
add Protovis.js-based download-status timeline visualization
[tahoe-lafs/tahoe-lafs.git] / contrib / fuse / impl_b / pyfuse / greenhandler.py
1 import sys, os, Queue, atexit
2
3 dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
4 dir = os.path.join(dir, 'pypeers')
5 if dir not in sys.path:
6     sys.path.append(dir)
7 del dir
8
9 from greensock import *
10 import threadchannel
11
12
13 def _read_from_kernel(handler):
14     while True:
15         msg = read(handler.fd, handler.MAX_READ)
16         if not msg:
17             print >> sys.stderr, "out-kernel connexion closed"
18             break
19         autogreenlet(handler.handle_message, msg)
20
21 def add_handler(handler):
22     autogreenlet(_read_from_kernel, handler)
23     atexit.register(handler.close)
24
25 # ____________________________________________________________
26
27 THREAD_QUEUE = None
28
29 def thread_runner(n):
30     while True:
31         #print 'thread runner %d waiting' % n
32         operation, answer = THREAD_QUEUE.get()
33         #print 'thread_runner %d: %r' % (n, operation)
34         try:
35             res = True, operation()
36         except Exception:
37             res = False, sys.exc_info()
38         #print 'thread_runner %d: got %d bytes' % (n, len(res or ''))
39         answer.send(res)
40
41
42 def start_bkgnd_thread():
43     global THREAD_QUEUE, THREAD_LOCK
44     import thread
45     threadchannel.startup()
46     THREAD_LOCK = thread.allocate_lock()
47     THREAD_QUEUE = Queue.Queue()
48     for i in range(4):
49         thread.start_new_thread(thread_runner, (i,))
50
51 def wget(*args, **kwds):
52     from wget import wget
53
54     def operation():
55         kwds['unlock'] = THREAD_LOCK
56         THREAD_LOCK.acquire()
57         try:
58             return wget(*args, **kwds)
59         finally:
60             THREAD_LOCK.release()
61
62     if THREAD_QUEUE is None:
63         start_bkgnd_thread()
64     answer = threadchannel.ThreadChannel()
65     THREAD_QUEUE.put((operation, answer))
66     ok, res = answer.receive()
67     if not ok:
68         typ, value, tb = res
69         raise typ, value, tb
70     #print 'wget returns %d bytes' % (len(res or ''),)
71     return res