import time
now = time.time
from twisted.python.failure import Failure
from twisted.internet import defer
from foolscap.api import eventually
from allmydata import uri
from allmydata.codec import CRSDecoder
from allmydata.util import base32, log, hashutil, mathutil, observer
from allmydata.interfaces import DEFAULT_MAX_SEGMENT_SIZE
from allmydata.hashtree import IncompleteHashTree, BadHashError, \
     NotEnoughHashesError

# local imports
from finder import ShareFinder
from fetcher import SegmentFetcher
from segmentation import Segmentation
from common import BadCiphertextHashError

class Cancel:
    def __init__(self, f):
        self._f = f
        self.active = True
    def cancel(self):
        if self.active:
            self.active = False
            self._f(self)
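
# Illustrative note (added commentary, not part of the original module):
# DownloadNode hands one of these Cancel handles to each get_segment()
# caller, wired to its _cancel_request method, roughly:
#   c = Cancel(self._cancel_request)
#   ...
#   c.cancel()   # unregisters the request; calling it again is a no-op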

class DownloadNode:
    """Internal class which manages downloads and holds state. External
    callers use CiphertextFileNode instead."""

    # Share._node points to me
    def __init__(self, verifycap, storage_broker, secret_holder,
                 terminator, history, download_status):
        assert isinstance(verifycap, uri.CHKFileVerifierURI)
        self._verifycap = verifycap
        self._storage_broker = storage_broker
        self._si_prefix = base32.b2a_l(verifycap.storage_index[:8], 60)
        self.running = True
        if terminator:
            terminator.register(self) # calls self.stop() at stopService()
        # the rules are:
        # 1: Only send network requests if you're active (self.running is True)
        # 2: Use TimerService, not reactor.callLater
        # 3: You can do eventual-sends any time.
        # These rules should mean that once
        # stopService()+flushEventualQueue() fires, everything will be done.
        self._secret_holder = secret_holder
        self._history = history
        self._download_status = download_status

        k, N = self._verifycap.needed_shares, self._verifycap.total_shares
        self.share_hash_tree = IncompleteHashTree(N)

        # we guess the segment size, so Segmentation can pull non-initial
        # segments in a single roundtrip. This populates
        # .guessed_segment_size, .guessed_num_segments, and
        # .ciphertext_hash_tree (with a dummy, to let us guess which hashes
        # we'll need)
        self._build_guessed_tables(DEFAULT_MAX_SEGMENT_SIZE)

        # filled in when we parse a valid UEB
        self.have_UEB = False
        self.segment_size = None
        self.tail_segment_size = None
        self.tail_segment_padded = None
        self.num_segments = None
        self.block_size = None
        self.tail_block_size = None

        # things to track callers that want data

        # _segment_requests can have duplicates
        self._segment_requests = [] # (segnum, d, cancel_handle, logparent)
        self._active_segment = None # a SegmentFetcher, with .segnum

        self._segsize_observers = observer.OneShotObserverList()

        # we create one top-level logparent for this DownloadNode, and
        # another one for each read() call. Segmentation and get_segment()
        # messages are associated with the read() call; everything else is
        # tied to the DownloadNode's log entry.
        lp = log.msg(format="Immutable.DownloadNode(%(si)s) created:"
                     " size=%(size)d,"
                     " guessed_segsize=%(guessed_segsize)d,"
                     " guessed_numsegs=%(guessed_numsegs)d",
                     si=self._si_prefix, size=verifycap.size,
                     guessed_segsize=self.guessed_segment_size,
                     guessed_numsegs=self.guessed_num_segments,
                     level=log.OPERATIONAL, umid="uJ0zAQ")
        self._lp = lp

        self._sharefinder = ShareFinder(storage_broker, verifycap, self,
                                        self._download_status, lp)
        self._shares = set()

    def _build_guessed_tables(self, max_segment_size):
        size = min(self._verifycap.size, max_segment_size)
        s = mathutil.next_multiple(size, self._verifycap.needed_shares)
        self.guessed_segment_size = s
        r = self._calculate_sizes(self.guessed_segment_size)
        self.guessed_num_segments = r["num_segments"]
        # as with CommonShare, our ciphertext_hash_tree is a stub until we
        # get the real num_segments
        self.ciphertext_hash_tree = IncompleteHashTree(self.guessed_num_segments)
        self.ciphertext_hash_tree_leaves = self.guessed_num_segments
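
    # Illustrative example (added commentary, not part of the original
    # module): for a 1,000,000-byte file with k=3, and assuming
    # DEFAULT_MAX_SEGMENT_SIZE is 128 KiB (131072 bytes), this computes
    #   size                 = min(1000000, 131072)      = 131072
    #   guessed_segment_size = next_multiple(131072, 3)  = 131073
    #   guessed_num_segments = div_ceil(1000000, 131073) = 8
    # so the stub ciphertext_hash_tree gets 8 leaves. If the uploader used a
    # different maximum segment size, the guess is corrected once the real
    # UEB arrives in _parse_and_store_UEB().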

    def __repr__(self):
        return "ImmutableDownloadNode(%s)" % (self._si_prefix,)

    def stop(self):
        # called by the Terminator at shutdown, mostly for tests
        if self._active_segment:
            self._active_segment.stop()
            self._active_segment = None
        self._sharefinder.stop()

    # things called by outside callers, via CiphertextFileNode. get_segment()
    # may also be called by Segmentation.

    def read(self, consumer, offset=0, size=None, read_ev=None):
        """I am the main entry point, from which FileNode.read() can get
        data. I feed the consumer with the desired range of ciphertext. I
        return a Deferred that fires (with the consumer) when the read is
        finished.

        Note that there is no notion of a 'file pointer': each call to read()
        uses an independent offset= value."""
        # for concurrent operations: each gets its own Segmentation manager
        if size is None:
            size = self._verifycap.size
        # ignore overruns: clip size so offset+size does not go past EOF, and
        # so size is not negative (which indicates that offset >= EOF)
        size = max(0, min(size, self._verifycap.size-offset))
        if read_ev is None:
            read_ev = self._download_status.add_read_event(offset, size, now())

        lp = log.msg(format="imm Node(%(si)s).read(%(offset)d, %(size)d)",
                     si=base32.b2a(self._verifycap.storage_index)[:8],
                     offset=offset, size=size,
                     level=log.OPERATIONAL, parent=self._lp, umid="l3j3Ww")
        if self._history:
            sp = self._history.stats_provider
            sp.count("downloader.files_downloaded", 1) # really read() calls
            sp.count("downloader.bytes_downloaded", size)
        if size == 0:
            read_ev.finished(now())
            # no data, so no producer, so no register/unregisterProducer
            return defer.succeed(consumer)
        s = Segmentation(self, offset, size, consumer, read_ev, lp)
        # this raises an interesting question: what segments to fetch? if
        # offset=0, always fetch the first segment, and then allow
        # Segmentation to be responsible for pulling the subsequent ones if
        # the first wasn't large enough. If offset>0, we're going to need an
        # extra roundtrip to get the UEB (and therefore the segment size)
        # before we can figure out which segment to get. TODO: allow the
        # offset-table-guessing code (which starts by guessing the segsize)
        # to assist the offset>0 process.
        d = s.start()
        def _done(res):
            read_ev.finished(now())
            return res
        d.addBoth(_done)
        return d
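
    # Usage sketch (added commentary, not part of the original module): a
    # caller such as CiphertextFileNode typically does something like
    #   d = node.read(consumer, offset=0, size=1000)
    #   d.addCallback(lambda consumer: ...)  # fires after the whole range
    #                                        # has been written to consumer
    # Each read() call is independent; there is no shared file pointer.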

    def get_segment(self, segnum, logparent=None):
        """Begin downloading a segment. I return a tuple (d, c): 'd' is a
        Deferred that fires with (offset, data, decodetime) when the desired
        segment is available, and c is an object on which c.cancel() can be
        called to disavow interest in the segment (after which 'd' will
        never fire).

        You probably need to know the segment size before calling this,
        unless you want the first few bytes of the file. If you ask for a
        segment number which turns out to be too large, the Deferred will
        errback with BadSegmentNumberError.

        The Deferred fires with the offset of the first byte of the data
        segment, so that you can call get_segment() before knowing the
        segment size, and still know which data you received.

        The Deferred can also errback with other fatal problems, such as
        NotEnoughSharesError, NoSharesError, or BadCiphertextHashError.
        """
        lp = log.msg(format="imm Node(%(si)s).get_segment(%(segnum)d)",
                     si=base32.b2a(self._verifycap.storage_index)[:8],
                     segnum=segnum,
                     level=log.OPERATIONAL, parent=logparent, umid="UKFjDQ")
        self._download_status.add_segment_request(segnum, now())
        d = defer.Deferred()
        c = Cancel(self._cancel_request)
        self._segment_requests.append( (segnum, d, c, lp) )
        self._start_new_segment()
        return (d, c)
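
    # Usage sketch (added commentary, not part of the original module):
    # Segmentation and get_segsize() drive this method roughly like
    #   (d, c) = node.get_segment(segnum)
    #   d.addCallback(lambda (offset, segment, decodetime): ...)
    #   ...
    #   c.cancel()   # lose interest; 'd' will then never fire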

    def get_segsize(self):
        """Return a Deferred that fires when we know the real segment size."""
        if self.segment_size:
            return defer.succeed(self.segment_size)
        # TODO: this downloads (and discards) the first segment of the file.
        # We could make this more efficient by writing
        # fetcher.SegmentSizeFetcher, with the job of finding a single valid
        # share and extracting the UEB. We'd add Share.get_UEB() to request
        # just the UEB.
        (d,c) = self.get_segment(0)
        # this ensures that an error during get_segment() will errback the
        # caller, so Repair won't wait forever on completely missing files
        d.addCallback(lambda ign: self._segsize_observers.when_fired())
        return d

    # things called by the Segmentation object used to transform
    # arbitrary-sized read() calls into quantized segment fetches

    def _start_new_segment(self):
        if self._active_segment is None and self._segment_requests:
            segnum = self._segment_requests[0][0]
            k = self._verifycap.needed_shares
            lp = self._segment_requests[0][3]
            log.msg(format="%(node)s._start_new_segment: segnum=%(segnum)d",
                    node=repr(self), segnum=segnum,
                    level=log.NOISY, parent=lp, umid="wAlnHQ")
            self._active_segment = fetcher = SegmentFetcher(self, segnum, k, lp)
            active_shares = [s for s in self._shares if s.is_alive()]
            fetcher.add_shares(active_shares) # this triggers the loop


    # called by our child ShareFinder
    def got_shares(self, shares):
        self._shares.update(shares)
        if self._active_segment:
            self._active_segment.add_shares(shares)
    def no_more_shares(self):
        self._no_more_shares = True
        if self._active_segment:
            self._active_segment.no_more_shares()

    # things called by our Share instances

    def validate_and_store_UEB(self, UEB_s):
        log.msg("validate_and_store_UEB",
                level=log.OPERATIONAL, parent=self._lp, umid="7sTrPw")
        h = hashutil.uri_extension_hash(UEB_s)
        if h != self._verifycap.uri_extension_hash:
            raise BadHashError
        self._parse_and_store_UEB(UEB_s) # sets self._stuff
        # TODO: a malformed (but authentic) UEB could throw an assertion in
        # _parse_and_store_UEB, and we should abandon the download.
        self.have_UEB = True

        # inform the ShareFinder about our correct number of segments. This
        # will update the block-hash-trees in all existing CommonShare
        # instances, and will populate new ones with the correct value.
        self._sharefinder.update_num_segments()

    def _parse_and_store_UEB(self, UEB_s):
        # Note: the UEB contains needed_shares and total_shares. These are
        # redundant and inferior (the filecap contains the authoritative
        # values). However, because it is possible to encode the same file in
        # multiple ways, and the encoders might choose (poorly) to use the
        # same key for both (therefore getting the same SI), we might
        # encounter shares for both types. The UEB hashes will be different,
        # however, and we'll disregard the "other" encoding's shares as
        # corrupted.

        # therefore, we ignore d['total_shares'] and d['needed_shares'].

        d = uri.unpack_extension(UEB_s)

        log.msg(format="UEB=%(ueb)s, vcap=%(vcap)s",
                ueb=repr(uri.unpack_extension_readable(UEB_s)),
                vcap=self._verifycap.to_string(),
                level=log.NOISY, parent=self._lp, umid="cVqZnA")

        k, N = self._verifycap.needed_shares, self._verifycap.total_shares

        self.segment_size = d['segment_size']
        self._segsize_observers.fire(self.segment_size)

        r = self._calculate_sizes(self.segment_size)
        self.tail_segment_size = r["tail_segment_size"]
        self.tail_segment_padded = r["tail_segment_padded"]
        self.num_segments = r["num_segments"]
        self.block_size = r["block_size"]
        self.tail_block_size = r["tail_block_size"]
        log.msg("actual sizes: %s" % (r,),
                level=log.NOISY, parent=self._lp, umid="PY6P5Q")
        if (self.segment_size == self.guessed_segment_size
            and self.num_segments == self.guessed_num_segments):
            log.msg("my guess was right!",
                    level=log.NOISY, parent=self._lp, umid="x340Ow")
        else:
            log.msg("my guess was wrong! Extra round trips for me.",
                    level=log.NOISY, parent=self._lp, umid="tb7RJw")

        # zfec.Decode() instantiation is fast, but still, let's use the same
        # codec instance for all but the last segment. 3-of-10 takes 15us on
        # my laptop, 25-of-100 is 900us, 3-of-255 is 97us, 25-of-255 is
        # 2.5ms, worst-case 254-of-255 is 9.3ms
        self._codec = CRSDecoder()
        self._codec.set_params(self.segment_size, k, N)


        # Ciphertext hash tree root is mandatory, so that there is at most
        # one ciphertext that matches this read-cap or verify-cap. The
        # integrity check on the shares is not sufficient to prevent the
        # original encoder from creating some shares of file A and other
        # shares of file B. self.ciphertext_hash_tree was a guess before:
        # this is where we create it for real.
        self.ciphertext_hash_tree = IncompleteHashTree(self.num_segments)
        self.ciphertext_hash_tree_leaves = self.num_segments
        self.ciphertext_hash_tree.set_hashes({0: d['crypttext_root_hash']})

        self.share_hash_tree.set_hashes({0: d['share_root_hash']})

        # Our job is a fast download, not verification, so we ignore any
        # redundant fields. The Verifier uses a different code path which
        # does not ignore them.
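
    # Added commentary (not part of the original module): of the unpacked
    # UEB dict 'd', this method only consumes d['segment_size'],
    # d['crypttext_root_hash'], and d['share_root_hash']; d['needed_shares']
    # and d['total_shares'] are deliberately ignored in favor of the
    # verify-cap's values, as explained above.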

    def _calculate_sizes(self, segment_size):
        # segments of ciphertext
        size = self._verifycap.size
        k = self._verifycap.needed_shares

        # this assert matches the one in encode.py:127 inside
        # Encoded._got_all_encoding_parameters, where the UEB is constructed
        assert segment_size % k == 0

        # the last segment is usually short. We don't store a whole segsize,
        # but we do pad the segment up to a multiple of k, because the
        # encoder requires that.
        tail_segment_size = size % segment_size
        if tail_segment_size == 0:
            tail_segment_size = segment_size
        padded = mathutil.next_multiple(tail_segment_size, k)
        tail_segment_padded = padded

        num_segments = mathutil.div_ceil(size, segment_size)

        # each segment is turned into N blocks. All but the last are of size
        # block_size, and the last is of size tail_block_size
        block_size = segment_size / k
        tail_block_size = tail_segment_padded / k

        return { "tail_segment_size": tail_segment_size,
                 "tail_segment_padded": tail_segment_padded,
                 "num_segments": num_segments,
                 "block_size": block_size,
                 "tail_block_size": tail_block_size,
                 }
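
    # Worked example (added commentary, not part of the original module):
    # for a 1,000,000-byte file with k=3 and segment_size=131073:
    #   num_segments        = div_ceil(1000000, 131073) = 8
    #   tail_segment_size   = 1000000 % 131073          = 82489
    #   tail_segment_padded = next_multiple(82489, 3)   = 82491
    #   block_size          = 131073 / 3                = 43691
    #   tail_block_size     = 82491 / 3                 = 27497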

    def process_share_hashes(self, share_hashes):
        for hashnum in share_hashes:
            if hashnum >= len(self.share_hash_tree):
                # "BadHashError" is normally for e.g. a corrupt block. We
                # sort of abuse it here to mean a badly numbered hash (which
                # indicates corruption in the number bytes, rather than in
                # the data bytes).
                raise BadHashError("hashnum %d doesn't fit in hashtree(%d)"
                                   % (hashnum, len(self.share_hash_tree)))
        self.share_hash_tree.set_hashes(share_hashes)

    def get_desired_ciphertext_hashes(self, segnum):
        if segnum < self.ciphertext_hash_tree_leaves:
            return self.ciphertext_hash_tree.needed_hashes(segnum,
                                                           include_leaf=True)
        return []
    def get_needed_ciphertext_hashes(self, segnum):
        cht = self.ciphertext_hash_tree
        return cht.needed_hashes(segnum, include_leaf=True)

    def process_ciphertext_hashes(self, hashes):
        assert self.num_segments is not None
        # this may raise BadHashError or NotEnoughHashesError
        self.ciphertext_hash_tree.set_hashes(hashes)


    # called by our child SegmentFetcher

    def want_more_shares(self):
        self._sharefinder.hungry()

    def fetch_failed(self, sf, f):
        assert sf is self._active_segment
        # deliver error upwards
        for (d,c) in self._extract_requests(sf.segnum):
            eventually(self._deliver, d, c, f)
        self._active_segment = None
        self._start_new_segment()

    def process_blocks(self, segnum, blocks):
        d = defer.maybeDeferred(self._decode_blocks, segnum, blocks)
        d.addCallback(self._check_ciphertext_hash, segnum)
        def _deliver(result):
            ds = self._download_status
            if isinstance(result, Failure):
                ds.add_segment_error(segnum, now())
            else:
                (offset, segment, decodetime) = result
                ds.add_segment_delivery(segnum, now(),
                                        offset, len(segment), decodetime)
            log.msg(format="delivering segment(%(segnum)d)",
                    segnum=segnum,
                    level=log.OPERATIONAL, parent=self._lp,
                    umid="j60Ojg")
            for (d,c) in self._extract_requests(segnum):
                eventually(self._deliver, d, c, result)
            self._active_segment = None
            self._start_new_segment()
        d.addBoth(_deliver)
        d.addErrback(lambda f:
                     log.err("unhandled error during process_blocks",
                             failure=f, level=log.WEIRD,
                             parent=self._lp, umid="MkEsCg"))

    def _decode_blocks(self, segnum, blocks):
        tail = (segnum == self.num_segments-1)
        codec = self._codec
        block_size = self.block_size
        decoded_size = self.segment_size
        if tail:
            # account for the padding in the last segment
            codec = CRSDecoder()
            k, N = self._verifycap.needed_shares, self._verifycap.total_shares
            codec.set_params(self.tail_segment_padded, k, N)
            block_size = self.tail_block_size
            decoded_size = self.tail_segment_padded

        shares = []
        shareids = []
        for (shareid, share) in blocks.iteritems():
            assert len(share) == block_size
            shareids.append(shareid)
            shares.append(share)
        del blocks

        start = now()
        d = codec.decode(shares, shareids)   # segment
        del shares
        def _process(buffers):
            decodetime = now() - start
            segment = "".join(buffers)
            assert len(segment) == decoded_size
            del buffers
            if tail:
                segment = segment[:self.tail_segment_size]
            return (segment, decodetime)
        d.addCallback(_process)
        return d
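
    # Illustrative note (added commentary, not part of the original module):
    # for k=3 the SegmentFetcher might deliver blocks = {0: b0, 4: b4, 7: b7};
    # codec.decode([b0, b4, b7], [0, 4, 7]) then fires with k buffers whose
    # concatenation is the (possibly padded) segment, which _process() joins
    # and, for the tail segment, trims back down to tail_segment_size.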

    def _check_ciphertext_hash(self, (segment, decodetime), segnum):
        assert self._active_segment.segnum == segnum
        assert self.segment_size is not None
        offset = segnum * self.segment_size

        h = hashutil.crypttext_segment_hash(segment)
        try:
            self.ciphertext_hash_tree.set_hashes(leaves={segnum: h})
            return (offset, segment, decodetime)
        except (BadHashError, NotEnoughHashesError):
            format = ("hash failure in ciphertext_hash_tree:"
                      " segnum=%(segnum)d, SI=%(si)s")
            log.msg(format=format, segnum=segnum, si=self._si_prefix,
                    failure=Failure(),
                    level=log.WEIRD, parent=self._lp, umid="MTwNnw")
            # this is especially weird, because we made it past the share
            # hash tree. It implies that we're using the wrong encoding, or
            # that the uploader deliberately constructed a bad UEB.
            msg = format % {"segnum": segnum, "si": self._si_prefix}
            raise BadCiphertextHashError(msg)

    def _deliver(self, d, c, result):
        # this method exists to handle cancel() that occurs between
        # _got_segment and _deliver
        if c.active:
            c.active = False # it is now too late to cancel
            d.callback(result) # might actually be an errback

    def _extract_requests(self, segnum):
        """Remove matching requests and return their (d,c) tuples so that the
        caller can retire them."""
        retire = [(d,c) for (segnum0, d, c, lp) in self._segment_requests
                  if segnum0 == segnum]
        self._segment_requests = [t for t in self._segment_requests
                                  if t[0] != segnum]
        return retire

    def _cancel_request(self, c):
        self._segment_requests = [t for t in self._segment_requests
                                  if t[2] != c]
        segnums = [segnum for (segnum,d,c,lp) in self._segment_requests]
        # self._active_segment might be None in rare circumstances, so make
        # sure we tolerate it
        if self._active_segment and self._active_segment.segnum not in segnums:
            self._active_segment.stop()
            self._active_segment = None
            self._start_new_segment()

    # called by ShareFinder to choose hashtree sizes in CommonShares, and by
    # SegmentFetcher to tell if it is still fetching a valid segnum.
    def get_num_segments(self):
        # returns (best_num_segments, authoritative)
        if self.num_segments is None:
            return (self.guessed_num_segments, False)
        return (self.num_segments, True)