import time
now = time.time
from zope.interface import Interface
from twisted.python.failure import Failure
from twisted.internet import defer
from foolscap.api import eventually
from allmydata import uri
from allmydata.codec import CRSDecoder
from allmydata.util import base32, log, hashutil, mathutil, observer
from allmydata.interfaces import DEFAULT_MAX_SEGMENT_SIZE
from allmydata.hashtree import IncompleteHashTree, BadHashError, \
     NotEnoughHashesError

# local imports
from finder import ShareFinder
from fetcher import SegmentFetcher
from segmentation import Segmentation
from common import BadCiphertextHashError

class IDownloadStatusHandlingConsumer(Interface):
    def set_download_status_read_event(read_ev):
        """Record the DownloadStatus 'read event', to be updated with the
        time it takes to decrypt each chunk of data."""

class Cancel:
    def __init__(self, f):
        self._f = f
        self.active = True

    def cancel(self):
        if self.active:
            self.active = False
            self._f(self)

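# A minimal usage sketch of the Cancel contract (the '_forget' callback and
# 'pending' set are hypothetical, not part of this module): the wrapped
# callback fires at most once, and later cancel() calls are no-ops.
#
#   def _forget(handle):
#       pending.discard(handle)
#   c = Cancel(_forget)
#   c.cancel()   # calls _forget(c)
#   c.cancel()   # ignored: c.active is already False
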
class DownloadNode:
    """Internal class which manages downloads and holds state. External
    callers use CiphertextFileNode instead."""

    # Share._node points to me
    def __init__(self, verifycap, storage_broker, secret_holder,
                 terminator, history, download_status):
        assert isinstance(verifycap, uri.CHKFileVerifierURI)
        self._verifycap = verifycap
        self._storage_broker = storage_broker
        self._si_prefix = base32.b2a_l(verifycap.storage_index[:8], 60)
        self.running = True
        if terminator:
            terminator.register(self) # calls self.stop() at stopService()
        # the rules are:
        # 1: Only send network requests if you're active (self.running is True)
        # 2: Use TimerService, not reactor.callLater
        # 3: You can do eventual-sends any time.
        # These rules should mean that once
        # stopService()+flushEventualQueue() fires, everything will be done.
        self._secret_holder = secret_holder
        self._history = history
        self._download_status = download_status

        self.share_hash_tree = IncompleteHashTree(self._verifycap.total_shares)

        # we guess the segment size, so Segmentation can pull non-initial
        # segments in a single roundtrip. This populates
        # .guessed_segment_size, .guessed_num_segments, and
        # .ciphertext_hash_tree (with a dummy, to let us guess which hashes
        # we'll need)
        self._build_guessed_tables(DEFAULT_MAX_SEGMENT_SIZE)

        # filled in when we parse a valid UEB
        self.have_UEB = False
        self.segment_size = None
        self.tail_segment_size = None
        self.tail_segment_padded = None
        self.num_segments = None
        self.block_size = None
        self.tail_block_size = None

        # things to track callers that want data

        # _segment_requests can have duplicates
        self._segment_requests = [] # (segnum, d, cancel_handle, seg_ev, lp)
        self._active_segment = None # a SegmentFetcher, with .segnum

        self._segsize_observers = observer.OneShotObserverList()

        # we create one top-level logparent for this _Node, and another one
        # for each read() call. Segmentation and get_segment() messages are
        # associated with the read() call, everything else is tied to the
        # _Node's log entry.
        lp = log.msg(format="Immutable.DownloadNode(%(si)s) created:"
                     " size=%(size)d,"
                     " guessed_segsize=%(guessed_segsize)d,"
                     " guessed_numsegs=%(guessed_numsegs)d",
                     si=self._si_prefix, size=verifycap.size,
                     guessed_segsize=self.guessed_segment_size,
                     guessed_numsegs=self.guessed_num_segments,
                     level=log.OPERATIONAL, umid="uJ0zAQ")
        self._lp = lp

        self._sharefinder = ShareFinder(storage_broker, verifycap, self,
                                        self._download_status, lp)
        self._shares = set()

    def _build_guessed_tables(self, max_segment_size):
        size = min(self._verifycap.size, max_segment_size)
        s = mathutil.next_multiple(size, self._verifycap.needed_shares)
        self.guessed_segment_size = s
        r = self._calculate_sizes(self.guessed_segment_size)
        self.guessed_num_segments = r["num_segments"]
        # as with CommonShare, our ciphertext_hash_tree is a stub until we
        # get the real num_segments
        self.ciphertext_hash_tree = IncompleteHashTree(self.guessed_num_segments)
        self.ciphertext_hash_tree_leaves = self.guessed_num_segments

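    # Worked example (illustrative numbers, assuming the default 128KiB
    # (131072-byte) max segment size and k=3 needed shares, for a
    # 1,000,000-byte file):
    #   size                 = min(1000000, 131072)      = 131072
    #   guessed_segment_size = next_multiple(131072, 3)  = 131073
    #   guessed_num_segments = div_ceil(1000000, 131073) = 8
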
    def __repr__(self):
        return "ImmutableDownloadNode(%s)" % (self._si_prefix,)

    def stop(self):
        # called by the Terminator at shutdown, mostly for tests
        if self._active_segment:
            self._active_segment.stop()
            self._active_segment = None
        self._sharefinder.stop()

    # things called by outside callers, via CiphertextFileNode. get_segment()
    # may also be called by Segmentation.

    def read(self, consumer, offset, size):
        """I am the main entry point, from which FileNode.read() can get
        data. I feed the consumer with the desired range of ciphertext. I
        return a Deferred that fires (with the consumer) when the read is
        finished.

        Note that there is no notion of a 'file pointer': each call to read()
        uses an independent offset= value.
        """
        if size is None:
            size = self._verifycap.size
        # ignore overruns: clip size so offset+size does not go past EOF, and
        # so size is not negative (which indicates that offset >= EOF)
        size = max(0, min(size, self._verifycap.size-offset))

        read_ev = self._download_status.add_read_event(offset, size, now())
        if IDownloadStatusHandlingConsumer.providedBy(consumer):
            consumer.set_download_status_read_event(read_ev)
            consumer.set_download_status(self._download_status)

        lp = log.msg(format="imm Node(%(si)s).read(%(offset)d, %(size)d)",
                     si=base32.b2a(self._verifycap.storage_index)[:8],
                     offset=offset, size=size,
                     level=log.OPERATIONAL, parent=self._lp, umid="l3j3Ww")
        if self._history:
            sp = self._history.stats_provider
            sp.count("downloader.files_downloaded", 1) # really read() calls
            sp.count("downloader.bytes_downloaded", size)
        if size == 0:
            read_ev.finished(now())
            # no data, so no producer, so no register/unregisterProducer
            return defer.succeed(consumer)

        # for concurrent operations, each read() gets its own Segmentation
        # manager
        s = Segmentation(self, offset, size, consumer, read_ev, lp)

        # this raises an interesting question: what segments to fetch? if
        # offset=0, always fetch the first segment, and then allow
        # Segmentation to be responsible for pulling the subsequent ones if
        # the first wasn't large enough. If offset>0, we're going to need an
        # extra roundtrip to get the UEB (and therefore the segment size)
        # before we can figure out which segment to get. TODO: allow the
        # offset-table-guessing code (which starts by guessing the segsize)
        # to assist the offset>0 process.
        d = s.start()
        def _done(res):
            read_ev.finished(now())
            return res
        d.addBoth(_done)
        return d

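    # Example read() usage (a sketch: 'consumer' stands for any object
    # providing Twisted's IConsumer interface; its name here is an
    # assumption, not part of this module):
    #
    #   d = node.read(consumer, 0, None)     # size=None means "to EOF"
    #   d.addCallback(lambda consumer: ...)  # fires when the read finishes
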
    def get_segment(self, segnum, logparent=None):
        """Begin downloading a segment. I return a tuple (d, c): 'd' is a
        Deferred that fires with (offset, data, decodetime) when the desired
        segment is available, and 'c' is an object on which c.cancel() can
        be called to disavow interest in the segment (after which 'd' will
        never fire).

        You probably need to know the segment size before calling this,
        unless you want the first few bytes of the file. If you ask for a
        segment number which turns out to be too large, the Deferred will
        errback with BadSegmentNumberError.

        The Deferred fires with the offset of the first byte of the data
        segment, so that you can call get_segment() before knowing the
        segment size and still know which data you received.

        The Deferred can also errback with other fatal problems, such as
        NotEnoughSharesError, NoSharesError, or BadCiphertextHashError.
        """
        lp = log.msg(format="imm Node(%(si)s).get_segment(%(segnum)d)",
                     si=base32.b2a(self._verifycap.storage_index)[:8],
                     segnum=segnum,
                     level=log.OPERATIONAL, parent=logparent, umid="UKFjDQ")
        seg_ev = self._download_status.add_segment_request(segnum, now())
        d = defer.Deferred()
        c = Cancel(self._cancel_request)
        self._segment_requests.append((segnum, d, c, seg_ev, lp))
        self._start_new_segment()
        return (d, c)

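    # Example get_segment() usage (a sketch):
    #
    #   (d, c) = node.get_segment(0)
    #   def _got((offset, segment, decodetime)):
    #       pass    # 'segment' is decoded, hash-checked ciphertext
    #   d.addCallback(_got)
    #   c.cancel()  # to disavow interest; 'd' will then never fire
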
    def get_segsize(self):
        """Return a Deferred that fires when we know the real segment size."""
        if self.segment_size:
            return defer.succeed(self.segment_size)
        # TODO: this downloads (and discards) the first segment of the file.
        # We could make this more efficient by writing
        # fetcher.SegmentSizeFetcher, with the job of finding a single valid
        # share and extracting the UEB. We'd add Share.get_UEB() to request
        # just the UEB.
        (d,c) = self.get_segment(0)
        # this ensures that an error during get_segment() will errback the
        # caller, so Repair won't wait forever on completely missing files
        d.addCallback(lambda ign: self._segsize_observers.when_fired())
        return d

    # things called by the Segmentation object used to transform
    # arbitrary-sized read() calls into quantized segment fetches

    def _start_new_segment(self):
        if self._active_segment is None and self._segment_requests:
            (segnum, d, c, seg_ev, lp) = self._segment_requests[0]
            k = self._verifycap.needed_shares
            log.msg(format="%(node)s._start_new_segment: segnum=%(segnum)d",
                    node=repr(self), segnum=segnum,
                    level=log.NOISY, parent=lp, umid="wAlnHQ")
            self._active_segment = fetcher = SegmentFetcher(self, segnum, k, lp)
            seg_ev.activate(now())
            active_shares = [s for s in self._shares if s.is_alive()]
            fetcher.add_shares(active_shares) # this triggers the loop

    # called by our child ShareFinder
    def got_shares(self, shares):
        self._shares.update(shares)
        if self._active_segment:
            self._active_segment.add_shares(shares)

    def no_more_shares(self):
        self._no_more_shares = True
        if self._active_segment:
            self._active_segment.no_more_shares()

    # things called by our Share instances

    def validate_and_store_UEB(self, UEB_s):
        log.msg("validate_and_store_UEB",
                level=log.OPERATIONAL, parent=self._lp, umid="7sTrPw")
        h = hashutil.uri_extension_hash(UEB_s)
        if h != self._verifycap.uri_extension_hash:
            raise BadHashError
        self._parse_and_store_UEB(UEB_s) # sets self._stuff
        # TODO: a malformed (but authentic) UEB could throw an assertion in
        # _parse_and_store_UEB, and we should abandon the download.
        self.have_UEB = True

        # inform the ShareFinder about our correct number of segments. This
        # will update the block-hash-trees in all existing CommonShare
        # instances, and will populate new ones with the correct value.
        self._sharefinder.update_num_segments()

    def _parse_and_store_UEB(self, UEB_s):
        # Note: the UEB contains needed_shares and total_shares. These are
        # redundant and inferior (the filecap contains the authoritative
        # values). However, because it is possible to encode the same file in
        # multiple ways, and the encoders might choose (poorly) to use the
        # same key for both (therefore getting the same SI), we might
        # encounter shares for both types. The UEB hashes will be different,
        # however, and we'll disregard the "other" encoding's shares as
        # corrupted.

        # therefore, we ignore d['total_shares'] and d['needed_shares'].

        d = uri.unpack_extension(UEB_s)

        log.msg(format="UEB=%(ueb)s, vcap=%(vcap)s",
                ueb=repr(uri.unpack_extension_readable(UEB_s)),
                vcap=self._verifycap.to_string(),
                level=log.NOISY, parent=self._lp, umid="cVqZnA")

        k, N = self._verifycap.needed_shares, self._verifycap.total_shares

        self.segment_size = d['segment_size']
        self._segsize_observers.fire(self.segment_size)

        r = self._calculate_sizes(self.segment_size)
        self.tail_segment_size = r["tail_segment_size"]
        self.tail_segment_padded = r["tail_segment_padded"]
        self.num_segments = r["num_segments"]
        self.block_size = r["block_size"]
        self.tail_block_size = r["tail_block_size"]
        log.msg("actual sizes: %s" % (r,),
                level=log.NOISY, parent=self._lp, umid="PY6P5Q")
        if (self.segment_size == self.guessed_segment_size
            and self.num_segments == self.guessed_num_segments):
            log.msg("my guess was right!",
                    level=log.NOISY, parent=self._lp, umid="x340Ow")
        else:
            log.msg("my guess was wrong! Extra round trips for me.",
                    level=log.NOISY, parent=self._lp, umid="tb7RJw")

        # zfec.Decode() instantiation is fast, but still, let's use the same
        # codec instance for all but the last segment. 3-of-10 takes 15us on
        # my laptop, 25-of-100 is 900us, 3-of-255 is 97us, 25-of-255 is
        # 2.5ms, worst-case 254-of-255 is 9.3ms
        self._codec = CRSDecoder()
        self._codec.set_params(self.segment_size, k, N)

        # Ciphertext hash tree root is mandatory, so that there is at most
        # one ciphertext that matches this read-cap or verify-cap. The
        # integrity check on the shares is not sufficient to prevent the
        # original encoder from creating some shares of file A and other
        # shares of file B. self.ciphertext_hash_tree was a guess before:
        # this is where we create it for real.
        self.ciphertext_hash_tree = IncompleteHashTree(self.num_segments)
        self.ciphertext_hash_tree_leaves = self.num_segments
        self.ciphertext_hash_tree.set_hashes({0: d['crypttext_root_hash']})

        self.share_hash_tree.set_hashes({0: d['share_root_hash']})

        # Our job is a fast download, not verification, so we ignore any
        # redundant fields. The Verifier uses a different code path which
        # does not ignore them.

    def _calculate_sizes(self, segment_size):
        # segments of ciphertext
        size = self._verifycap.size
        k = self._verifycap.needed_shares

        # this assert matches the one in encode.py:127 inside
        # Encoded._got_all_encoding_parameters, where the UEB is constructed
        assert segment_size % k == 0

        # the last segment is usually short. We don't store a whole segsize,
        # but we do pad the segment up to a multiple of k, because the
        # encoder requires that.
        tail_segment_size = size % segment_size
        if tail_segment_size == 0:
            tail_segment_size = segment_size
        padded = mathutil.next_multiple(tail_segment_size, k)
        tail_segment_padded = padded

        num_segments = mathutil.div_ceil(size, segment_size)

        # each segment is turned into N blocks. All but the last are of size
        # block_size, and the last is of size tail_block_size
        block_size = segment_size / k
        tail_block_size = tail_segment_padded / k

        return { "tail_segment_size": tail_segment_size,
                 "tail_segment_padded": tail_segment_padded,
                 "num_segments": num_segments,
                 "block_size": block_size,
                 "tail_block_size": tail_block_size,
                 }

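    # Worked example (same illustrative numbers as above: a 1,000,000-byte
    # file, k=3, segment_size=131073):
    #   tail_segment_size   = 1000000 % 131073          = 82489
    #   tail_segment_padded = next_multiple(82489, 3)   = 82491
    #   num_segments        = div_ceil(1000000, 131073) = 8
    #   block_size          = 131073 / 3                = 43691
    #   tail_block_size     = 82491 / 3                 = 27497
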
    def process_share_hashes(self, share_hashes):
        for hashnum in share_hashes:
            if hashnum >= len(self.share_hash_tree):
                # "BadHashError" is normally for e.g. a corrupt block. We
                # sort of abuse it here to mean a badly numbered hash (which
                # indicates corruption in the number bytes, rather than in
                # the data bytes).
                raise BadHashError("hashnum %d doesn't fit in hashtree(%d)"
                                   % (hashnum, len(self.share_hash_tree)))
        self.share_hash_tree.set_hashes(share_hashes)

    def get_desired_ciphertext_hashes(self, segnum):
        if segnum < self.ciphertext_hash_tree_leaves:
            return self.ciphertext_hash_tree.needed_hashes(segnum,
                                                           include_leaf=True)
        return []

    def get_needed_ciphertext_hashes(self, segnum):
        cht = self.ciphertext_hash_tree
        return cht.needed_hashes(segnum, include_leaf=True)

    def process_ciphertext_hashes(self, hashes):
        assert self.num_segments is not None
        # this may raise BadHashError or NotEnoughHashesError
        self.ciphertext_hash_tree.set_hashes(hashes)

    # called by our child SegmentFetcher

    def want_more_shares(self):
        self._sharefinder.hungry()

    def fetch_failed(self, sf, f):
        assert sf is self._active_segment
        # deliver error upwards
        for (d,c,seg_ev) in self._extract_requests(sf.segnum):
            seg_ev.error(now())
            eventually(self._deliver, d, c, f)
        self._active_segment = None
        self._start_new_segment()

    def process_blocks(self, segnum, blocks):
        start = now()
        d = defer.maybeDeferred(self._decode_blocks, segnum, blocks)
        d.addCallback(self._check_ciphertext_hash, segnum)
        def _deliver(result):
            log.msg(format="delivering segment(%(segnum)d)",
                    segnum=segnum,
                    level=log.OPERATIONAL, parent=self._lp,
                    umid="j60Ojg")
            when = now()
            if isinstance(result, Failure):
                # this catches failures in decode or ciphertext hash
                for (d,c,seg_ev) in self._extract_requests(segnum):
                    seg_ev.error(when)
                    eventually(self._deliver, d, c, result)
            else:
                (offset, segment, decodetime) = result
                for (d,c,seg_ev) in self._extract_requests(segnum):
                    # when we have two requests for the same segment, the
                    # second one will not be "activated" before the data is
                    # delivered, so to allow the status-reporting code to see
                    # consistent behavior, we activate them all now. The
                    # SegmentEvent will ignore duplicate activate() calls.
                    # Note that this will result in an inaccurate "receive
                    # speed" for the second request.
                    seg_ev.activate(when)
                    seg_ev.deliver(when, offset, len(segment), decodetime)
                    eventually(self._deliver, d, c, result)
            self._download_status.add_misc_event("process_block", start, now())
            self._active_segment = None
            self._start_new_segment()
        d.addBoth(_deliver)
        d.addErrback(log.err, "unhandled error during process_blocks",
                     level=log.WEIRD, parent=self._lp, umid="MkEsCg")

    def _decode_blocks(self, segnum, blocks):
        start = now()
        tail = (segnum == self.num_segments-1)
        codec = self._codec
        block_size = self.block_size
        decoded_size = self.segment_size
        if tail:
            # account for the padding in the last segment
            codec = CRSDecoder()
            k, N = self._verifycap.needed_shares, self._verifycap.total_shares
            codec.set_params(self.tail_segment_padded, k, N)
            block_size = self.tail_block_size
            decoded_size = self.tail_segment_padded

        shares = []
        shareids = []
        for (shareid, share) in blocks.iteritems():
            assert len(share) == block_size
            shareids.append(shareid)
            shares.append(share)
        del blocks

        d = codec.decode(shares, shareids)   # segment
        del shares
        def _process(buffers):
            decodetime = now() - start
            segment = "".join(buffers)
            assert len(segment) == decoded_size
            del buffers
            if tail:
                segment = segment[:self.tail_segment_size]
            self._download_status.add_misc_event("decode", start, now())
            return (segment, decodetime)
        d.addCallback(_process)
        return d

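    # Continuing the illustrative numbers from _calculate_sizes: a non-tail
    # segment arrives as any k=3 of the N blocks, each 43691 bytes, and
    # decodes to 131073 bytes. The tail arrives as 3 blocks of 27497 bytes,
    # decodes to 82491 padded bytes, and is trimmed to the true
    # tail_segment_size of 82489.
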
    def _check_ciphertext_hash(self, (segment, decodetime), segnum):
        start = now()
        assert self._active_segment.segnum == segnum
        assert self.segment_size is not None
        offset = segnum * self.segment_size

        h = hashutil.crypttext_segment_hash(segment)
        try:
            self.ciphertext_hash_tree.set_hashes(leaves={segnum: h})
            self._download_status.add_misc_event("CThash", start, now())
            return (offset, segment, decodetime)
        except (BadHashError, NotEnoughHashesError):
            format = ("hash failure in ciphertext_hash_tree:"
                      " segnum=%(segnum)d, SI=%(si)s")
            log.msg(format=format, segnum=segnum, si=self._si_prefix,
                    failure=Failure(),
                    level=log.WEIRD, parent=self._lp, umid="MTwNnw")
            # this is especially weird, because we made it past the share
            # hash tree. It implies that we're using the wrong encoding, or
            # that the uploader deliberately constructed a bad UEB.
            msg = format % {"segnum": segnum, "si": self._si_prefix}
            raise BadCiphertextHashError(msg)

    def _deliver(self, d, c, result):
        # this method exists to handle cancel() that occurs between
        # _got_segment and _deliver
        if c.active:
            c.active = False # it is now too late to cancel
            d.callback(result) # might actually be an errback

    def _extract_requests(self, segnum):
        """Remove matching requests and return their (d,c,seg_ev) tuples so
        that the caller can retire them."""
        retire = [(d,c,seg_ev)
                  for (segnum0,d,c,seg_ev,lp) in self._segment_requests
                  if segnum0 == segnum]
        self._segment_requests = [t for t in self._segment_requests
                                  if t[0] != segnum]
        return retire

    def _cancel_request(self, cancel):
        self._segment_requests = [t for t in self._segment_requests
                                  if t[2] != cancel]
        segnums = [segnum for (segnum,d,c,seg_ev,lp) in self._segment_requests]
        # self._active_segment might be None in rare circumstances, so make
        # sure we tolerate it
        if self._active_segment and self._active_segment.segnum not in segnums:
            self._active_segment.stop()
            self._active_segment = None
            self._start_new_segment()

    # called by ShareFinder to choose hashtree sizes in CommonShares, and by
    # SegmentFetcher to tell if it is still fetching a valid segnum.
    def get_num_segments(self):
        # returns (best_num_segments, authoritative)
        if self.num_segments is None:
            return (self.guessed_num_segments, False)
        return (self.num_segments, True)