1 import os, stat, py, select
3 from objectfs import ObjectFs
# remote_runner(BLOCKSIZE): server loop whose source text is sent to the
# remote host and executed there (CacheFs.__init__ fetches it with
# inspect.getsource and appends a 'remote_runner(<BLOCKSIZE>)' call).
# It talks to the local side over stdin/stdout: each reply is the repr of a
# tuple on one line, optionally followed by raw file data.
# NOTE(review): the embedded line numbers skip (10->13, 14->18, 21->25, ...),
# so statements of this function are missing from this listing; the comments
# below describe only the visible lines.
9 def remote_runner(BLOCKSIZE):
10 import sys, select, os, struct
# Streaming loop: runs while a file is open for streaming (see the
# 'stream = open(...)' line at the end of this block).
13 while stream is not None:
# Waits (no timeout) until fd 0 (stdin) is readable or fd 1 (stdout) is
# writable. How the result is used is not visible -- presumably streaming is
# interrupted when a new request arrives on stdin; TODO confirm.
14 iwtd, owtd, ewtd = select.select([0], [1], [])
# Send the next block of the streamed file: header ('R', path, pos, length)
# on one line, immediately followed by the raw bytes. 'path' and 'pos' are
# set on lines not shown here.
18 data = stream.read(BLOCKSIZE)
19 res = ('R', path, pos, len(data))
20 sys.stdout.write('%r\n%s' % (res, data))
# A short read means fewer than BLOCKSIZE bytes were left; the handling
# (presumably ending the stream) is not visible.
21 if len(data) < BLOCKSIZE:
# Requests arrive as one repr'd Python value per line.
# NOTE(review): eval() executes whatever the peer sends -- acceptable only
# if the pipe's other end is trusted.
25 msg = eval(sys.stdin.readline())
# Directory listing: collect (name, st_mode, st_size) for each entry and send
# the list back as a single line.
28 names = os.listdir(path)
32 st = os.stat(os.path.join(path, name))
35 res.append((name, st.st_mode, st.st_size))
37 sys.stdout.write('%s\n' % (res,))
# Single-block read: the reply header is the request tuple extended with the
# number of bytes that follow.
42 data = f.read(BLOCKSIZE)
43 res = msg + (len(data),)
44 sys.stdout.write('%r\n%s' % (res, data))
# Opens a file in binary mode as the new streaming source for the loop above.
47 stream = open(path, 'rb')
# CacheFs: an ObjectFs whose root object is a CacheDir built from a local
# directory, a remote directory and a Controller connected to a helper
# process started on the remote host.
# NOTE(review): the embedded line numbers skip (54->56, 58->60, 64->66, ...),
# so some statements are missing from this listing.
53 class CacheFs(ObjectFs):
# Caps read requests at BLOCKSIZE (a module-level name not visible here).
54 MOUNT_OPTIONS = {'max_read': BLOCKSIZE}
56 def __init__(self, localdir, remotehost, remotedir):
# Build the program to run remotely: the source of remote_runner plus a call
# to it with the local BLOCKSIZE.
# NOTE(review): 'inspect' is not among the imports visible in this listing;
# confirm it is imported on one of the omitted lines.
57 src = inspect.getsource(remote_runner)
58 src += '\n\nremote_runner(%d)\n' % BLOCKSIZE
# The remote interpreter runs unbuffered (-u) and executes whatever its first
# input line evaluates to (Python 2 'exec' statement and input()).
60 remotecmd = 'python -u -c "exec input()"'
61 cmdline = [remotehost, remotecmd]
62 # XXX Unix style quoting
# Wrap every argument in single quotes, rewriting each embedded single quote
# as '\'' so that it survives the shell.
63 for i in range(len(cmdline)):
64 cmdline[i] = "'" + cmdline[i].replace("'", "'\\''") + "'"
# 'cmd' is defined on a line not shown here -- presumably the remote-shell
# program (e.g. ssh); TODO confirm.
66 cmdline.insert(0, cmd)
# Unbuffered pipes to the child's stdin and stdout.
68 child_in, child_out = os.popen2(' '.join(cmdline), bufsize=0)
# The first line sent is the repr of the program text, which the remote
# 'exec input()' evaluates back into a string and executes.
69 child_in.write('%r\n' % (src,))
71 control = Controller(child_in, child_out)
72 ObjectFs.__init__(self, CacheDir(localdir, remotedir, control))
# Constructor taking the two pipes of the remote helper: child_in is written
# to, child_out is read from by the methods that follow.
# NOTE(review): the class header is not visible; this is presumably the
# Controller class instantiated as Controller(child_in, child_out) elsewhere
# in the file. Other methods read self.cache and self.streaming, whose
# initialisation is not shown in this listing.
76 def __init__(self, child_in, child_out):
77 self.child_in = child_in
78 self.child_out = child_out
# Reads one reply from the remote helper: a repr'd tuple on a single line.
# For data replies the tuple tail is (remotefn, pos, length); 'length' raw
# bytes follow on the pipe and are stored in self.cache[(remotefn, pos)].
# NOTE(review): embedded lines 84-85 and 89 are missing from this listing --
# presumably a check of the reply type before the unpacking, and the return
# of 'answer' (wait_answer uses this method's result); TODO confirm.
# NOTE(review): eval() trusts whatever the remote process sends.
82 def next_answer(self):
83 answer = eval(self.child_out.readline())
86 remotefn, pos, length = answer[1:]
87 data = self.child_out.read(length)
88 self.cache[remotefn, pos] = data
# Sends 'query' (a tuple, written as its repr on one line) and returns the
# part of the matching answer that follows the echoed query prefix.
# NOTE(review): embedded lines 92-93 and 95 are missing from this listing;
# presumably answers are read in a loop until one starts with the query --
# TODO confirm.
91 def wait_answer(self, query):
94 self.child_in.write('%r\n' % (query,))
96 answer = self.next_answer()
# An answer belongs to this query when it begins with the query tuple itself.
97 if answer[:len(query)] == query:
98 return answer[len(query):]
# Asks the remote helper for the listing of 'remotedir' with an
# ('L', remotedir) query; the answer's payload is a one-element tuple that is
# unpacked into 'res'.
# NOTE(review): the line returning the result is not visible in this listing
# (a caller elsewhere in the file iterates the result as
# (name, st_mode, st_size) triples).
100 def listdir(self, remotedir):
101 query = ('L', remotedir)
102 res, = self.wait_answer(query)
# Returns the cached data block for 'key', looping until the block is present
# in self.cache.
# NOTE(review): the assignment of 'key' (embedded line 106) and the loop body
# (line 108) are missing from this listing -- presumably key is
# (remotefn, pos) and the body processes the next remote answer; TODO confirm.
105 def wait_for_block(self, remotefn, pos):
107 while key not in self.cache:
109 return self.cache[key]
# Non-blocking variant of the block lookup: while the block is not cached,
# polls the remote pipe for readable data with a zero timeout.
# NOTE(review): embedded lines 112 and 115-117 are missing from this listing
# -- presumably 'key' is (remotefn, pos) and the method gives up when nothing
# is readable; TODO confirm.
111 def peek_for_block(self, remotefn, pos):
113 while key not in self.cache:
# Timeout 0 makes select() return immediately instead of waiting.
114 iwtd, owtd, ewtd = select.select([self.child_out], [], [], 0)
118 return self.cache[key]
# Returns the block stored in self.cache under 'key', or None when it is not
# cached (dict.get); performs no I/O in the visible lines.
# NOTE(review): the assignment of 'key' (embedded line 121) is missing from
# this listing -- presumably (remotefn, pos).
120 def cached_block(self, remotefn, pos):
122 return self.cache.get(key)
# Asks the remote helper to start streaming 'remotefn' from 'pos' by sending
# an ('S', remotefn, pos) request, unless that file is already recorded as
# the one being streamed.
# NOTE(review): embedded lines 127 and 129 are missing from this listing; the
# body of the 'while' presumably advances pos past blocks already cached.
124 def start_streaming(self, remotefn, pos):
125 if remotefn != self.streaming:
126 while (remotefn, pos) in self.cache:
128 query = ('S', remotefn, pos)
# Fire-and-forget: the request is written without waiting for an answer.
130 self.child_in.write('%r\n' % (query,))
131 self.streaming = remotefn
# Requests individual blocks of 'remotefn': builds one ('R', remotefn, pos)
# request line for every position in 'poslist' that is not yet cached and
# writes them all with a single write call.
# NOTE(review): embedded line 136 is missing from this listing -- presumably
# a guard such as 'if lst:' around the lines that follow; TODO confirm.
133 def read_blocks(self, remotefn, poslist):
134 lst = ['%r\n' % (('R', remotefn, pos),)
135 for pos in poslist if (remotefn, pos) not in self.cache]
# Forget which file is recorded as streaming, so a later start_streaming()
# call sends a fresh 'S' request.
137 self.streaming = None
138 #print 'Q', '+ '.join(lst)
139 self.child_in.write(''.join(lst))
# Walks all cache keys and selects those whose first element is 'remotefn'
# (other methods store cache keys as (remotefn, pos)).
# NOTE(review): the action taken on matching keys (embedded line 144) is
# missing from this listing -- presumably the entry is deleted; TODO confirm.
141 def clear_cache(self, remotefn):
142 for key in self.cache.keys():
143 if key[0] == remotefn:
# Constructor of a directory object: remembers the local cache directory, the
# remote directory it mirrors, and the controller used to query the remote.
# NOTE(review): the class header is not visible -- presumably CacheDir, which
# is instantiated as CacheDir(localdir, remotedir, control) elsewhere in the
# file. 'size' is not used in the visible lines, and the initialisation of
# self.entries (read by later code) is not shown.
148 def __init__(self, localdir, remotedir, control, size=0):
149 self.localdir = localdir
150 self.remotedir = remotedir
151 self.control = control
# Fragment of a method whose 'def' line is missing from this listing.
# It fills self.entries on first use: the remote directory is listed through
# the controller and one object is created per entry.
154 if self.entries is None:
156 for name, st_mode, st_size in self.control.listdir(self.remotedir):
# Directories and non-directories get different classes; the assignment of
# 'cls' is on lines not shown here.
157 if stat.S_ISDIR(st_mode):
# Each child mirrors the same name under both the local and remote parents;
# the remaining constructor arguments are on lines not shown here.
161 obj = cls(os.path.join(self.localdir, name),
162 os.path.join(self.remotedir, name),
165 self.entries.append((name, obj))
# Constructor of a file object: remembers the local cache file name, the
# remote file name and the controller.
# NOTE(review): the class header is not visible. 'size' is not used in the
# visible lines; later code reads self.st_size, so it is presumably stored
# there on an omitted line -- TODO confirm.
169 def __init__(self, localfn, remotefn, control, size):
170 self.localfn = localfn
171 self.remotefn = remotefn
172 self.control = control
# Fragment of a method whose 'def' line is missing from this listing
# (embedded lines 179, 181-183 and 188 are also missing).
# It decides how to open the file: straight from the local cache when the
# cached copy is complete, otherwise through a DumpFile over the partial file.
180 st = os.stat(self.localfn)
184 if st.st_size == self.st_size: # fully cached
185 return open(self.localfn, 'rb')
# Reached when the local file exists but its size does not match: discard it.
186 os.unlink(self.localfn)
# 'lpath' is used on a line not shown here -- presumably to make sure the
# partial file exists before it is opened in 'r+b' mode; TODO confirm.
187 lpath = py.path.local(self.partial())
189 f = open(self.partial(), 'r+b')
190 return DumpFile(self, f)
# Two one-line fragments; the 'def' lines of both methods are missing from
# this listing.
# Body of partial() (called as self.partial() elsewhere in the file): the
# name of the incomplete download is the local file name plus '.partial~'.
193 return self.localfn + '.partial~'
# Promotes the partial file to the final cached file name; the enclosing
# method and the condition under which this runs are not visible.
197 os.rename(self.partial(), self.localfn)
# Constructor taking 'cf' and 'f'; elsewhere in the file it is called as
# DumpFile(self, f) with an open partial file.
# NOTE(review): the body is missing from this listing; read() uses self.cf,
# self.f and self.pos, so they are presumably initialised here.
204 def __init__(self, cf, f):
# Seek to offset 'npos'.
# NOTE(review): the body is missing from this listing; presumably it only
# records the position in self.pos, since read() seeks the underlying file
# to self.pos itself -- TODO confirm.
209 def seek(self, npos):
# Reads up to 'count' bytes at self.pos, serving what is already in the local
# partial file and fetching the rest from the remote side via the controller.
# NOTE(review): indentation is lost and many lines are missing from this
# listing (embedded numbers jump 216->220, 226->232, 255->260, ...), so the
# branch structure described below is partly inferred -- TODO confirm against
# the full source.
212 def read(self, count):
213 control = self.cf.control
# First serve whatever the local file already holds at the current position.
214 self.f.seek(self.pos)
215 buffer = self.f.read(count)
216 self.pos += len(buffer)
# 'curend' is taken from f.tell(); the name suggests the current end of the
# locally stored data (any seek to end-of-file is on lines not shown).
220 curend = self.f.tell()
# The requested position lies beyond the local data: check without blocking
# whether the block at 'curend' has already arrived.
224 while self.pos > curend:
226 data = control.peek_for_block(self.cf.remotefn, curend)
232 if len(data) < BLOCKSIZE:
# Block-aligned range covering the request: 'start' is rounded down and 'end'
# rounded up to a multiple of BLOCKSIZE. The '& (-BLOCKSIZE)' trick assumes
# BLOCKSIZE is a power of two -- TODO confirm.
235 start = max(self.pos, curend) & (-BLOCKSIZE)
236 end = (self.pos + count + BLOCKSIZE-1) & (-BLOCKSIZE)
237 poslist = range(start, end, BLOCKSIZE)
# Sequential case: the read continues from the local data, so stream the file
# and wait for each block in order.
239 if self.pos <= curend:
240 control.start_streaming(self.cf.remotefn, start)
243 data = control.wait_for_block(self.cf.remotefn, p)
# The file position must equal the block offset here; the write of 'data' is
# presumably on an omitted line.
244 assert self.f.tell() == p
# A block shorter than BLOCKSIZE is the last one of the file.
246 if len(data) < BLOCKSIZE:
# Take in any further blocks that are already cached, without waiting.
249 curend = self.f.tell()
250 while curend < self.cf.st_size:
252 data = control.cached_block(self.cf.remotefn, curend)
255 assert self.f.tell() == curend
260 control.clear_cache(self.cf.remotefn)
# Re-read from the local file, which now holds the newly fetched data.
262 self.f.seek(self.pos)
263 buffer += self.f.read(count)
# Presumably the non-sequential case: request exactly the needed blocks and
# assemble the answer in memory.
266 control.read_blocks(self.cf.remotefn, poslist)
269 data = control.wait_for_block(self.cf.remotefn, p)
271 if len(data) < BLOCKSIZE:
# 'result' is filled on lines not shown here; the slice picks the requested
# bytes out of the block-aligned data.
273 data = ''.join(result)
274 buffer += data[self.pos-start:self.pos-start+count]
# If the position is within 60000 bytes of the end of the local data, start
# streaming from there.
277 if self.pos + 60000 > curend:
279 control.start_streaming(self.cf.remotefn, curend)