]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/immutable/filenode.py
0ac2cf7ef21e699e4ff4d18e22b8793bbeed6533
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / immutable / filenode.py
1 import copy, os.path, stat
2 from cStringIO import StringIO
3 from zope.interface import implements
4 from twisted.internet import defer
5 from twisted.internet.interfaces import IPushProducer
6 from twisted.protocols import basic
7 from foolscap.api import eventually
8 from allmydata.interfaces import IImmutableFileNode, ICheckable, \
9      IDownloadTarget, IUploadResults
10 from allmydata.util import dictutil, log, base32
11 from allmydata.uri import CHKFileURI, LiteralFileURI
12 from allmydata.immutable.checker import Checker
13 from allmydata.check_results import CheckResults, CheckAndRepairResults
14 from allmydata.immutable.repairer import Repairer
15 from allmydata.immutable import download
16
class _ImmutableFileNodeBase(object):
    """Behavior shared by every immutable filenode: such nodes are
    read-only, never mutable, never unknown, and they hash/compare by
    their URI object (``self.u``, set by subclass constructors)."""
    implements(IImmutableFileNode, ICheckable)

    def get_write_uri(self):
        # immutable files have no write-cap
        return None

    def get_readonly_uri(self):
        return self.get_uri()

    def is_mutable(self):
        return False

    def is_readonly(self):
        return True

    def is_unknown(self):
        return False

    def is_allowed_in_immutable_directory(self):
        return True

    def raise_error(self):
        # no-op: no error is ever associated with this kind of node
        pass

    def __hash__(self):
        return self.u.__hash__()
    def __eq__(self, other):
        if isinstance(other, _ImmutableFileNodeBase):
            return self.u.__eq__(other.u)
        else:
            return False
    def __ne__(self, other):
        # BUG FIX: this previously returned self.u.__eq__(other.u) --
        # the same value as __eq__ -- so two equal nodes reported both
        # "==" and "!=" as True. __ne__ must negate the equality test.
        if isinstance(other, _ImmutableFileNodeBase):
            return not self.u.__eq__(other.u)
        else:
            return True
53
class PortionOfFile:
    """A read-only file-like view onto a byte range of a file on disk,
    analogous to a list slice (things[2:14]).

    The constructor seeks to 'offset'; 'size' bounds the total number of
    bytes that read() will ever return (None means unbounded)."""
    def __init__(self, fn, offset=0, size=None):
        self.f = open(fn, "rb")
        self.f.seek(offset)
        self.bytes_left = size

    def read(self, size=None):
        # The effective read length is min(size, self.bytes_left),
        # where None on either side means "no limit".
        if self.bytes_left is None:
            limit = size
        elif size is None:
            limit = self.bytes_left
        else:
            limit = min(size, self.bytes_left)
        data = self.f.read(limit)
        if self.bytes_left is not None:
            self.bytes_left -= len(data)
        return data
73
class DownloadCache:
    """An IDownloadTarget that spools a CHK download into an on-disk
    cache file, and lets readers wait (via Deferreds) until a given byte
    range of the file has arrived."""
    implements(IDownloadTarget)

    def __init__(self, filecap, storage_index, downloader,
                 cachedirectorymanager):
        self._downloader = downloader
        self._uri = filecap
        self._storage_index = storage_index
        self.milestones = set() # of (offset,size,Deferred)
        self.cachedirectorymanager = cachedirectorymanager
        self.cachefile = None
        self.download_in_progress = False
        # five states:
        #  new ImmutableFileNode, no downloads ever performed
        #  new ImmutableFileNode, leftover file (partial)
        #  new ImmutableFileNode, leftover file (whole)
        #  download in progress, not yet complete
        #  download complete

    def when_range_available(self, offset, size):
        """Return a Deferred that fires (with None) once bytes
        [offset, offset+size) are present in the cache file, starting a
        full download if one is not already running."""
        assert isinstance(offset, (int,long))
        assert isinstance(size, (int,long))

        d = defer.Deferred()
        self.milestones.add( (offset,size,d) )
        # a leftover cache file from an earlier run may already satisfy
        # this milestone, in which case no download is needed
        self._check_milestones()
        if self.milestones and not self.download_in_progress:
            self.download_in_progress = True
            log.msg(format=("immutable filenode read [%(si)s]: " +
                            "starting download"),
                    si=base32.b2a(self._storage_index),
                    umid="h26Heg", level=log.OPERATIONAL)
            d2 = self._downloader.download(self._uri, self)
            # ordering matters: _download_done (addBoth) clears
            # download_in_progress first, so that a failed download can
            # be retried by the errbacks' waiters
            d2.addBoth(self._download_done)
            d2.addErrback(self._download_failed)
            d2.addErrback(log.err, umid="cQaM9g")
        return d

    def read(self, consumer, offset, size):
        """Stream bytes [offset, offset+size) from the cache file to
        'consumer'. The caller must have waited on when_range_available()
        first: the range is assumed to already be present on disk."""
        assert offset+size <= self.get_filesize()
        if not self.cachefile:
            self.cachefile = self.cachedirectorymanager.get_file(base32.b2a(self._storage_index))
        f = PortionOfFile(self.cachefile.get_filename(), offset, size)
        d = basic.FileSender().beginFileTransfer(f, consumer)
        d.addCallback(lambda lastSent: consumer)
        return d

    def _download_done(self, res):
        # clear download_in_progress, so failed downloads can be re-tried
        self.download_in_progress = False
        return res

    def _download_failed(self, f):
        # tell anyone who's waiting that we failed
        for m in self.milestones:
            (offset,size,d) = m
            eventually(d.errback, f)
        self.milestones.clear()

    def _check_milestones(self):
        # fire (eventually) the Deferred of every milestone that the
        # current cache-file size already satisfies
        current_size = self.get_filesize()
        for m in list(self.milestones):
            (offset,size,d) = m
            if offset+size <= current_size:
                log.msg(format=("immutable filenode read [%(si)s] " +
                                "%(offset)d+%(size)d vs %(filesize)d: " +
                                "done"),
                        si=base32.b2a(self._storage_index),
                        offset=offset, size=size, filesize=current_size,
                        umid="nuedUg", level=log.NOISY)
                self.milestones.discard(m)
                eventually(d.callback, None)
            else:
                log.msg(format=("immutable filenode read [%(si)s] " +
                                "%(offset)d+%(size)d vs %(filesize)d: " +
                                "still waiting"),
                        si=base32.b2a(self._storage_index),
                        offset=offset, size=size, filesize=current_size,
                        umid="8PKOhg", level=log.NOISY)

    def get_filesize(self):
        """Return the current size of the on-disk cache file, or 0 if it
        does not exist yet."""
        if not self.cachefile:
            self.cachefile = self.cachedirectorymanager.get_file(base32.b2a(self._storage_index))
        try:
            filesize = os.stat(self.cachefile.get_filename())[stat.ST_SIZE]
        except OSError:
            filesize = 0
        return filesize


    # The following are the IDownloadTarget methods, invoked by the
    # downloader as segments arrive.
    def open(self, size):
        if not self.cachefile:
            self.cachefile = self.cachedirectorymanager.get_file(base32.b2a(self._storage_index))
        self.f = open(self.cachefile.get_filename(), "wb")

    def write(self, data):
        self.f.write(data)
        # each new segment may satisfy pending milestones
        self._check_milestones()

    def close(self):
        self.f.close()
        self._check_milestones()

    def fail(self, why):
        pass
    def register_canceller(self, cb):
        pass
    def finish(self):
        return None
    # The following methods are just because the target might be a
    # repairer.DownUpConnector, and just because the current CHKUpload object
    # expects to find the storage index and encoding parameters in its
    # Uploadable.
    def set_storageindex(self, storageindex):
        pass
    def set_encodingparams(self, encodingparams):
        pass
191
192
class ImmutableFileNode(_ImmutableFileNodeBase, log.PrefixingLogMixin):
    """A filenode for an immutable CHK file: provides download (with a
    byte-range cache for partial reads), checking, and repair."""

    def __init__(self, filecap, storage_broker, secret_holder,
                 downloader, history, cachedirectorymanager):
        assert isinstance(filecap, CHKFileURI)
        self.u = filecap
        self._storage_broker = storage_broker
        self._secret_holder = secret_holder
        self._downloader = downloader
        self._history = history
        storage_index = self.get_storage_index()
        self.download_cache = DownloadCache(filecap, storage_index, downloader,
                                            cachedirectorymanager)
        prefix = self.u.get_verify_cap().to_string()
        log.PrefixingLogMixin.__init__(self, "allmydata.immutable.filenode", prefix=prefix)
        self.log("starting", level=log.OPERATIONAL)

    def get_size(self):
        return self.u.get_size()
    def get_current_size(self):
        return defer.succeed(self.get_size())

    def get_cap(self):
        return self.u
    def get_readcap(self):
        return self.u.get_readonly()
    def get_verify_cap(self):
        return self.u.get_verify_cap()
    def get_repair_cap(self):
        # CHK files can be repaired with just the verifycap
        return self.u.get_verify_cap()

    def get_uri(self):
        return self.u.to_string()

    def get_storage_index(self):
        return self.u.storage_index

    def check_and_repair(self, monitor, verify=False, add_lease=False):
        """Check this file's health; if unhealthy, attempt a repair.
        Returns a Deferred that fires with a CheckAndRepairResults."""
        verifycap = self.get_verify_cap()
        sb = self._storage_broker
        servers = sb.get_all_servers()
        sh = self._secret_holder

        c = Checker(verifycap=verifycap, servers=servers,
                    verify=verify, add_lease=add_lease, secret_holder=sh,
                    monitor=monitor)
        d = c.start()
        def _maybe_repair(cr):
            crr = CheckAndRepairResults(self.u.storage_index)
            crr.pre_repair_results = cr
            if cr.is_healthy():
                crr.post_repair_results = cr
                return defer.succeed(crr)
            else:
                crr.repair_attempted = True
                crr.repair_successful = False # until proven successful
                def _gather_repair_results(ur):
                    assert IUploadResults.providedBy(ur), ur
                    # clone the cr -- check results to form the basic of the prr -- post-repair results
                    prr = CheckResults(cr.uri, cr.storage_index)
                    prr.data = copy.deepcopy(cr.data)

                    sm = prr.data['sharemap']
                    assert isinstance(sm, dictutil.DictOfSets), sm
                    sm.update(ur.sharemap)
                    servers_responding = set(prr.data['servers-responding'])
                    # BUG FIX: set.union() returns a new set without
                    # modifying its operands, so the previous code
                    # (servers_responding.union(...)) silently discarded
                    # the repair-upload servers. update() mutates in place.
                    servers_responding.update(ur.sharemap.iterkeys())
                    prr.data['servers-responding'] = list(servers_responding)
                    prr.data['count-shares-good'] = len(sm)
                    # NOTE(review): len(sm) counts distinct share numbers,
                    # not distinct hosts -- confirm this is the intended
                    # value for count-good-share-hosts
                    prr.data['count-good-share-hosts'] = len(sm)
                    is_healthy = bool(len(sm) >= self.u.total_shares)
                    is_recoverable = bool(len(sm) >= self.u.needed_shares)
                    prr.set_healthy(is_healthy)
                    prr.set_recoverable(is_recoverable)
                    crr.repair_successful = is_healthy
                    # NOTE(review): this marks a fully-populated sharemap as
                    # needing rebalancing; the condition looks inverted but
                    # is preserved as-is -- confirm against CheckResults
                    prr.set_needs_rebalancing(len(sm) >= self.u.total_shares)

                    crr.post_repair_results = prr
                    return crr
                def _repair_error(f):
                    # as with mutable repair, I'm not sure if I want to pass
                    # through a failure or not. TODO
                    crr.repair_successful = False
                    crr.repair_failure = f
                    return f
                r = Repairer(storage_broker=sb, secret_holder=sh,
                             verifycap=verifycap, monitor=monitor)
                d = r.start()
                d.addCallbacks(_gather_repair_results, _repair_error)
                return d

        d.addCallback(_maybe_repair)
        return d

    def check(self, monitor, verify=False, add_lease=False):
        """Check this file's health. Returns a Deferred firing with the
        Checker's CheckResults."""
        verifycap = self.get_verify_cap()
        sb = self._storage_broker
        servers = sb.get_all_servers()
        sh = self._secret_holder

        v = Checker(verifycap=verifycap, servers=servers,
                    verify=verify, add_lease=add_lease, secret_holder=sh,
                    monitor=monitor)
        return v.start()

    def read(self, consumer, offset=0, size=None):
        """Download bytes [offset, offset+size) to 'consumer'. A full-file
        read streams directly; a partial read goes through the download
        cache. Returns a Deferred firing with the consumer."""
        self.log("read", offset=offset, size=size,
                 umid="UPP8FA", level=log.OPERATIONAL)
        if size is None:
            size = self.get_size() - offset
        # clamp over-long requests to the actual file size
        size = min(size, self.get_size() - offset)

        if offset == 0 and size == self.get_size():
            # don't use the cache, just do a normal streaming download
            self.log("doing normal full download", umid="VRSBwg", level=log.OPERATIONAL)
            target = download.ConsumerAdapter(consumer)
            # NOTE(review): self._parentmsgid is presumably set by
            # log.PrefixingLogMixin's log() -- confirm it exists before
            # the first read
            return self._downloader.download(self.get_cap(), target,
                                             self._parentmsgid,
                                             history=self._history)

        d = self.download_cache.when_range_available(offset, size)
        d.addCallback(lambda res:
                      self.download_cache.read(consumer, offset, size))
        return d
317
class LiteralProducer:
    """A do-nothing IPushProducer: literal file data is already fully in
    memory, so there is nothing to pause, resume, or abort."""
    implements(IPushProducer)

    def resumeProducing(self):
        """No-op: all data is already available."""

    def stopProducing(self):
        """No-op: there is no ongoing production to stop."""
324
325
class LiteralFileNode(_ImmutableFileNodeBase):
    """A filenode for a LIT cap: the whole file body is embedded in the
    cap itself, so every operation is satisfied locally, without ever
    contacting a storage server."""

    def __init__(self, filecap):
        assert isinstance(filecap, LiteralFileURI)
        self.u = filecap

    def get_size(self):
        return len(self.u.data)

    def get_current_size(self):
        return defer.succeed(self.get_size())

    def get_cap(self):
        return self.u

    def get_readcap(self):
        return self.u

    def get_verify_cap(self):
        # there are no shares to verify
        return None

    def get_repair_cap(self):
        # nothing is stored remotely, so there is nothing to repair
        return None

    def get_uri(self):
        return self.u.to_string()

    def get_storage_index(self):
        # literal data never touches a storage server
        return None

    def check(self, monitor, verify=False, add_lease=False):
        # trivially healthy: the data lives in the cap
        return defer.succeed(None)

    def check_and_repair(self, monitor, verify=False, add_lease=False):
        return defer.succeed(None)

    def read(self, consumer, offset=0, size=None):
        """Stream bytes [offset, offset+size) of the embedded data to
        'consumer'; size=None means "to the end". Returns a Deferred that
        fires with the consumer when the transfer completes."""
        end = None if size is None else offset + size
        data = self.u.data[offset:end]

        # We use twisted.protocols.basic.FileSender, which only does
        # non-streaming, i.e. PullProducer, where the receiver/consumer must
        # ask explicitly for each chunk of data. There are only two places in
        # the Twisted codebase that can't handle streaming=False, both of
        # which are in the upload path for an FTP/SFTP server
        # (protocols.ftp.FileConsumer and
        # vfs.adapters.ftp._FileToConsumerAdapter), neither of which is
        # likely to be used as the target for a Tahoe download.

        d = basic.FileSender().beginFileTransfer(StringIO(data), consumer)
        d.addCallback(lambda lastSent: consumer)
        return d