import time
now = time.time
from zope.interface import Interface
from twisted.python.failure import Failure
from twisted.internet import defer
from foolscap.api import eventually
from allmydata import uri
from allmydata.codec import CRSDecoder
from allmydata.util import base32, log, hashutil, mathutil, observer
from allmydata.interfaces import DEFAULT_MAX_SEGMENT_SIZE
from allmydata.hashtree import IncompleteHashTree, BadHashError, \
     NotEnoughHashesError

# local imports
from finder import ShareFinder
from fetcher import SegmentFetcher
from segmentation import Segmentation
from common import BadCiphertextHashError

class IDownloadStatusHandlingConsumer(Interface):
    def set_download_status_read_event(read_ev):
        """Record the DownloadStatus 'read event', to be updated with the
        time it takes to decrypt each chunk of data."""

class Cancel:
    def __init__(self, f):
        self._f = f
        self.active = True
    def cancel(self):
        if self.active:
            self.active = False
            self._f(self)

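# A minimal sketch of how Cancel is used (hypothetical caller code; the real
# producers and consumers of Cancel are get_segment() and _cancel_request()
# below):
#
#   (d, c) = node.get_segment(0)  # c is a Cancel wrapping _cancel_request
#   c.cancel()                    # invokes _cancel_request(c) exactly once;
#                                 # later cancel() calls are no-ops
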
class DownloadNode:
    """Internal class which manages downloads and holds state. External
    callers use CiphertextFileNode instead."""

    # Share._node points to me
    def __init__(self, verifycap, storage_broker, secret_holder,
                 terminator, history, download_status):
        assert isinstance(verifycap, uri.CHKFileVerifierURI)
        self._verifycap = verifycap
        self._storage_broker = storage_broker
        self._si_prefix = base32.b2a_l(verifycap.storage_index[:8], 60)
        self.running = True
        if terminator:
            terminator.register(self) # calls self.stop() at stopService()
        # the rules are:
        # 1: Only send network requests if you're active (self.running is True)
        # 2: Use TimerService, not reactor.callLater
        # 3: You can do eventual-sends any time.
        # These rules should mean that once
        # stopService()+flushEventualQueue() fires, everything will be done.
        self._secret_holder = secret_holder
        self._history = history
        self._download_status = download_status

        k, N = self._verifycap.needed_shares, self._verifycap.total_shares
        self.share_hash_tree = IncompleteHashTree(N)

        # we guess the segment size, so Segmentation can pull non-initial
        # segments in a single roundtrip. This populates
        # .guessed_segment_size, .guessed_num_segments, and
        # .ciphertext_hash_tree (with a dummy, to let us guess which hashes
        # we'll need)
        self._build_guessed_tables(DEFAULT_MAX_SEGMENT_SIZE)

        # filled in when we parse a valid UEB
        self.have_UEB = False
        self.segment_size = None
        self.tail_segment_size = None
        self.tail_segment_padded = None
        self.num_segments = None
        self.block_size = None
        self.tail_block_size = None

        # things to track callers that want data

        # _segment_requests can have duplicates
        self._segment_requests = [] # (segnum, d, cancel_handle, seg_ev, lp)
        self._active_segment = None # a SegmentFetcher, with .segnum

        self._segsize_observers = observer.OneShotObserverList()

        # we create one top-level logparent for this _Node, and another one
        # for each read() call. Segmentation and get_segment() messages are
        # associated with the read() call, everything else is tied to the
        # _Node's log entry.
        lp = log.msg(format="Immutable.DownloadNode(%(si)s) created:"
                     " size=%(size)d,"
                     " guessed_segsize=%(guessed_segsize)d,"
                     " guessed_numsegs=%(guessed_numsegs)d",
                     si=self._si_prefix, size=verifycap.size,
                     guessed_segsize=self.guessed_segment_size,
                     guessed_numsegs=self.guessed_num_segments,
                     level=log.OPERATIONAL, umid="uJ0zAQ")
        self._lp = lp

        self._sharefinder = ShareFinder(storage_broker, verifycap, self,
                                        self._download_status, lp)
        self._shares = set()

    def _build_guessed_tables(self, max_segment_size):
        size = min(self._verifycap.size, max_segment_size)
        s = mathutil.next_multiple(size, self._verifycap.needed_shares)
        self.guessed_segment_size = s
        r = self._calculate_sizes(self.guessed_segment_size)
        self.guessed_num_segments = r["num_segments"]
        # as with CommonShare, our ciphertext_hash_tree is a stub until we
        # get the real num_segments
        self.ciphertext_hash_tree = IncompleteHashTree(self.guessed_num_segments)
        self.ciphertext_hash_tree_leaves = self.guessed_num_segments

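    # Illustrative numbers for the guess (assumed inputs, not taken from a
    # real download): a 1,000,000-byte file with k=3 and
    # DEFAULT_MAX_SEGMENT_SIZE=131072 gives
    #   min(1000000, 131072)     = 131072
    #   next_multiple(131072, 3) = 131073  -> guessed_segment_size
    #   div_ceil(1000000, 131073) = 8      -> guessed_num_segments
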
    def __repr__(self):
        return "ImmutableDownloadNode(%s)" % (self._si_prefix,)

    def stop(self):
        # called by the Terminator at shutdown, mostly for tests
        if self._active_segment:
            self._active_segment.stop()
            self._active_segment = None
        self._sharefinder.stop()

    # things called by outside callers, via CiphertextFileNode. get_segment()
    # may also be called by Segmentation.

    def read(self, consumer, offset, size):
        """I am the main entry point, from which FileNode.read() can get
        data. I feed the consumer with the desired range of ciphertext. I
        return a Deferred that fires (with the consumer) when the read is
        finished.

        Note that there is no notion of a 'file pointer': each call to read()
        uses an independent offset= value.
        """
        if size is None:
            size = self._verifycap.size
        # ignore overruns: clip size so offset+size does not go past EOF, and
        # so size is not negative (which indicates that offset >= EOF)
        size = max(0, min(size, self._verifycap.size-offset))

        read_ev = self._download_status.add_read_event(offset, size, now())
        if IDownloadStatusHandlingConsumer.providedBy(consumer):
            consumer.set_download_status_read_event(read_ev)

        lp = log.msg(format="imm Node(%(si)s).read(%(offset)d, %(size)d)",
                     si=base32.b2a(self._verifycap.storage_index)[:8],
                     offset=offset, size=size,
                     level=log.OPERATIONAL, parent=self._lp, umid="l3j3Ww")
        if self._history:
            sp = self._history.stats_provider
            sp.count("downloader.files_downloaded", 1) # really read() calls
            sp.count("downloader.bytes_downloaded", size)
        if size == 0:
            read_ev.finished(now())
            # no data, so no producer, so no register/unregisterProducer
            return defer.succeed(consumer)

        # for concurrent operations, each read() gets its own Segmentation
        # manager
        s = Segmentation(self, offset, size, consumer, read_ev, lp)

        # this raises an interesting question: what segments to fetch? if
        # offset=0, always fetch the first segment, and then allow
        # Segmentation to be responsible for pulling the subsequent ones if
        # the first wasn't large enough. If offset>0, we're going to need an
        # extra roundtrip to get the UEB (and therefore the segment size)
        # before we can figure out which segment to get. TODO: allow the
        # offset-table-guessing code (which starts by guessing the segsize)
        # to assist the offset>0 process.
        d = s.start()
        def _done(res):
            read_ev.finished(now())
            return res
        d.addBoth(_done)
        return d

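    # A usage sketch for read() (hypothetical caller code; real callers go
    # through CiphertextFileNode): 'consumer' must provide
    # twisted.internet.interfaces.IConsumer.
    #
    #   d = node.read(consumer, offset=0, size=4096)
    #   d.addCallback(lambda consumer: ...) # fires when the range is written
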
    def get_segment(self, segnum, logparent=None):
        """Begin downloading a segment. I return a tuple (d, c): 'd' is a
        Deferred that fires with (offset,data) when the desired segment is
        available, and 'c' is an object on which c.cancel() can be called to
        disavow interest in the segment (after which 'd' will never fire).

        You probably need to know the segment size before calling this,
        unless you want the first few bytes of the file. If you ask for a
        segment number which turns out to be too large, the Deferred will
        errback with BadSegmentNumberError.

        The Deferred fires with the offset of the first byte of the data
        segment, so that you can call get_segment() before knowing the
        segment size, and still know which data you received.

        The Deferred can also errback with other fatal problems, such as
        NotEnoughSharesError, NoSharesError, or BadCiphertextHashError.
        """
        lp = log.msg(format="imm Node(%(si)s).get_segment(%(segnum)d)",
                     si=base32.b2a(self._verifycap.storage_index)[:8],
                     segnum=segnum,
                     level=log.OPERATIONAL, parent=logparent, umid="UKFjDQ")
        seg_ev = self._download_status.add_segment_request(segnum, now())
        d = defer.Deferred()
        c = Cancel(self._cancel_request)
        self._segment_requests.append( (segnum, d, c, seg_ev, lp) )
        self._start_new_segment()
        return (d, c)

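    # A usage sketch for get_segment() (hypothetical caller code, Python-2
    # tuple-unpacking lambda):
    #
    #   (d, c) = node.get_segment(3)
    #   d.addCallback(lambda (offset, data): ...) # fires with the segment
    #   c.cancel() # or: disavow interest, after which d never fires
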
    def get_segsize(self):
        """Return a Deferred that fires when we know the real segment size."""
        if self.segment_size:
            return defer.succeed(self.segment_size)
        # TODO: this downloads (and discards) the first segment of the file.
        # We could make this more efficient by writing
        # fetcher.SegmentSizeFetcher, with the job of finding a single valid
        # share and extracting the UEB. We'd add Share.get_UEB() to request
        # just the UEB.
        (d,c) = self.get_segment(0)
        # this ensures that an error during get_segment() will errback the
        # caller, so Repair won't wait forever on completely missing files
        d.addCallback(lambda ign: self._segsize_observers.when_fired())
        return d

    # things called by the Segmentation object used to transform
    # arbitrary-sized read() calls into quantized segment fetches

    def _start_new_segment(self):
        if self._active_segment is None and self._segment_requests:
            (segnum, d, c, seg_ev, lp) = self._segment_requests[0]
            k = self._verifycap.needed_shares
            log.msg(format="%(node)s._start_new_segment: segnum=%(segnum)d",
                    node=repr(self), segnum=segnum,
                    level=log.NOISY, parent=lp, umid="wAlnHQ")
            self._active_segment = fetcher = SegmentFetcher(self, segnum, k, lp)
            seg_ev.activate(now())
            active_shares = [s for s in self._shares if s.is_alive()]
            fetcher.add_shares(active_shares) # this triggers the loop


    # called by our child ShareFinder
    def got_shares(self, shares):
        self._shares.update(shares)
        if self._active_segment:
            self._active_segment.add_shares(shares)
    def no_more_shares(self):
        self._no_more_shares = True
        if self._active_segment:
            self._active_segment.no_more_shares()

    # things called by our Share instances

    def validate_and_store_UEB(self, UEB_s):
        log.msg("validate_and_store_UEB",
                level=log.OPERATIONAL, parent=self._lp, umid="7sTrPw")
        h = hashutil.uri_extension_hash(UEB_s)
        if h != self._verifycap.uri_extension_hash:
            raise BadHashError
        self._parse_and_store_UEB(UEB_s) # sets self._stuff
        # TODO: a malformed (but authentic) UEB could throw an assertion in
        # _parse_and_store_UEB, and we should abandon the download.
        self.have_UEB = True

        # inform the ShareFinder about our correct number of segments. This
        # will update the block-hash-trees in all existing CommonShare
        # instances, and will populate new ones with the correct value.
        self._sharefinder.update_num_segments()

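    # The hash comparison above pins the UEB to the verify-cap: the invariant
    # (using only names from this module) is
    #   hashutil.uri_extension_hash(UEB_s) == verifycap.uri_extension_hash
    # and it must hold before any field of UEB_s is trusted.
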
    def _parse_and_store_UEB(self, UEB_s):
        # Note: the UEB contains needed_shares and total_shares. These are
        # redundant and inferior (the filecap contains the authoritative
        # values). However, because it is possible to encode the same file in
        # multiple ways, and the encoders might choose (poorly) to use the
        # same key for both (therefore getting the same SI), we might
        # encounter shares for both types. The UEB hashes will be different,
        # however, and we'll disregard the "other" encoding's shares as
        # corrupted.

        # therefore, we ignore d['total_shares'] and d['needed_shares'].

        d = uri.unpack_extension(UEB_s)

        log.msg(format="UEB=%(ueb)s, vcap=%(vcap)s",
                ueb=repr(uri.unpack_extension_readable(UEB_s)),
                vcap=self._verifycap.to_string(),
                level=log.NOISY, parent=self._lp, umid="cVqZnA")

        k, N = self._verifycap.needed_shares, self._verifycap.total_shares

        self.segment_size = d['segment_size']
        self._segsize_observers.fire(self.segment_size)

        r = self._calculate_sizes(self.segment_size)
        self.tail_segment_size = r["tail_segment_size"]
        self.tail_segment_padded = r["tail_segment_padded"]
        self.num_segments = r["num_segments"]
        self.block_size = r["block_size"]
        self.tail_block_size = r["tail_block_size"]
        log.msg("actual sizes: %s" % (r,),
                level=log.NOISY, parent=self._lp, umid="PY6P5Q")
        if (self.segment_size == self.guessed_segment_size
            and self.num_segments == self.guessed_num_segments):
            log.msg("my guess was right!",
                    level=log.NOISY, parent=self._lp, umid="x340Ow")
        else:
            log.msg("my guess was wrong! Extra round trips for me.",
                    level=log.NOISY, parent=self._lp, umid="tb7RJw")

        # zfec.Decode() instantiation is fast, but still, let's use the same
        # codec instance for all but the last segment. 3-of-10 takes 15us on
        # my laptop, 25-of-100 is 900us, 3-of-255 is 97us, 25-of-255 is
        # 2.5ms, worst-case 254-of-255 is 9.3ms
        self._codec = CRSDecoder()
        self._codec.set_params(self.segment_size, k, N)


        # Ciphertext hash tree root is mandatory, so that there is at most
        # one ciphertext that matches this read-cap or verify-cap. The
        # integrity check on the shares is not sufficient to prevent the
        # original encoder from creating some shares of file A and other
        # shares of file B. self.ciphertext_hash_tree was a guess before:
        # this is where we create it for real.
        self.ciphertext_hash_tree = IncompleteHashTree(self.num_segments)
        self.ciphertext_hash_tree_leaves = self.num_segments
        self.ciphertext_hash_tree.set_hashes({0: d['crypttext_root_hash']})

        self.share_hash_tree.set_hashes({0: d['share_root_hash']})

        # Our job is a fast download, not verification, so we ignore any
        # redundant fields. The Verifier uses a different code path which
        # does not ignore them.

    def _calculate_sizes(self, segment_size):
        # segments of ciphertext
        size = self._verifycap.size
        k = self._verifycap.needed_shares

        # this assert matches the one in encode.py:127 inside
        # Encoded._got_all_encoding_parameters, where the UEB is constructed
        assert segment_size % k == 0

        # the last segment is usually short. We don't store a whole segsize,
        # but we do pad the segment up to a multiple of k, because the
        # encoder requires that.
        tail_segment_size = size % segment_size
        if tail_segment_size == 0:
            tail_segment_size = segment_size
        padded = mathutil.next_multiple(tail_segment_size, k)
        tail_segment_padded = padded

        num_segments = mathutil.div_ceil(size, segment_size)

        # each segment is turned into N blocks. All but the last are of size
        # block_size, and the last is of size tail_block_size
        block_size = segment_size / k
        tail_block_size = tail_segment_padded / k

        return { "tail_segment_size": tail_segment_size,
                 "tail_segment_padded": tail_segment_padded,
                 "num_segments": num_segments,
                 "block_size": block_size,
                 "tail_block_size": tail_block_size,
                 }

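    # A worked example (illustrative numbers, matching the guess sketched
    # near _build_guessed_tables): size=1000000, k=3, segment_size=131073
    # (a multiple of k, as the assert requires):
    #   num_segments        = div_ceil(1000000, 131073) = 8
    #   tail_segment_size   = 1000000 % 131073          = 82489
    #   tail_segment_padded = next_multiple(82489, 3)   = 82491
    #   block_size          = 131073 / 3                = 43691
    #   tail_block_size     = 82491 / 3                 = 27497
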
    def process_share_hashes(self, share_hashes):
        for hashnum in share_hashes:
            if hashnum >= len(self.share_hash_tree):
                # "BadHashError" is normally for e.g. a corrupt block. We
                # sort of abuse it here to mean a badly numbered hash (which
                # indicates corruption in the number bytes, rather than in
                # the data bytes).
                raise BadHashError("hashnum %d doesn't fit in hashtree(%d)"
                                   % (hashnum, len(self.share_hash_tree)))
        self.share_hash_tree.set_hashes(share_hashes)

    def get_desired_ciphertext_hashes(self, segnum):
        if segnum < self.ciphertext_hash_tree_leaves:
            return self.ciphertext_hash_tree.needed_hashes(segnum,
                                                           include_leaf=True)
        return []
    def get_needed_ciphertext_hashes(self, segnum):
        cht = self.ciphertext_hash_tree
        return cht.needed_hashes(segnum, include_leaf=True)

    def process_ciphertext_hashes(self, hashes):
        assert self.num_segments is not None
        # this may raise BadHashError or NotEnoughHashesError
        self.ciphertext_hash_tree.set_hashes(hashes)

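    # How the hash-tree handshake fits together (a sketch with hypothetical
    # values): for an 8-leaf ciphertext_hash_tree whose root is already
    # pinned from the UEB, needed_hashes(segnum=3, include_leaf=True) returns
    # the node indices required to connect leaf 3 to that root; once a Share
    # supplies them, set_hashes() raises BadHashError or NotEnoughHashesError
    # if they fail to chain up consistently.
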
    # called by our child SegmentFetcher

    def want_more_shares(self):
        self._sharefinder.hungry()

    def fetch_failed(self, sf, f):
        assert sf is self._active_segment
        # deliver error upwards
        for (d,c,seg_ev) in self._extract_requests(sf.segnum):
            seg_ev.error(now())
            eventually(self._deliver, d, c, f)
        self._active_segment = None
        self._start_new_segment()

    def process_blocks(self, segnum, blocks):
        d = defer.maybeDeferred(self._decode_blocks, segnum, blocks)
        d.addCallback(self._check_ciphertext_hash, segnum)
        def _deliver(result):
            log.msg(format="delivering segment(%(segnum)d)",
                    segnum=segnum,
                    level=log.OPERATIONAL, parent=self._lp,
                    umid="j60Ojg")
            when = now()
            if isinstance(result, Failure):
                # this catches failures in decode or ciphertext hash
                for (d,c,seg_ev) in self._extract_requests(segnum):
                    seg_ev.error(when)
                    eventually(self._deliver, d, c, result)
            else:
                (offset, segment, decodetime) = result
                for (d,c,seg_ev) in self._extract_requests(segnum):
                    # when we have two requests for the same segment, the
                    # second one will not be "activated" before the data is
                    # delivered, so to allow the status-reporting code to see
                    # consistent behavior, we activate them all now. The
                    # SegmentEvent will ignore duplicate activate() calls.
                    # Note that this will result in an inaccurate "receive
                    # speed" for the second request.
                    seg_ev.activate(when)
                    seg_ev.deliver(when, offset, len(segment), decodetime)
                    eventually(self._deliver, d, c, result)
            self._active_segment = None
            self._start_new_segment()
        d.addBoth(_deliver)
        d.addErrback(log.err, "unhandled error during process_blocks",
                     level=log.WEIRD, parent=self._lp, umid="MkEsCg")

    def _decode_blocks(self, segnum, blocks):
        tail = (segnum == self.num_segments-1)
        codec = self._codec
        block_size = self.block_size
        decoded_size = self.segment_size
        if tail:
            # account for the padding in the last segment
            codec = CRSDecoder()
            k, N = self._verifycap.needed_shares, self._verifycap.total_shares
            codec.set_params(self.tail_segment_padded, k, N)
            block_size = self.tail_block_size
            decoded_size = self.tail_segment_padded

        shares = []
        shareids = []
        for (shareid, share) in blocks.iteritems():
            assert len(share) == block_size
            shareids.append(shareid)
            shares.append(share)
        del blocks

        start = now()
        d = codec.decode(shares, shareids)   # segment
        del shares
        def _process(buffers):
            decodetime = now() - start
            segment = "".join(buffers)
            assert len(segment) == decoded_size
            del buffers
            if tail:
                segment = segment[:self.tail_segment_size]
            return (segment, decodetime)
        d.addCallback(_process)
        return d

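    # Continuing the worked example in _calculate_sizes (illustrative
    # numbers): decoding the tail segment yields tail_segment_padded=82491
    # bytes, which _process trims to tail_segment_size=82489, discarding the
    # padding the encoder added to reach a multiple of k.
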
    def _check_ciphertext_hash(self, (segment, decodetime), segnum):
        assert self._active_segment.segnum == segnum
        assert self.segment_size is not None
        offset = segnum * self.segment_size

        h = hashutil.crypttext_segment_hash(segment)
        try:
            self.ciphertext_hash_tree.set_hashes(leaves={segnum: h})
            return (offset, segment, decodetime)
        except (BadHashError, NotEnoughHashesError):
            format = ("hash failure in ciphertext_hash_tree:"
                      " segnum=%(segnum)d, SI=%(si)s")
            log.msg(format=format, segnum=segnum, si=self._si_prefix,
                    failure=Failure(),
                    level=log.WEIRD, parent=self._lp, umid="MTwNnw")
            # this is especially weird, because we made it past the share
            # hash tree. It implies that we're using the wrong encoding, or
            # that the uploader deliberately constructed a bad UEB.
            msg = format % {"segnum": segnum, "si": self._si_prefix}
            raise BadCiphertextHashError(msg)

    def _deliver(self, d, c, result):
        # this method exists to handle cancel() that occurs between
        # _got_segment and _deliver
        if c.active:
            c.active = False # it is now too late to cancel
            d.callback(result) # might actually be an errback

    def _extract_requests(self, segnum):
        """Remove matching requests and return their (d,c,seg_ev) tuples so
        that the caller can retire them."""
        retire = [(d,c,seg_ev)
                  for (segnum0,d,c,seg_ev,lp) in self._segment_requests
                  if segnum0 == segnum]
        self._segment_requests = [t for t in self._segment_requests
                                  if t[0] != segnum]
        return retire

    def _cancel_request(self, c):
        self._segment_requests = [t for t in self._segment_requests
                                  if t[2] != c]
        # index the tuples rather than unpacking them: in Python 2 a list
        # comprehension's loop variables leak, and unpacking here would
        # clobber the parameter 'c'
        segnums = [t[0] for t in self._segment_requests]
        # self._active_segment might be None in rare circumstances, so make
        # sure we tolerate it
        if self._active_segment and self._active_segment.segnum not in segnums:
            self._active_segment.stop()
            self._active_segment = None
            self._start_new_segment()

    # called by ShareFinder to choose hashtree sizes in CommonShares, and by
    # SegmentFetcher to tell if it is still fetching a valid segnum.
    def get_num_segments(self):
        # returns (best_num_segments, authoritative)
        if self.num_segments is None:
            return (self.guessed_num_segments, False)
        return (self.num_segments, True)