]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/immutable/filenode.py
download: refactor handling of URI Extension Block and crypttext hash tree, simplify...
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / immutable / filenode.py
1
2 import os.path, stat
3 from cStringIO import StringIO
4 from zope.interface import implements
5 from twisted.internet import defer
6 from twisted.internet.interfaces import IPushProducer, IConsumer
7 from twisted.protocols import basic
8 from foolscap.eventual import eventually
9 from allmydata.interfaces import IFileNode, IFileURI, ICheckable, \
10      IDownloadTarget
11 from allmydata.util import log, base32
12 from allmydata.uri import from_string as uri_from_string
13 from allmydata.immutable.checker import SimpleCHKFileChecker, \
14      SimpleCHKFileVerifier
15 from allmydata.immutable import download
16
class _ImmutableFileNodeBase(object):
    """Common base for immutable file nodes (CHK and literal).

    Wraps an IFileURI and provides the identity and (im)mutability
    queries shared by FileNode and LiteralFileNode. Equality and hashing
    delegate to the underlying URI object, so two nodes for the same URI
    compare equal.
    """
    implements(IFileNode, ICheckable)

    def __init__(self, uri, client):
        self.u = IFileURI(uri)
        self._client = client

    def get_readonly_uri(self):
        # immutable files are intrinsically read-only, so the read-only
        # form of the URI is just the URI itself
        return self.get_uri()

    def is_mutable(self):
        return False

    def is_readonly(self):
        return True

    def __hash__(self):
        return self.u.__hash__()

    def __eq__(self, other):
        if IFileNode.providedBy(other):
            return self.u.__eq__(other.u)
        else:
            return False

    def __ne__(self, other):
        # BUG FIX: this previously returned self.u.__eq__(other.u) for
        # other IFileNodes, so `node1 != node2` was True exactly when the
        # two nodes were EQUAL. __ne__ must be the negation of __eq__.
        if IFileNode.providedBy(other):
            return not self.u.__eq__(other.u)
        else:
            return True
45
class PortionOfFile:
    """A read-only, file-like view onto a byte range of a file on disk.

    Works like a list slice (things[2:14]) applied to the file's
    contents: reads begin at `offset` and stop after `size` bytes have
    been consumed. A `size` of None means "no limit: read to EOF".
    """
    def __init__(self, fn, offset=0, size=None):
        self.f = open(fn, "rb")
        self.f.seek(offset)
        self.bytes_left = size  # None == unbounded

    def read(self, size=None):
        # Effective count is min(size, self.bytes_left), where a None on
        # either side means "unbounded" and loses the min().
        if self.bytes_left is None:
            count = size
        elif size is None:
            count = self.bytes_left
        else:
            count = min(size, self.bytes_left)
        data = self.f.read(count)
        if self.bytes_left is not None:
            self.bytes_left -= len(data)
        return data
65
class DownloadCache:
    """An IDownloadTarget that spools a full download into an on-disk
    cache file, so that random-access reads can be satisfied locally.

    Callers register (offset, size) "milestones" with
    when_range_available(); each milestone's Deferred fires once the
    cache file has grown large enough to cover that range. Registering a
    milestone that the cache cannot yet satisfy starts a whole-file
    download (unless one is already running); as the downloader calls
    write()/close(), the pending milestones are re-checked.
    """
    implements(IDownloadTarget)

    def __init__(self, node, cachefile):
        # node: the FileNode we cache for; supplies the URI, storage
        #  index, and (via node._client) the "downloader" service.
        # cachefile: provides get_filename() naming the on-disk cache
        #  file -- presumably persistent across FileNode instances, so a
        #  leftover file from an earlier run can satisfy reads without a
        #  new download (TODO confirm against caller).
        self._downloader = node._client.getServiceNamed("downloader")
        self._uri = node.get_uri()
        self._storage_index = node.get_storage_index()
        self.milestones = set() # of (offset,size,Deferred)
        self.cachefile = cachefile
        self.download_in_progress = False
        # five states:
        #  new FileNode, no downloads ever performed
        #  new FileNode, leftover file (partial)
        #  new FileNode, leftover file (whole)
        #  download in progress, not yet complete
        #  download complete

    def when_range_available(self, offset, size):
        """Return a Deferred that fires (with None) once bytes
        [offset:offset+size] are present in the cache file.

        If the cache already covers the range, the Deferred fires
        (eventually) without starting a download; otherwise a full-file
        download is kicked off unless one is already in progress.
        """
        assert isinstance(offset, (int,long))
        assert isinstance(size, (int,long))

        d = defer.Deferred()
        self.milestones.add( (offset,size,d) )
        # this may satisfy (and remove) the milestone we just added, if
        # the cache file is already big enough
        self._check_milestones()
        if self.milestones and not self.download_in_progress:
            self.download_in_progress = True
            log.msg(format=("immutable filenode read [%(si)s]: " +
                            "starting download"),
                    si=base32.b2a(self._storage_index),
                    umid="h26Heg", level=log.OPERATIONAL)
            d2 = self._downloader.download(self._uri, self)
            d2.addBoth(self._download_done)
            d2.addErrback(self._download_failed)
            # _download_failed consumes the failure for the waiters; any
            # error escaping it is logged rather than dropped
            d2.addErrback(log.err, umid="cQaM9g")
        return d

    def read(self, consumer, offset, size):
        """Stream bytes [offset:offset+size] of the cached file to
        `consumer`. The range must already be available (see
        when_range_available). Returns a Deferred that fires with
        `consumer` when the transfer is complete."""
        assert offset+size <= self.get_filesize()
        f = PortionOfFile(self.cachefile.get_filename(), offset, size)
        d = basic.FileSender().beginFileTransfer(f, consumer)
        d.addCallback(lambda lastSent: consumer)
        return d

    def _download_done(self, res):
        # clear download_in_progress, so failed downloads can be re-tried
        self.download_in_progress = False
        return res

    def _download_failed(self, f):
        # tell anyone who's waiting that we failed
        for m in self.milestones:
            (offset,size,d) = m
            # eventually() defers the errback to a later reactor turn
            eventually(d.errback, f)
        self.milestones.clear()

    def _check_milestones(self):
        # fire (and discard) every milestone that the current size of the
        # cache file can satisfy; iterate over a copy since we mutate the
        # set during the loop
        current_size = self.get_filesize()
        for m in list(self.milestones):
            (offset,size,d) = m
            if offset+size <= current_size:
                log.msg(format=("immutable filenode read [%(si)s] " +
                                "%(offset)d+%(size)d vs %(filesize)d: " +
                                "done"),
                        si=base32.b2a(self._storage_index),
                        offset=offset, size=size, filesize=current_size,
                        umid="nuedUg", level=log.NOISY)
                self.milestones.discard(m)
                eventually(d.callback, None)
            else:
                log.msg(format=("immutable filenode read [%(si)s] " +
                                "%(offset)d+%(size)d vs %(filesize)d: " +
                                "still waiting"),
                        si=base32.b2a(self._storage_index),
                        offset=offset, size=size, filesize=current_size,
                        umid="8PKOhg", level=log.NOISY)

    def get_filesize(self):
        """Return the current size of the cache file in bytes, or 0 if it
        does not (yet) exist."""
        try:
            filesize = os.stat(self.cachefile.get_filename())[stat.ST_SIZE]
        except OSError:
            filesize = 0
        return filesize


    # The remaining methods implement the IDownloadTarget interface and
    # are invoked by the downloader as data arrives.

    def open(self, size):
        # truncates any leftover (partial or whole) cache file
        self.f = open(self.cachefile.get_filename(), "wb")

    def write(self, data):
        self.f.write(data)
        self._check_milestones()

    def close(self):
        self.f.close()
        self._check_milestones()

    def fail(self, why):
        pass
    def register_canceller(self, cb):
        pass
    def finish(self):
        return None
167
168
169
class FileNode(_ImmutableFileNodeBase):
    """An immutable CHK file node.

    Full downloads stream through the downloader service; partial reads
    are served through a DownloadCache, which spools a whole-file
    download into an on-disk cache file and satisfies ranges from it.
    """
    checker_class = SimpleCHKFileChecker
    verifier_class = SimpleCHKFileVerifier

    def __init__(self, uri, client, cachefile):
        _ImmutableFileNodeBase.__init__(self, uri, client)
        self.download_cache = DownloadCache(self, cachefile)

    def get_uri(self):
        return self.u.to_string()

    def get_size(self):
        return self.u.get_size()

    def get_verifier(self):
        return self.u.get_verifier()

    def get_storage_index(self):
        return self.u.storage_index

    def check(self, monitor, verify=False):
        """Check share availability; with verify=True, also verify share
        contents against the URI extension hash. Returns a Deferred that
        fires with the checker/verifier results."""
        # TODO: pass the Monitor to SimpleCHKFileChecker or
        # SimpleCHKFileVerifier, have it call monitor.raise_if_cancelled()
        # before sending each request.
        storage_index = self.u.storage_index
        assert IFileURI.providedBy(self.u), self.u
        k = self.u.needed_shares
        N = self.u.total_shares
        size = self.u.size
        ueb_hash = self.u.uri_extension_hash
        if verify:
            v = self.verifier_class(self._client,
                                    uri_from_string(self.get_uri()), storage_index,
                                    k, N, size, ueb_hash)
        else:
            v = self.checker_class(self._client,
                                   uri_from_string(self.get_uri()), storage_index,
                                   k, N)
        return v.start()

    def check_and_repair(self, monitor, verify=False):
        # this is a stub, to allow the deep-check tests to pass: no
        # repair is attempted, the plain check results are reported as
        # both pre- and post-repair results.
        from allmydata.checker_results import CheckAndRepairResults
        cr = CheckAndRepairResults(self.u.storage_index)
        # BUG FIX: this previously called self.check(verify), which
        # passed the boolean 'verify' flag as the 'monitor' positional
        # argument and therefore always ran a non-verifying check.
        d = self.check(monitor, verify)
        def _done(r):
            cr.pre_repair_results = cr.post_repair_results = r
            cr.repair_attempted = False
            return cr
        d.addCallback(_done)
        return d

    def read(self, consumer, offset=0, size=None):
        """Deliver bytes [offset:offset+size] to `consumer`. A size of
        None (the default) means "to the end of the file"; the range is
        clipped to the file's size. Returns a Deferred."""
        if size is None:
            size = self.get_size() - offset
        size = min(size, self.get_size() - offset)

        if offset == 0 and size == self.get_size():
            # don't use the cache, just do a normal streaming download
            log.msg(format=("immutable filenode read [%(si)s]: " +
                            "doing normal full download"),
                    si=base32.b2a(self.u.storage_index),
                    umid="VRSBwg", level=log.OPERATIONAL)
            return self.download(download.ConsumerAdapter(consumer))

        d = self.download_cache.when_range_available(offset, size)
        d.addCallback(lambda res:
                      self.download_cache.read(consumer, offset, size))
        return d

    def download(self, target):
        downloader = self._client.getServiceNamed("downloader")
        return downloader.download(self.get_uri(), target)

    def download_to_data(self):
        downloader = self._client.getServiceNamed("downloader")
        return downloader.download_to_data(self.get_uri())
248
class LiteralProducer:
    """A degenerate IPushProducer for literal-file delivery.

    Literal data arrives in a single synchronous write, so there is
    nothing to pause or resume: both producer controls are no-ops.
    """
    implements(IPushProducer)

    def stopProducing(self):
        pass

    def resumeProducing(self):
        pass
255
256
class LiteralFileNode(_ImmutableFileNodeBase):
    """An immutable file node whose entire contents are embedded in the
    URI itself (a LIT URI).

    Every operation is answered locally from self.u.data: no servers are
    contacted, there is no storage index, and check/repair trivially
    succeed with a None result.
    """

    def __init__(self, uri, client):
        _ImmutableFileNodeBase.__init__(self, uri, client)

    def get_uri(self):
        return self.u.to_string()

    def get_size(self):
        return len(self.u.data)

    def get_verifier(self):
        # nothing is stored remotely, so there is nothing to verify
        return None

    def get_storage_index(self):
        # no shares exist anywhere, hence no storage index
        return None

    def check(self, monitor, verify=False):
        return defer.succeed(None)

    def check_and_repair(self, monitor, verify=False):
        return defer.succeed(None)

    def read(self, consumer, offset=0, size=None):
        """Deliver bytes [offset:offset+size] (or [offset:] if size is
        None) to `consumer`. Returns a Deferred that fires with the
        consumer when delivery is complete."""
        if size is None:
            end = len(self.u.data)
        else:
            end = offset + size
        data = self.u.data[offset:end]

        # We use twisted.protocols.basic.FileSender, which only does
        # non-streaming (PullProducer) delivery: the consumer must ask
        # for each chunk explicitly. Only two spots in the Twisted
        # codebase cannot handle streaming=False -- both on the upload
        # path of an FTP/SFTP server (protocols.ftp.FileConsumer and
        # vfs.adapters.ftp._FileToConsumerAdapter) -- and neither is a
        # plausible target for a Tahoe download.

        d = basic.FileSender().beginFileTransfer(StringIO(data), consumer)
        d.addCallback(lambda lastSent: consumer)
        return d

    def download(self, target):
        # note that this does not update the stats_provider
        data = self.u.data
        if IConsumer.providedBy(target):
            target.registerProducer(LiteralProducer(), True)
        target.open(len(data))
        target.write(data)
        if IConsumer.providedBy(target):
            target.unregisterProducer()
        target.close()
        return defer.maybeDeferred(target.finish)

    def download_to_data(self):
        return defer.succeed(self.u.data)