
import os.path, stat
from cStringIO import StringIO
from zope.interface import implements
from twisted.internet import defer
from twisted.internet.interfaces import IPushProducer, IConsumer
from twisted.protocols import basic
from foolscap.eventual import eventually
from allmydata.interfaces import IFileNode, IFileURI, ICheckable, \
     IDownloadTarget
from allmydata.util import log, base32
from allmydata import uri as urimodule
from allmydata.immutable.checker import Checker
from allmydata.check_results import CheckAndRepairResults
from allmydata.immutable.repairer import Repairer
from allmydata.immutable import download

class _ImmutableFileNodeBase(object):
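    """Behavior shared by all immutable file nodes.

    Instances are read-only; equality and hashing are delegated to the
    underlying file URI.
    """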
    implements(IFileNode, ICheckable)

    def __init__(self, uri, client):
        self.u = IFileURI(uri)
        self._client = client

    def get_readonly_uri(self):
        return self.get_uri()

    def is_mutable(self):
        return False

    def is_readonly(self):
        return True

    def __hash__(self):
        return self.u.__hash__()
    def __eq__(self, other):
        if IFileNode.providedBy(other):
            return self.u.__eq__(other.u)
        else:
            return False
    def __ne__(self, other):
        if IFileNode.providedBy(other):
            return not self.u.__eq__(other.u)
        else:
            return True

class PortionOfFile:
    # like a list slice (things[2:14]), but for a file on disk
    def __init__(self, fn, offset=0, size=None):
        self.f = open(fn, "rb")
        self.f.seek(offset)
        self.bytes_left = size

    def read(self, size=None):
        # bytes_to_read = min(size, self.bytes_left), but None>anything
        if size is None:
            bytes_to_read = self.bytes_left
        elif self.bytes_left is None:
            bytes_to_read = size
        else:
            bytes_to_read = min(size, self.bytes_left)
        data = self.f.read(bytes_to_read)
        if self.bytes_left is not None:
            self.bytes_left -= len(data)
        return data

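# A minimal usage sketch (illustrative only; "cached.data" is a hypothetical
# cache filename):
#   pof = PortionOfFile("cached.data", offset=2, size=12)
#   chunk = pof.read()   # up to 12 bytes, starting at byte 2 of the file
#   rest = pof.read(4)   # returns '' once the 12-byte slice is exhausted
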
class DownloadCache:
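    """An IDownloadTarget that spools a download into a local cache file.

    Callers register (offset, size, Deferred) milestones via
    when_range_available(); each milestone fires as soon as the cache file
    has grown enough to cover its byte range, so partial reads can be
    served before the download finishes.
    """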
    implements(IDownloadTarget)

    def __init__(self, node, cachefile):
        self._downloader = node._client.getServiceNamed("downloader")
        self._uri = node.get_uri()
        self._storage_index = node.get_storage_index()
        self.milestones = set() # of (offset,size,Deferred)
        self.cachefile = cachefile
        self.download_in_progress = False
        # five states:
        #  new FileNode, no downloads ever performed
        #  new FileNode, leftover file (partial)
        #  new FileNode, leftover file (whole)
        #  download in progress, not yet complete
        #  download complete

    def when_range_available(self, offset, size):
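        """Return a Deferred that fires (with None) once bytes
        [offset, offset+size) are present in the cache file.

        Starts a download if one is not already in progress; the Deferred
        errbacks if that download fails.
        """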
        assert isinstance(offset, (int,long))
        assert isinstance(size, (int,long))

        d = defer.Deferred()
        self.milestones.add( (offset,size,d) )
        self._check_milestones()
        if self.milestones and not self.download_in_progress:
            self.download_in_progress = True
            log.msg(format=("immutable filenode read [%(si)s]: " +
                            "starting download"),
                    si=base32.b2a(self._storage_index),
                    umid="h26Heg", level=log.OPERATIONAL)
            d2 = self._downloader.download(self._uri, self)
            d2.addBoth(self._download_done)
            d2.addErrback(self._download_failed)
            d2.addErrback(log.err, umid="cQaM9g")
        return d

    def read(self, consumer, offset, size):
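        """Stream bytes [offset, offset+size) from the cache file to
        consumer using FileSender. The caller is responsible for waiting
        on when_range_available() first; returns a Deferred that fires
        with the consumer once the transfer is complete.
        """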
        assert offset+size <= self.get_filesize()
        f = PortionOfFile(self.cachefile.get_filename(), offset, size)
        d = basic.FileSender().beginFileTransfer(f, consumer)
        d.addCallback(lambda lastSent: consumer)
        return d

    def _download_done(self, res):
        # clear download_in_progress, so failed downloads can be re-tried
        self.download_in_progress = False
        return res

    def _download_failed(self, f):
        # tell anyone who's waiting that we failed
        for m in self.milestones:
            (offset,size,d) = m
            eventually(d.errback, f)
        self.milestones.clear()

    def _check_milestones(self):
        current_size = self.get_filesize()
        for m in list(self.milestones):
            (offset,size,d) = m
            if offset+size <= current_size:
                log.msg(format=("immutable filenode read [%(si)s] " +
                                "%(offset)d+%(size)d vs %(filesize)d: " +
                                "done"),
                        si=base32.b2a(self._storage_index),
                        offset=offset, size=size, filesize=current_size,
                        umid="nuedUg", level=log.NOISY)
                self.milestones.discard(m)
                eventually(d.callback, None)
            else:
                log.msg(format=("immutable filenode read [%(si)s] " +
                                "%(offset)d+%(size)d vs %(filesize)d: " +
                                "still waiting"),
                        si=base32.b2a(self._storage_index),
                        offset=offset, size=size, filesize=current_size,
                        umid="8PKOhg", level=log.NOISY)

    def get_filesize(self):
        try:
            filesize = os.stat(self.cachefile.get_filename())[stat.ST_SIZE]
        except OSError:
            filesize = 0
        return filesize


    def open(self, size):
        self.f = open(self.cachefile.get_filename(), "wb")

    def write(self, data):
        self.f.write(data)
        self._check_milestones()

    def close(self):
        self.f.close()
        self._check_milestones()

    def fail(self, why):
        pass
    def register_canceller(self, cb):
        pass
    def finish(self):
        return None



class FileNode(_ImmutableFileNodeBase, log.PrefixingLogMixin):
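    """A node for an immutable file stored on the grid.

    Full-file reads stream straight from the downloader; smaller ranged
    reads are served out of a DownloadCache backed by a local cache file.
    """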
    def __init__(self, uri, client, cachefile):
        _ImmutableFileNodeBase.__init__(self, uri, client)
        self.download_cache = DownloadCache(self, cachefile)
        prefix = urimodule.from_string(uri).get_verify_cap().to_string()
        log.PrefixingLogMixin.__init__(self, "allmydata.immutable.filenode", prefix=prefix)
        self.log("starting", level=log.OPERATIONAL)

    def get_uri(self):
        return self.u.to_string()

    def get_size(self):
        return self.u.get_size()

    def get_verify_cap(self):
        return self.u.get_verify_cap()

    def get_storage_index(self):
        return self.u.storage_index

    def check_and_repair(self, monitor, verify=False):
        verifycap = self.get_verify_cap()
        servers = self._client.get_servers("storage")

        c = Checker(client=self._client, verifycap=verifycap, servers=servers, verify=verify, monitor=monitor)
        d = c.start()
        def _maybe_repair(cr):
            crr = CheckAndRepairResults(self.u.storage_index)
            crr.pre_repair_results = cr
            if cr.is_healthy():
                crr.post_repair_results = cr
                return defer.succeed(crr)
            else:
                def _gather_repair_results(rr):
                    crr.post_repair_results = rr
                    return crr
                r = Repairer(client=self._client, verifycap=verifycap, servers=servers, monitor=monitor)
                d = r.start()
                d.addCallback(_gather_repair_results)
                return d

        d.addCallback(_maybe_repair)
        return d

    def check(self, monitor, verify=False):
        v = Checker(client=self._client, verifycap=self.get_verify_cap(), servers=self._client.get_servers("storage"), verify=verify, monitor=monitor)
        return v.start()

    def read(self, consumer, offset=0, size=None):
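        """Stream a byte range of this file to consumer.

        A read of the entire file bypasses the cache and does a normal
        streaming download; any other range waits until the DownloadCache
        holds enough bytes and then serves them from the cache file.
        """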
        if size is None:
            size = self.get_size() - offset
        size = min(size, self.get_size() - offset)

        if offset == 0 and size == self.get_size():
            # don't use the cache, just do a normal streaming download
            self.log("doing normal full download", umid="VRSBwg", level=log.OPERATIONAL)
            return self.download(download.ConsumerAdapter(consumer))

        d = self.download_cache.when_range_available(offset, size)
        d.addCallback(lambda res:
                      self.download_cache.read(consumer, offset, size))
        return d

    def download(self, target):
        downloader = self._client.getServiceNamed("downloader")
        return downloader.download(self.get_uri(), target, self._parentmsgid)

    def download_to_data(self):
        downloader = self._client.getServiceNamed("downloader")
        return downloader.download_to_data(self.get_uri())

class LiteralProducer:
    implements(IPushProducer)
    def resumeProducing(self):
        pass
    def stopProducing(self):
        pass


class LiteralFileNode(_ImmutableFileNodeBase):
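    """A node for a LIT (literal) file, whose entire contents are embedded
    in its URI. Reads and downloads are served directly from the URI data,
    without contacting any storage servers."""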

    def __init__(self, uri, client):
        _ImmutableFileNodeBase.__init__(self, uri, client)

    def get_uri(self):
        return self.u.to_string()

    def get_size(self):
        return len(self.u.data)

    def get_verify_cap(self):
        return None

    def get_storage_index(self):
        return None

    def check(self, monitor, verify=False):
        return defer.succeed(None)

    def check_and_repair(self, monitor, verify=False):
        return defer.succeed(None)

    def read(self, consumer, offset=0, size=None):
        if size is None:
            data = self.u.data[offset:]
        else:
            data = self.u.data[offset:offset+size]

        # We use twisted.protocols.basic.FileSender, which only does
        # non-streaming, i.e. PullProducer, where the receiver/consumer must
        # ask explicitly for each chunk of data. There are only two places in
        # the Twisted codebase that can't handle streaming=False, both of
        # which are in the upload path for an FTP/SFTP server
        # (protocols.ftp.FileConsumer and
        # vfs.adapters.ftp._FileToConsumerAdapter), neither of which is
        # likely to be used as the target for a Tahoe download.

        d = basic.FileSender().beginFileTransfer(StringIO(data), consumer)
        d.addCallback(lambda lastSent: consumer)
        return d

    def download(self, target):
        # note that this does not update the stats_provider
        data = self.u.data
        if IConsumer.providedBy(target):
            target.registerProducer(LiteralProducer(), True)
        target.open(len(data))
        target.write(data)
        if IConsumer.providedBy(target):
            target.unregisterProducer()
        target.close()
        return defer.maybeDeferred(target.finish)

    def download_to_data(self):
        data = self.u.data
        return defer.succeed(data)