import copy, os.path, stat
from cStringIO import StringIO
from zope.interface import implements
from twisted.internet import defer
from twisted.internet.interfaces import IPushProducer, IConsumer
from twisted.protocols import basic
from foolscap.api import eventually
from allmydata.interfaces import IFileNode, ICheckable, \
     IDownloadTarget, IUploadResults
from allmydata.util import dictutil, log, base32
from allmydata.uri import CHKFileURI, LiteralFileURI
from allmydata.immutable.checker import Checker
from allmydata.check_results import CheckResults, CheckAndRepairResults
from allmydata.immutable.repairer import Repairer
from allmydata.immutable import download

class _ImmutableFileNodeBase(object):
    implements(IFileNode, ICheckable)

    def get_readonly_uri(self):
        return self.get_uri()

    def is_mutable(self):
        return False

    def is_readonly(self):
        return True

    def __hash__(self):
        return self.u.__hash__()
    def __eq__(self, other):
        if IFileNode.providedBy(other):
            return self.u.__eq__(other.u)
        else:
            return False
    def __ne__(self, other):
        if IFileNode.providedBy(other):
            return self.u.__ne__(other.u)
        else:
            return True

class PortionOfFile:
    # like a list slice (things[2:14]), but for a file on disk
    def __init__(self, fn, offset=0, size=None):
        self.f = open(fn, "rb")
        self.f.seek(offset)
        self.bytes_left = size

    def read(self, size=None):
        # bytes_to_read = min(size, self.bytes_left), but None > anything
        if size is None:
            bytes_to_read = self.bytes_left
        elif self.bytes_left is None:
            bytes_to_read = size
        else:
            bytes_to_read = min(size, self.bytes_left)
        data = self.f.read(bytes_to_read)
        if self.bytes_left is not None:
            self.bytes_left -= len(data)
        return data

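# Illustrative sketch (not part of the original module): PortionOfFile
# mirrors a list slice such as things[2:14], but over a file on disk. The
# path below is hypothetical; any readable file works.
def _example_portion_of_file(path="/tmp/example.dat"):
    portion = PortionOfFile(path, offset=2, size=12)
    data = portion.read()   # size=None means "the rest of the budget"
    assert len(data) <= 12  # short reads happen near end-of-file
    return data
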
class DownloadCache:
    implements(IDownloadTarget)

    def __init__(self, filecap, storage_index, downloader,
                 cachedirectorymanager):
        self._downloader = downloader
        self._uri = filecap
        self._storage_index = storage_index
        self.milestones = set() # of (offset,size,Deferred)
        self.cachedirectorymanager = cachedirectorymanager
        self.cachefile = None
        self.download_in_progress = False
        # five states:
        #  new FileNode, no downloads ever performed
        #  new FileNode, leftover file (partial)
        #  new FileNode, leftover file (whole)
        #  download in progress, not yet complete
        #  download complete

    def when_range_available(self, offset, size):
        assert isinstance(offset, (int,long))
        assert isinstance(size, (int,long))

        d = defer.Deferred()
        self.milestones.add( (offset,size,d) )
        self._check_milestones()
        if self.milestones and not self.download_in_progress:
            self.download_in_progress = True
            log.msg(format=("immutable filenode read [%(si)s]: " +
                            "starting download"),
                    si=base32.b2a(self._storage_index),
                    umid="h26Heg", level=log.OPERATIONAL)
            d2 = self._downloader.download(self._uri, self)
            d2.addBoth(self._download_done)
            d2.addErrback(self._download_failed)
            d2.addErrback(log.err, umid="cQaM9g")
        return d

    def read(self, consumer, offset, size):
        assert offset+size <= self.get_filesize()
        if not self.cachefile:
            self.cachefile = self.cachedirectorymanager.get_file(base32.b2a(self._storage_index))
        f = PortionOfFile(self.cachefile.get_filename(), offset, size)
        d = basic.FileSender().beginFileTransfer(f, consumer)
        d.addCallback(lambda lastSent: consumer)
        return d

    def _download_done(self, res):
        # clear download_in_progress, so failed downloads can be re-tried
        self.download_in_progress = False
        return res

    def _download_failed(self, f):
        # tell anyone who's waiting that we failed
        for m in self.milestones:
            (offset,size,d) = m
            eventually(d.errback, f)
        self.milestones.clear()

    def _check_milestones(self):
        current_size = self.get_filesize()
        for m in list(self.milestones):
            (offset,size,d) = m
            if offset+size <= current_size:
                log.msg(format=("immutable filenode read [%(si)s] " +
                                "%(offset)d+%(size)d vs %(filesize)d: " +
                                "done"),
                        si=base32.b2a(self._storage_index),
                        offset=offset, size=size, filesize=current_size,
                        umid="nuedUg", level=log.NOISY)
                self.milestones.discard(m)
                eventually(d.callback, None)
            else:
                log.msg(format=("immutable filenode read [%(si)s] " +
                                "%(offset)d+%(size)d vs %(filesize)d: " +
                                "still waiting"),
                        si=base32.b2a(self._storage_index),
                        offset=offset, size=size, filesize=current_size,
                        umid="8PKOhg", level=log.NOISY)

    def get_filesize(self):
        if not self.cachefile:
            self.cachefile = self.cachedirectorymanager.get_file(base32.b2a(self._storage_index))
        try:
            filesize = os.stat(self.cachefile.get_filename())[stat.ST_SIZE]
        except OSError:
            filesize = 0
        return filesize

    def open(self, size):
        if not self.cachefile:
            self.cachefile = self.cachedirectorymanager.get_file(base32.b2a(self._storage_index))
        self.f = open(self.cachefile.get_filename(), "wb")

    def write(self, data):
        self.f.write(data)
        self._check_milestones()

    def close(self):
        self.f.close()
        self._check_milestones()

    def fail(self, why):
        pass
    def register_canceller(self, cb):
        pass
    def finish(self):
        return None
    # The following methods exist only because the target might be a
    # repairer.DownUpConnector, and because the current CHKUpload object
    # expects to find the storage index and encoding parameters in its
    # Uploadable.
    def set_storageindex(self, storageindex):
        pass
    def set_encodingparams(self, encodingparams):
        pass

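# Illustrative sketch (not part of the original module): the milestone
# protocol a caller follows against DownloadCache. when_range_available()
# returns a Deferred that fires once the cache file covers offset+size
# (kicking off the download if necessary); only then is it safe to call
# read(), which asserts the range is already on disk. `cache` and
# `consumer` are assumed to be supplied by the caller.
def _example_wait_for_range(cache, consumer, offset=0, size=1024):
    d = cache.when_range_available(offset, size)
    d.addCallback(lambda ign: cache.read(consumer, offset, size))
    return d
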
class FileNode(_ImmutableFileNodeBase, log.PrefixingLogMixin):
    def __init__(self, filecap, storage_broker, secret_holder,
                 downloader, history, cachedirectorymanager):
        assert isinstance(filecap, CHKFileURI)
        self.u = filecap
        self._storage_broker = storage_broker
        self._secret_holder = secret_holder
        self._downloader = downloader
        self._history = history
        storage_index = self.get_storage_index()
        self.download_cache = DownloadCache(filecap, storage_index, downloader,
                                            cachedirectorymanager)
        prefix = self.u.get_verify_cap().to_string()
        log.PrefixingLogMixin.__init__(self, "allmydata.immutable.filenode", prefix=prefix)
        self.log("starting", level=log.OPERATIONAL)

    def get_size(self):
        return self.u.get_size()
    def get_current_size(self):
        return defer.succeed(self.get_size())

    def get_cap(self):
        return self.u
    def get_readcap(self):
        return self.u.get_readonly()
    def get_verify_cap(self):
        return self.u.get_verify_cap()
    def get_repair_cap(self):
        # CHK files can be repaired with just the verifycap
        return self.u.get_verify_cap()

    def get_uri(self):
        return self.u.to_string()

    def get_storage_index(self):
        return self.u.storage_index

    def check_and_repair(self, monitor, verify=False, add_lease=False):
        verifycap = self.get_verify_cap()
        sb = self._storage_broker
        servers = sb.get_all_servers()
        sh = self._secret_holder

        c = Checker(verifycap=verifycap, servers=servers,
                    verify=verify, add_lease=add_lease, secret_holder=sh,
                    monitor=monitor)
        d = c.start()
        def _maybe_repair(cr):
            crr = CheckAndRepairResults(self.u.storage_index)
            crr.pre_repair_results = cr
            if cr.is_healthy():
                crr.post_repair_results = cr
                return defer.succeed(crr)
            else:
                crr.repair_attempted = True
                crr.repair_successful = False # until proven successful
                def _gather_repair_results(ur):
                    assert IUploadResults.providedBy(ur), ur
                    # clone the cr (check results) to form the basis of
                    # the prr (post-repair results)
                    prr = CheckResults(cr.uri, cr.storage_index)
                    prr.data = copy.deepcopy(cr.data)

                    sm = prr.data['sharemap']
                    assert isinstance(sm, dictutil.DictOfSets), sm
                    sm.update(ur.sharemap)
                    servers_responding = set(prr.data['servers-responding'])
                    servers_responding.update(ur.sharemap.iterkeys())
                    prr.data['servers-responding'] = list(servers_responding)
                    prr.data['count-shares-good'] = len(sm)
                    prr.data['count-good-share-hosts'] = len(sm)
                    is_healthy = bool(len(sm) >= self.u.total_shares)
                    is_recoverable = bool(len(sm) >= self.u.needed_shares)
                    prr.set_healthy(is_healthy)
                    prr.set_recoverable(is_recoverable)
                    crr.repair_successful = is_healthy
                    prr.set_needs_rebalancing(len(sm) >= self.u.total_shares)

                    crr.post_repair_results = prr
                    return crr
                def _repair_error(f):
                    # as with mutable repair, I'm not sure if I want to pass
                    # through a failure or not. TODO
                    crr.repair_successful = False
                    crr.repair_failure = f
                    return f
                r = Repairer(storage_broker=sb, secret_holder=sh,
                             verifycap=verifycap, monitor=monitor)
                d = r.start()
                d.addCallbacks(_gather_repair_results, _repair_error)
                return d

        d.addCallback(_maybe_repair)
        return d

    def check(self, monitor, verify=False, add_lease=False):
        verifycap = self.get_verify_cap()
        sb = self._storage_broker
        servers = sb.get_all_servers()
        sh = self._secret_holder

        v = Checker(verifycap=verifycap, servers=servers,
                    verify=verify, add_lease=add_lease, secret_holder=sh,
                    monitor=monitor)
        return v.start()

    def read(self, consumer, offset=0, size=None):
        if size is None:
            size = self.get_size() - offset
        size = min(size, self.get_size() - offset)

        if offset == 0 and size == self.get_size():
            # don't use the cache, just do a normal streaming download
            self.log("doing normal full download", umid="VRSBwg", level=log.OPERATIONAL)
            return self.download(download.ConsumerAdapter(consumer))

        d = self.download_cache.when_range_available(offset, size)
        d.addCallback(lambda res:
                      self.download_cache.read(consumer, offset, size))
        return d

    def download(self, target):
        return self._downloader.download(self.get_cap(), target,
                                         self._parentmsgid,
                                         history=self._history)

    def download_to_data(self):
        return self._downloader.download_to_data(self.get_cap(),
                                                 history=self._history)
    def download_to_filename(self, filename):
        return self._downloader.download_to_filename(self.get_cap(), filename)

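# Illustrative sketch (not part of the original module): the two paths
# through FileNode.read(). A whole-file read (offset=0, size=None) streams
# straight to the consumer; any other range is served via the shared
# DownloadCache. `node` and the two consumers are assumed to be supplied
# by the caller and to implement IConsumer.
def _example_read_filenode(node, whole_consumer, range_consumer):
    # Whole file: bypasses the cache and does a normal streaming download.
    d = node.read(whole_consumer)
    # Byte range: waits until the cache covers offset+size, then replays
    # that portion of the cache file.
    d.addCallback(lambda ign: node.read(range_consumer, offset=100, size=50))
    return d
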
class LiteralProducer:
    implements(IPushProducer)
    def resumeProducing(self):
        pass
    def stopProducing(self):
        pass


class LiteralFileNode(_ImmutableFileNodeBase):

    def __init__(self, filecap):
        assert isinstance(filecap, LiteralFileURI)
        self.u = filecap

    def get_size(self):
        return len(self.u.data)
    def get_current_size(self):
        return defer.succeed(self.get_size())

    def get_cap(self):
        return self.u
    def get_readcap(self):
        return self.u
    def get_verify_cap(self):
        return None
    def get_repair_cap(self):
        return None

    def get_uri(self):
        return self.u.to_string()

    def get_storage_index(self):
        return None

    def check(self, monitor, verify=False, add_lease=False):
        return defer.succeed(None)

    def check_and_repair(self, monitor, verify=False, add_lease=False):
        return defer.succeed(None)

    def read(self, consumer, offset=0, size=None):
        if size is None:
            data = self.u.data[offset:]
        else:
            data = self.u.data[offset:offset+size]

        # We use twisted.protocols.basic.FileSender, which only does
        # non-streaming, i.e. PullProducer, where the receiver/consumer must
        # ask explicitly for each chunk of data. There are only two places in
        # the Twisted codebase that can't handle streaming=False, both of
        # which are in the upload path for an FTP/SFTP server
        # (protocols.ftp.FileConsumer and
        # vfs.adapters.ftp._FileToConsumerAdapter), neither of which is
        # likely to be used as the target for a Tahoe download.

        d = basic.FileSender().beginFileTransfer(StringIO(data), consumer)
        d.addCallback(lambda lastSent: consumer)
        return d

    def download(self, target):
        # note that this does not update the stats_provider
        data = self.u.data
        if IConsumer.providedBy(target):
            target.registerProducer(LiteralProducer(), True)
        target.open(len(data))
        target.write(data)
        if IConsumer.providedBy(target):
            target.unregisterProducer()
        target.close()
        return defer.maybeDeferred(target.finish)

    def download_to_data(self):
        data = self.u.data
        return defer.succeed(data)
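
# Illustrative sketch (not part of the original module): a minimal
# in-memory IConsumer that works with the non-streaming FileSender used by
# read() above. FileSender is a pull producer, so the consumer must keep
# calling resumeProducing() until the producer unregisters itself.
class _MemoryConsumer:
    implements(IConsumer)
    def __init__(self):
        self.chunks = []
        self.done = False
    def registerProducer(self, producer, streaming):
        if not streaming:
            # drive the pull producer to completion
            while not self.done:
                producer.resumeProducing()
    def unregisterProducer(self):
        self.done = True
    def write(self, data):
        self.chunks.append(data)

# Usage sketch: LIT caps embed the file body directly in the cap, so a
# read never touches the network. LiteralFileURI(data) is assumed to
# accept the raw bytes positionally.
def _example_literal_read():
    node = LiteralFileNode(LiteralFileURI("small file contents"))
    c = _MemoryConsumer()
    d = node.read(c, offset=6, size=4)  # delivers "file"
    d.addCallback(lambda ign: "".join(c.chunks))
    return d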