]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/immutable/filenode.py
fac141b744fda5da6dfd822df7867ca385bd9bfa
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / immutable / filenode.py
1 import copy, os.path, stat
2 from cStringIO import StringIO
3 from zope.interface import implements
4 from twisted.internet import defer
5 from twisted.internet.interfaces import IPushProducer, IConsumer
6 from twisted.protocols import basic
7 from foolscap.api import eventually
8 from allmydata.interfaces import IFileNode, IFileURI, ICheckable, \
9      IDownloadTarget, IUploadResults
10 from allmydata.util import dictutil, log, base32
11 from allmydata.util.assertutil import precondition
12 from allmydata import uri as urimodule
13 from allmydata.immutable.checker import Checker
14 from allmydata.check_results import CheckResults, CheckAndRepairResults
15 from allmydata.immutable.repairer import Repairer
16 from allmydata.immutable import download
17
class _ImmutableFileNodeBase(object):
    """Common base class for immutable file nodes (CHK and literal).

    Immutable nodes are read-only by definition; equality and hashing
    delegate to the underlying URI object.
    """
    implements(IFileNode, ICheckable)

    def __init__(self, uri, client):
        precondition(urimodule.IImmutableFileURI.providedBy(uri), uri)
        self.u = IFileURI(uri)
        self._client = client

    def get_readonly_uri(self):
        # an immutable file is its own read-only form
        return self.get_uri()

    def is_mutable(self):
        return False

    def is_readonly(self):
        return True

    def __hash__(self):
        return self.u.__hash__()

    def __eq__(self, other):
        if IFileNode.providedBy(other):
            return self.u.__eq__(other.u)
        else:
            return False

    def __ne__(self, other):
        # BUG FIX: this previously returned the result of __eq__, so two
        # equal nodes also compared "not equal". __ne__ must be the
        # negation of __eq__ for nodes of the same kind.
        if IFileNode.providedBy(other):
            return not self.u.__eq__(other.u)
        else:
            return True
47
class PortionOfFile:
    """A read-only file-like view onto a byte range of an on-disk file,
    analogous to a list slice (things[2:14]).

    A size of None means "no limit": reads continue to end-of-file.
    """
    def __init__(self, fn, offset=0, size=None):
        self.f = open(fn, "rb")
        self.f.seek(offset)
        self.bytes_left = size  # None => unbounded

    def read(self, size=None):
        """Read up to min(size, bytes_left) bytes, where None on either
        side acts as 'unbounded'."""
        if self.bytes_left is None:
            limit = size
        elif size is None:
            limit = self.bytes_left
        else:
            limit = min(size, self.bytes_left)
        data = self.f.read(limit)
        if self.bytes_left is not None:
            self.bytes_left -= len(data)
        return data
67
class DownloadCache:
    """An IDownloadTarget that spools a download into an on-disk cache
    file, and lets readers register (offset, size) 'milestones' whose
    Deferreds fire once that byte range has been written to the cache.
    """
    implements(IDownloadTarget)

    def __init__(self, node, cachedirectorymanager):
        self._downloader = node._client.getServiceNamed("downloader")
        self._uri = node.get_uri()
        self._storage_index = node.get_storage_index()
        self.milestones = set() # of (offset,size,Deferred)
        self.cachedirectorymanager = cachedirectorymanager
        self.cachefile = None
        self.download_in_progress = False
        # five states:
        #  new FileNode, no downloads ever performed
        #  new FileNode, leftover file (partial)
        #  new FileNode, leftover file (whole)
        #  download in progress, not yet complete
        #  download complete

    def when_range_available(self, offset, size):
        """Return a Deferred that fires (with None) once bytes
        [offset:offset+size] are present in the cache file, starting a
        download if one is not already running."""
        assert isinstance(offset, (int,long))
        assert isinstance(size, (int,long))

        d = defer.Deferred()
        self.milestones.add( (offset,size,d) )
        # the range may already be satisfied by a leftover cache file
        self._check_milestones()
        if self.milestones and not self.download_in_progress:
            self.download_in_progress = True
            log.msg(format=("immutable filenode read [%(si)s]: " +
                            "starting download"),
                    si=base32.b2a(self._storage_index),
                    umid="h26Heg", level=log.OPERATIONAL)
            d2 = self._downloader.download(self._uri, self)
            # _download_done runs on success OR failure (clears the
            # in-progress flag); _download_failed then errbacks any
            # still-waiting milestones; log.err catches anything
            # _download_failed itself raises
            d2.addBoth(self._download_done)
            d2.addErrback(self._download_failed)
            d2.addErrback(log.err, umid="cQaM9g")
        return d

    def read(self, consumer, offset, size):
        """Stream cache bytes [offset:offset+size] to consumer via a
        FileSender; returns a Deferred that fires with the consumer.
        Callers are expected to have waited on when_range_available
        first, hence the filesize assertion."""
        assert offset+size <= self.get_filesize()
        if not self.cachefile:
            self.cachefile = self.cachedirectorymanager.get_file(base32.b2a(self._storage_index))
        f = PortionOfFile(self.cachefile.get_filename(), offset, size)
        d = basic.FileSender().beginFileTransfer(f, consumer)
        d.addCallback(lambda lastSent: consumer)
        return d

    def _download_done(self, res):
        # clear download_in_progress, so failed downloads can be re-tried
        self.download_in_progress = False
        return res

    def _download_failed(self, f):
        # tell anyone who's waiting that we failed
        for m in self.milestones:
            (offset,size,d) = m
            eventually(d.errback, f)
        self.milestones.clear()

    def _check_milestones(self):
        """Fire (and discard) every milestone whose range now lies
        entirely within the current cache-file size."""
        current_size = self.get_filesize()
        # iterate over a copy, since we discard from the set as we go
        for m in list(self.milestones):
            (offset,size,d) = m
            if offset+size <= current_size:
                log.msg(format=("immutable filenode read [%(si)s] " +
                                "%(offset)d+%(size)d vs %(filesize)d: " +
                                "done"),
                        si=base32.b2a(self._storage_index),
                        offset=offset, size=size, filesize=current_size,
                        umid="nuedUg", level=log.NOISY)
                self.milestones.discard(m)
                eventually(d.callback, None)
            else:
                log.msg(format=("immutable filenode read [%(si)s] " +
                                "%(offset)d+%(size)d vs %(filesize)d: " +
                                "still waiting"),
                        si=base32.b2a(self._storage_index),
                        offset=offset, size=size, filesize=current_size,
                        umid="8PKOhg", level=log.NOISY)

    def get_filesize(self):
        """Return the current size of the cache file on disk, or 0 if it
        does not exist yet."""
        if not self.cachefile:
            self.cachefile = self.cachedirectorymanager.get_file(base32.b2a(self._storage_index))
        try:
            filesize = os.stat(self.cachefile.get_filename())[stat.ST_SIZE]
        except OSError:
            # cache file not created yet
            filesize = 0
        return filesize


    # IDownloadTarget methods: the downloader calls open/write/close as
    # plaintext arrives; each write may satisfy pending milestones.
    def open(self, size):
        if not self.cachefile:
            self.cachefile = self.cachedirectorymanager.get_file(base32.b2a(self._storage_index))
        self.f = open(self.cachefile.get_filename(), "wb")

    def write(self, data):
        self.f.write(data)
        self._check_milestones()

    def close(self):
        self.f.close()
        self._check_milestones()

    def fail(self, why):
        # failure is propagated to waiters via _download_failed instead
        pass
    def register_canceller(self, cb):
        pass
    def finish(self):
        return None
    # The following methods are just because the target might be a repairer.DownUpConnector,
    # and just because the current CHKUpload object expects to find the storage index and
    # encoding parameters in its Uploadable.
    def set_storageindex(self, storageindex):
        pass
    def set_encodingparams(self, encodingparams):
        pass
183
184
class FileNode(_ImmutableFileNodeBase, log.PrefixingLogMixin):
    """An immutable CHK file node: supports download (whole-file or
    ranged via a local download cache), checking, and repair."""

    def __init__(self, uri, client, cachedirectorymanager):
        _ImmutableFileNodeBase.__init__(self, uri, client)
        self.download_cache = DownloadCache(self, cachedirectorymanager)
        prefix = uri.get_verify_cap().to_string()
        log.PrefixingLogMixin.__init__(self, "allmydata.immutable.filenode", prefix=prefix)
        self.log("starting", level=log.OPERATIONAL)

    def get_uri(self):
        return self.u.to_string()

    def get_size(self):
        return self.u.get_size()

    def get_verify_cap(self):
        return self.u.get_verify_cap()

    def get_repair_cap(self):
        # CHK files can be repaired with just the verifycap
        return self.u.get_verify_cap()

    def get_storage_index(self):
        return self.u.storage_index

    def check_and_repair(self, monitor, verify=False, add_lease=False):
        """Check this file's health; if unhealthy, attempt a repair.
        Returns a Deferred firing with a CheckAndRepairResults (or, on
        repair error, a Failure -- see _repair_error below)."""
        verifycap = self.get_verify_cap()
        sb = self._client.get_storage_broker()
        servers = sb.get_all_servers()

        c = Checker(client=self._client, verifycap=verifycap, servers=servers,
                    verify=verify, add_lease=add_lease, monitor=monitor)
        d = c.start()
        def _maybe_repair(cr):
            crr = CheckAndRepairResults(self.u.storage_index)
            crr.pre_repair_results = cr
            if cr.is_healthy():
                # healthy: no repair needed, post == pre
                crr.post_repair_results = cr
                return defer.succeed(crr)
            else:
                crr.repair_attempted = True
                crr.repair_successful = False # until proven successful
                def _gather_repair_results(ur):
                    assert IUploadResults.providedBy(ur), ur
                    # clone the cr -- check results to form the basic of the prr -- post-repair results
                    prr = CheckResults(cr.uri, cr.storage_index)
                    prr.data = copy.deepcopy(cr.data)

                    sm = prr.data['sharemap']
                    assert isinstance(sm, dictutil.DictOfSets), sm
                    sm.update(ur.sharemap)
                    servers_responding = set(prr.data['servers-responding'])
                    # BUG FIX: set.union() returns a new set and leaves
                    # the receiver unmodified; the original code discarded
                    # the result, so the servers contacted during repair
                    # were never recorded in 'servers-responding'. Use
                    # update() to merge in place.
                    servers_responding.update(ur.sharemap.iterkeys())
                    prr.data['servers-responding'] = list(servers_responding)
                    prr.data['count-shares-good'] = len(sm)
                    prr.data['count-good-share-hosts'] = len(sm)
                    is_healthy = bool(len(sm) >= self.u.total_shares)
                    is_recoverable = bool(len(sm) >= self.u.needed_shares)
                    prr.set_healthy(is_healthy)
                    prr.set_recoverable(is_recoverable)
                    crr.repair_successful = is_healthy
                    # NOTE(review): passing the "healthy" condition to
                    # set_needs_rebalancing looks suspicious (healthy
                    # files would be flagged as needing rebalancing) --
                    # preserved as-is; confirm intended semantics.
                    prr.set_needs_rebalancing(len(sm) >= self.u.total_shares)

                    crr.post_repair_results = prr
                    return crr
                def _repair_error(f):
                    # as with mutable repair, I'm not sure if I want to pass
                    # through a failure or not. TODO
                    crr.repair_successful = False
                    crr.repair_failure = f
                    return f
                r = Repairer(client=self._client, verifycap=verifycap, monitor=monitor)
                d = r.start()
                d.addCallbacks(_gather_repair_results, _repair_error)
                return d

        d.addCallback(_maybe_repair)
        return d

    def check(self, monitor, verify=False, add_lease=False):
        """Check this file's health; returns a Deferred firing with
        CheckResults."""
        verifycap = self.get_verify_cap()
        sb = self._client.get_storage_broker()
        servers = sb.get_all_servers()

        v = Checker(client=self._client, verifycap=verifycap, servers=servers,
                    verify=verify, add_lease=add_lease, monitor=monitor)
        return v.start()

    def read(self, consumer, offset=0, size=None):
        """Stream bytes [offset:offset+size] to consumer. A full-file
        read bypasses the cache and streams directly; a partial read
        waits for the download cache to cover the range first."""
        if size is None:
            size = self.get_size() - offset
        # clamp so we never ask for bytes past end-of-file
        size = min(size, self.get_size() - offset)

        if offset == 0 and size == self.get_size():
            # don't use the cache, just do a normal streaming download
            self.log("doing normal full download", umid="VRSBwg", level=log.OPERATIONAL)
            return self.download(download.ConsumerAdapter(consumer))

        d = self.download_cache.when_range_available(offset, size)
        d.addCallback(lambda res:
                      self.download_cache.read(consumer, offset, size))
        return d

    def download(self, target):
        """Download the whole file into the given IDownloadTarget."""
        downloader = self._client.getServiceNamed("downloader")
        history = self._client.get_history()
        return downloader.download(self.get_uri(), target, self._parentmsgid,
                                   history=history)

    def download_to_data(self):
        """Return a Deferred firing with the file's full contents."""
        downloader = self._client.getServiceNamed("downloader")
        history = self._client.get_history()
        return downloader.download_to_data(self.get_uri(), history=history)
297
class LiteralProducer:
    """A no-op IPushProducer: literal-file data is already in memory and
    is delivered synchronously, so there is nothing to pause/resume."""
    implements(IPushProducer)
    def resumeProducing(self):
        pass
    def stopProducing(self):
        pass
304
305
class LiteralFileNode(_ImmutableFileNodeBase):
    """A file node whose entire contents are embedded in its LIT URI.

    No servers hold any shares for such a file, so there is no storage
    index, no verify/repair caps, and check/repair are trivial no-ops.
    """

    def __init__(self, uri, client):
        precondition(urimodule.IImmutableFileURI.providedBy(uri), uri)
        _ImmutableFileNodeBase.__init__(self, uri, client)

    def get_uri(self):
        return self.u.to_string()

    def get_size(self):
        return len(self.u.data)

    def get_verify_cap(self):
        # nothing stored remotely, so nothing to verify
        return None

    def get_repair_cap(self):
        return None

    def get_storage_index(self):
        return None

    def check(self, monitor, verify=False, add_lease=False):
        # no shares exist, so a check has nothing to do
        return defer.succeed(None)

    def check_and_repair(self, monitor, verify=False, add_lease=False):
        return defer.succeed(None)

    def read(self, consumer, offset=0, size=None):
        """Stream bytes [offset:offset+size] (or to the end when size is
        None) of the embedded data to the consumer."""
        end = None if size is None else offset + size
        data = self.u.data[offset:end]

        # twisted.protocols.basic.FileSender is a non-streaming
        # (PullProducer) transfer: the consumer asks for each chunk.
        # The only Twisted consumers that require streaming=True live in
        # the FTP/SFTP upload path (protocols.ftp.FileConsumer and
        # vfs.adapters.ftp._FileToConsumerAdapter), neither of which is
        # a plausible target for a Tahoe download.
        d = basic.FileSender().beginFileTransfer(StringIO(data), consumer)
        d.addCallback(lambda lastSent: consumer)
        return d

    def download(self, target):
        """Deliver the embedded data to an IDownloadTarget synchronously.
        Note: this does not update the stats_provider."""
        data = self.u.data
        is_consumer = IConsumer.providedBy(target)
        if is_consumer:
            target.registerProducer(LiteralProducer(), True)
        target.open(len(data))
        target.write(data)
        if is_consumer:
            target.unregisterProducer()
        target.close()
        return defer.maybeDeferred(target.finish)

    def download_to_data(self):
        """Return a Deferred that has already fired with the full data."""
        return defer.succeed(self.u.data)