]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - contrib/fuse/impl_b/pyfuse/cachefs.py
add Protovis.js-based download-status timeline visualization
[tahoe-lafs/tahoe-lafs.git] / contrib / fuse / impl_b / pyfuse / cachefs.py
1 import os, stat, py, select
2 import inspect
3 from objectfs import ObjectFs
4
5
# Transfer unit, in bytes, for remote reads and the local cache file; it is
# also used as the max_read mount option.  Must stay a power of two, because
# offsets are block-aligned with ``pos & -BLOCKSIZE``.
BLOCKSIZE = 8 * 1024
7
8
def remote_runner(BLOCKSIZE):
    """Serve file requests on stdin/stdout; this function runs remotely.

    CacheFs sends this function's source text (inspect.getsource) through
    ssh and exec's it in a remote Python, so the body must stay
    self-contained: it imports everything it uses itself.

    Requests arrive one per line on stdin and are eval()'d:

      ('L', path)       reply: one line ('L', path, [(name, st_mode, st_size), ...]);
                        entries whose os.stat() fails are skipped.
      ('R', path, pos)  reply: the line ('R', path, pos, n) followed by n raw
                        bytes read from *path* at *pos* (at most BLOCKSIZE;
                        fewer at end of file).
      ('S', path, pos)  start streaming *path* from *pos*: blocks are sent as
                        'R'-style replies while stdout is writable, until a
                        short block (end of file) or until another request
                        arrives.

    Any new request cancels an active stream; unknown requests are ignored.

    NOTE(review): requests are eval()'d, so the peer is fully trusted; the
    only writer visible in this file is the local Controller.
    """
    import sys, select, os, struct   # NOTE(review): struct is not used below
    stream = None   # file being streamed for the last 'S' request, if any
    while True:
        # Push streamed blocks while stdout is writable, but stop as soon as
        # a new request is waiting on stdin: requests take priority.
        while stream is not None:
            iwtd, owtd, ewtd = select.select([0], [1], [])
            if iwtd:
                break
            pos = stream.tell()
            data = stream.read(BLOCKSIZE)
            # `path` is the one set by the 'S' request that opened `stream`.
            res = ('R', path, pos, len(data))
            sys.stdout.write('%r\n%s' % (res, data))
            if len(data) < BLOCKSIZE:
                # Short block: end of file reached, the stream is finished.
                stream = None

        # Any new request cancels a stream in progress.
        stream = None
        msg = eval(sys.stdin.readline())
        if msg[0] == 'L':
            path = msg[1]
            names = os.listdir(path)
            res = []
            for name in names:
                try:
                    st = os.stat(os.path.join(path, name))
                except OSError:
                    # Entry vanished (or is unreadable) since listdir().
                    continue
                res.append((name, st.st_mode, st.st_size))
            res = msg + (res,)
            sys.stdout.write('%s\n' % (res,))
        elif msg[0] == 'R':
            path, pos = msg[1:]
            # NOTE(review): f is never closed explicitly; it is released
            # only when garbage-collected.
            f = open(path, 'rb')
            f.seek(pos)
            data = f.read(BLOCKSIZE)
            res = msg + (len(data),)
            sys.stdout.write('%r\n%s' % (res, data))
        elif msg[0] == 'S':
            path, pos = msg[1:]
            stream = open(path, 'rb')
            stream.seek(pos)
        # Disabled explicit "cancel stream" request; cancellation happens
        # implicitly above whenever any request arrives.
        #elif msg[0] == 'C':
        #    stream = None
51
52
class CacheFs(ObjectFs):
    """ObjectFs exposing a remote directory tree reached over ssh.

    File contents are fetched on demand from a helper process running
    remote_runner() on the remote host and cached under a local directory.
    """

    MOUNT_OPTIONS = {'max_read': BLOCKSIZE}

    def __init__(self, localdir, remotehost, remotedir):
        """Start the remote helper on *remotehost* and serve *remotedir*.

        localdir   -- local directory holding the cached copies of files
        remotehost -- ssh destination (passed to ssh as a single argument)
        remotedir  -- directory on the remote host to expose
        """
        import subprocess

        # The helper program: remote_runner's source plus a call to it.
        src = inspect.getsource(remote_runner)
        src += '\n\nremote_runner(%d)\n' % BLOCKSIZE

        # The remote Python reads one line from stdin (the repr of `src`,
        # written below) and exec's it.
        remotecmd = 'python -u -c "exec input()"'

        # Passing an argument list runs ssh without a local shell, so the
        # hand-rolled Unix quoting that the os.popen2() command string
        # needed is gone.  ssh still hands `remotecmd` to the remote shell,
        # which is why it keeps its inner double quotes.
        self._ssh = subprocess.Popen(['ssh', '-C', remotehost, remotecmd],
                                     bufsize=0,
                                     stdin=subprocess.PIPE,
                                     stdout=subprocess.PIPE)
        child_in, child_out = self._ssh.stdin, self._ssh.stdout
        child_in.write('%r\n' % (src,))

        control = Controller(child_in, child_out)
        ObjectFs.__init__(self, CacheDir(localdir, remotedir, control))
73
74
class Controller:
    """Local end of the ssh channel to remote_runner().

    Queries are written to *child_in* as ``repr()`` lines; answers are read
    from *child_out*.  Every data block received, whether requested directly
    or streamed, is kept in ``self.cache`` keyed by ``(remotefn, pos)``
    until clear_cache() drops it.
    """

    def __init__(self, child_in, child_out):
        self.child_in = child_in      # pipe to the remote helper's stdin
        self.child_out = child_out    # pipe from the remote helper's stdout
        self.cache = {}               # (remotefn, pos) -> block data
        self.streaming = None         # remote file being streamed, if any

    def next_answer(self):
        """Read one answer tuple from the remote side and return it.

        For 'R' answers, the raw block that follows the header line is read
        as well and stored in the cache.

        Raises EOFError when the remote side has closed its output.
        """
        line = self.child_out.readline()
        if not line:
            # readline() returns an empty string only at end of stream
            # (e.g. ssh exited); eval('') would raise a misleading
            # SyntaxError instead.
            raise EOFError('remote helper closed the connection')
        # NOTE(security): eval() runs whatever the remote host sends, so a
        # compromised remote host can execute code locally.  Answers are
        # plain tuples/lists of strings and integers; consider
        # ast.literal_eval() if the remote side is not fully trusted.
        answer = eval(line)
        if answer[0] == 'R':
            remotefn, pos, length = answer[1:]
            self.cache[remotefn, pos] = self.child_out.read(length)
        return answer

    def wait_answer(self, query):
        """Send *query* and return the fields of its answer that follow the
        echoed query prefix.

        Non-matching answers (e.g. streamed blocks) are still processed by
        next_answer(), so their data lands in the cache.
        """
        # Any request makes the remote side drop its current stream.
        self.streaming = None
        self.child_in.write('%r\n' % (query,))
        while True:
            answer = self.next_answer()
            if answer[:len(query)] == query:
                return answer[len(query):]

    def listdir(self, remotedir):
        """Return ``[(name, st_mode, st_size), ...]`` for *remotedir*."""
        res, = self.wait_answer(('L', remotedir))
        return res

    def wait_for_block(self, remotefn, pos):
        """Block until the data block at (*remotefn*, *pos*) is cached and
        return it.

        Sends no request itself: the block must already have been requested
        (read_blocks) or be covered by an active stream.
        """
        key = remotefn, pos
        while key not in self.cache:
            self.next_answer()
        return self.cache[key]

    def peek_for_block(self, remotefn, pos):
        """Like wait_for_block(), but consume only answers that are already
        readable on the pipe; return None if the block has not arrived."""
        key = remotefn, pos
        while key not in self.cache:
            # Zero timeout: poll, never wait for more data to arrive.
            readable, _, _ = select.select([self.child_out], [], [], 0)
            if not readable:
                return None
            self.next_answer()
        return self.cache[key]

    def cached_block(self, remotefn, pos):
        """Return the cached block at (*remotefn*, *pos*), or None."""
        return self.cache.get((remotefn, pos))

    def start_streaming(self, remotefn, pos):
        """Ask the remote side to stream *remotefn* starting at the first
        uncached block at or after *pos*.

        Does nothing if *remotefn* is already the file being streamed.
        """
        if remotefn != self.streaming:
            # Skip over blocks that are already held locally.
            while (remotefn, pos) in self.cache:
                pos += BLOCKSIZE
            self.child_in.write('%r\n' % (('S', remotefn, pos),))
            self.streaming = remotefn

    def read_blocks(self, remotefn, poslist):
        """Request, in a single write, every block of *remotefn* at the
        offsets in *poslist* that is not already cached."""
        lst = ['%r\n' % (('R', remotefn, pos),)
               for pos in poslist if (remotefn, pos) not in self.cache]
        if lst:
            # A new request ends any stream on the remote side.
            self.streaming = None
            self.child_in.write(''.join(lst))

    def clear_cache(self, remotefn):
        """Drop every cached block belonging to *remotefn*."""
        # Snapshot the matching keys first: deleting from a dict while
        # iterating over it raises RuntimeError on Python 3, where keys()
        # is a live view rather than a list.
        stale = [key for key in self.cache if key[0] == remotefn]
        for key in stale:
            del self.cache[key]
145
146
class CacheDir:
    """A remote directory whose listing is fetched once, on first use."""

    def __init__(self, localdir, remotedir, control, size=0):
        # `size` is unused; it exists so that CacheDir and CacheFile can be
        # constructed with the same arguments in listdir().
        self.localdir  = localdir
        self.remotedir = remotedir
        self.control   = control
        self.entries   = None     # [(name, node), ...] once listed

    def listdir(self):
        """Return ``[(name, node), ...]`` for this directory.

        Sub-directories become CacheDir nodes and everything else becomes
        CacheFile nodes.  The listing is fetched from the remote side on the
        first call and reused afterwards.  If fetching fails, nothing is
        cached, so a later call retries instead of returning an empty or
        partial listing.
        """
        if self.entries is None:
            entries = []
            for name, st_mode, st_size in self.control.listdir(self.remotedir):
                cls = CacheDir if stat.S_ISDIR(st_mode) else CacheFile
                obj = cls(os.path.join(self.localdir, name),
                          os.path.join(self.remotedir, name),
                          self.control,
                          st_size)
                entries.append((name, obj))
            # Publish only a complete listing.
            self.entries = entries
        return self.entries
167
class CacheFile:
    """A remote regular file, mirrored into a local cache file on demand."""

    def __init__(self, localfn, remotefn, control, size):
        self.localfn, self.remotefn = localfn, remotefn
        self.control = control
        self.st_size = size       # size reported by the remote listing

    def size(self):
        """Return the file size reported by the remote side."""
        return self.st_size

    def read(self):
        """Return a readable file object for this file's contents.

        A complete local copy (same size as the remote file) is opened
        directly.  Otherwise any stale local copy is deleted and a DumpFile
        is returned that reads through the ``.partial~`` file, which is
        made to exist first via py.path's ensure().
        """
        try:
            local_size = os.stat(self.localfn).st_size
        except OSError:
            local_size = None
        if local_size is not None:
            if local_size == self.st_size:
                return open(self.localfn, 'rb')
            os.unlink(self.localfn)
        partial_fn = self.partial()
        py.path.local(partial_fn).ensure(file=1)
        return DumpFile(self, open(partial_fn, 'r+b'))

    def partial(self):
        """Return the path of the in-progress local copy."""
        return self.localfn + '.partial~'

    def complete(self):
        """Move the finished partial copy into place (best effort)."""
        try:
            os.rename(self.partial(), self.localfn)
        except OSError:
            # Failures are deliberately ignored.
            pass
200
201
class DumpFile:
    """File-like reader for a CacheFile whose local copy is incomplete.

    Reads are served from the local partial file where possible.  Missing
    data is fetched through the file's Controller.  Blocks contiguous with
    the end of the partial file are appended to it, so it keeps growing
    until it is complete.  Reads far beyond its end are answered without
    writing to it.
    """

    def __init__(self, cf, f):
        self.cf = cf        # the CacheFile being read
        self.f = f          # the local partial file, read and written here
        self.pos = 0        # offset used by the next read()

    def seek(self, npos):
        """Set the offset for the next read()."""
        self.pos = npos

    def read(self, count):
        """Return up to *count* bytes starting at the current offset.

        1. Serve whatever the local partial file already holds.
        2. If more is needed and the offset is beyond the file's end, first
           write into the file any blocks that have already arrived for the
           region right after its end.
        3. If the offset is not beyond the file's end, stream from the end:
           wait for the needed blocks and append them, then append any
           further contiguous cached blocks.  Once the file reaches the
           remote size, it is completed and its cached blocks are dropped.
        4. Otherwise, request only the needed blocks and return them
           without writing them to the file.
        If the request was served entirely from local data and the offset
        is within 60000 bytes of the file's end, streaming is started so
        that data is prefetched.

        NOTE(review): self.pos is advanced only by the bytes served in step
        1, not by data fetched in steps 3/4.  This appears to assume the
        caller seeks before every read; confirm against ObjectFs.
        """
        control = self.cf.control
        self.f.seek(self.pos)
        buffer = self.f.read(count)
        self.pos += len(buffer)
        count -= len(buffer)            # bytes still missing

        self.f.seek(0, 2)               # current end of the partial file
        curend = self.f.tell()

        if count > 0:

            # Step 2: absorb already-arrived blocks right after the file end,
            # without waiting, until the file reaches the offset.
            while self.pos > curend:
                curend &= -BLOCKSIZE    # align down to a block boundary
                data = control.peek_for_block(self.cf.remotefn, curend)
                if data is None:
                    break
                self.f.seek(curend)
                self.f.write(data)
                curend += len(data)
                if len(data) < BLOCKSIZE:
                    break               # short block: end of remote file

            # Block-aligned offsets covering the missing bytes.
            start = max(self.pos, curend) & (-BLOCKSIZE)
            end = (self.pos + count + BLOCKSIZE-1) & (-BLOCKSIZE)
            poslist = range(start, end, BLOCKSIZE)

            if self.pos <= curend:
                # Step 3: sequential access; stream and append to the file.
                control.start_streaming(self.cf.remotefn, start)
                self.f.seek(start)
                for p in poslist:
                    data = control.wait_for_block(self.cf.remotefn, p)
                    assert self.f.tell() == p   # blocks land contiguously
                    self.f.write(data)
                    if len(data) < BLOCKSIZE:
                        break

                # Append whatever further contiguous blocks are cached.
                curend = self.f.tell()
                while curend < self.cf.st_size:
                    curend &= -BLOCKSIZE
                    data = control.cached_block(self.cf.remotefn, curend)
                    if data is None:
                        break
                    assert self.f.tell() == curend
                    self.f.write(data)
                    curend += len(data)
                else:
                    # Reached only when the loop ends without `break`, i.e.
                    # the local file now has the full remote size.
                    self.cf.complete()
                    control.clear_cache(self.cf.remotefn)

                self.f.seek(self.pos)
                buffer += self.f.read(count)

            else:
                # Step 4: random access far beyond the file end; fetch only
                # the needed blocks and do not write them to the file.
                control.read_blocks(self.cf.remotefn, poslist)
                result = []
                for p in poslist:
                    data = control.wait_for_block(self.cf.remotefn, p)
                    result.append(data)
                    if len(data) < BLOCKSIZE:
                        break
                data = ''.join(result)
                buffer += data[self.pos-start:self.pos-start+count]

        else:
            # Fully served locally: prefetch when the reader is within
            # 60000 bytes of the end of the local data.
            if self.pos + 60000 > curend:
                curend &= -BLOCKSIZE
                control.start_streaming(self.cf.remotefn, curend)

        return buffer