import time
now = time.time
from zope.interface import Interface
from twisted.python.failure import Failure
from twisted.internet import defer
from foolscap.api import eventually
from allmydata import uri
from allmydata.codec import CRSDecoder
from allmydata.util import base32, log, hashutil, mathutil, observer
from allmydata.interfaces import DEFAULT_MAX_SEGMENT_SIZE
from allmydata.hashtree import IncompleteHashTree, BadHashError, \
     NotEnoughHashesError

# local imports
from finder import ShareFinder
from fetcher import SegmentFetcher
from segmentation import Segmentation
from common import BadCiphertextHashError

class IDownloadStatusHandlingConsumer(Interface):
    def set_download_status_read_event(read_ev):
        """Record the DownloadStatus 'read event', to be updated with the
        time it takes to decrypt each chunk of data."""

class Cancel:
    def __init__(self, f):
        self._f = f
        self.active = True
    def cancel(self):
        if self.active:
            self.active = False
            self._f(self)
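
# A Cancel wraps a one-shot callback: cancel() flips .active and invokes the
# wrapped function with the Cancel instance itself, at most once.
# Illustrative sketch:
#   c = Cancel(self._cancel_request)   # as get_segment() does below
#   c.cancel()                         # calls self._cancel_request(c)
#   c.cancel()                         # no-op: already inactive
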
class DownloadNode:
    """Internal class which manages downloads and holds state. External
    callers use CiphertextFileNode instead."""

    # Share._node points to me
    def __init__(self, verifycap, storage_broker, secret_holder,
                 terminator, history, download_status):
        assert isinstance(verifycap, uri.CHKFileVerifierURI)
        self._verifycap = verifycap
        self._storage_broker = storage_broker
        self._si_prefix = base32.b2a_l(verifycap.storage_index[:8], 60)
        self.running = True
        if terminator:
            terminator.register(self) # calls self.stop() at stopService()
        # the rules are:
        # 1: Only send network requests if you're active (self.running is True)
        # 2: Use TimerService, not reactor.callLater
        # 3: You can do eventual-sends any time.
        # These rules should mean that once
        # stopService()+flushEventualQueue() fires, everything will be done.
        self._secret_holder = secret_holder
        self._history = history
        self._download_status = download_status

        k, N = self._verifycap.needed_shares, self._verifycap.total_shares
        self.share_hash_tree = IncompleteHashTree(N)

        # we guess the segment size, so Segmentation can pull non-initial
        # segments in a single roundtrip. This populates
        # .guessed_segment_size, .guessed_num_segments, and
        # .ciphertext_hash_tree (with a dummy, to let us guess which hashes
        # we'll need)
        self._build_guessed_tables(DEFAULT_MAX_SEGMENT_SIZE)

        # filled in when we parse a valid UEB
        self.have_UEB = False
        self.segment_size = None
        self.tail_segment_size = None
        self.tail_segment_padded = None
        self.num_segments = None
        self.block_size = None
        self.tail_block_size = None

        # things to track callers that want data

        # _segment_requests can have duplicates
        self._segment_requests = [] # (segnum, d, cancel_handle, seg_ev, lp)
        self._active_segment = None # a SegmentFetcher, with .segnum

        self._segsize_observers = observer.OneShotObserverList()

        # we create one top-level logparent for this _Node, and another one
        # for each read() call. Segmentation and get_segment() messages are
        # associated with the read() call, everything else is tied to the
        # _Node's log entry.
        lp = log.msg(format="Immutable.DownloadNode(%(si)s) created:"
                     " size=%(size)d,"
                     " guessed_segsize=%(guessed_segsize)d,"
                     " guessed_numsegs=%(guessed_numsegs)d",
                     si=self._si_prefix, size=verifycap.size,
                     guessed_segsize=self.guessed_segment_size,
                     guessed_numsegs=self.guessed_num_segments,
                     level=log.OPERATIONAL, umid="uJ0zAQ")
        self._lp = lp

        self._sharefinder = ShareFinder(storage_broker, verifycap, self,
                                        self._download_status, lp)
        self._shares = set()

    def _build_guessed_tables(self, max_segment_size):
        size = min(self._verifycap.size, max_segment_size)
        s = mathutil.next_multiple(size, self._verifycap.needed_shares)
        self.guessed_segment_size = s
        r = self._calculate_sizes(self.guessed_segment_size)
        self.guessed_num_segments = r["num_segments"]
        # as with CommonShare, our ciphertext_hash_tree is a stub until we
        # get the real num_segments
        self.ciphertext_hash_tree = IncompleteHashTree(self.guessed_num_segments)
        self.ciphertext_hash_tree_leaves = self.guessed_num_segments

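    # Example with illustrative numbers: for a 1,000,000-byte file with k=3
    # and the default 128KiB max segment size:
    #   size = min(1000000, 131072) = 131072
    #   guessed_segment_size = next_multiple(131072, 3) = 131073
    #   guessed_num_segments = div_ceil(1000000, 131073) = 8
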
    def __repr__(self):
        return "ImmutableDownloadNode(%s)" % (self._si_prefix,)

    def stop(self):
        # called by the Terminator at shutdown, mostly for tests
        if self._active_segment:
            self._active_segment.stop()
            self._active_segment = None
        self._sharefinder.stop()

    # things called by outside callers, via CiphertextFileNode. get_segment()
    # may also be called by Segmentation.

    def read(self, consumer, offset, size):
        """I am the main entry point, from which FileNode.read() can get
        data. I feed the consumer with the desired range of ciphertext. I
        return a Deferred that fires (with the consumer) when the read is
        finished.

        Note that there is no notion of a 'file pointer': each call to read()
        uses an independent offset= value.
        """
        if size is None:
            size = self._verifycap.size
        # ignore overruns: clip size so offset+size does not go past EOF, and
        # so size is not negative (which indicates that offset >= EOF)
        size = max(0, min(size, self._verifycap.size-offset))
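        # (e.g. for a 1000-byte file, read(offset=900, size=200) is clipped
        # to size=100, and read(offset=1200, size=50) is clipped to size=0)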

        read_ev = self._download_status.add_read_event(offset, size, now())
        if IDownloadStatusHandlingConsumer.providedBy(consumer):
            consumer.set_download_status_read_event(read_ev)
            consumer.set_download_status(self._download_status)

        lp = log.msg(format="imm Node(%(si)s).read(%(offset)d, %(size)d)",
                     si=base32.b2a(self._verifycap.storage_index)[:8],
                     offset=offset, size=size,
                     level=log.OPERATIONAL, parent=self._lp, umid="l3j3Ww")
        if self._history:
            sp = self._history.stats_provider
            sp.count("downloader.files_downloaded", 1) # really read() calls
            sp.count("downloader.bytes_downloaded", size)
        if size == 0:
            read_ev.finished(now())
            # no data, so no producer, so no register/unregisterProducer
            return defer.succeed(consumer)

        # for concurrent operations, each read() gets its own Segmentation
        # manager
        s = Segmentation(self, offset, size, consumer, read_ev, lp)

        # this raises an interesting question: what segments to fetch? if
        # offset=0, always fetch the first segment, and then allow
        # Segmentation to be responsible for pulling the subsequent ones if
        # the first wasn't large enough. If offset>0, we're going to need an
        # extra roundtrip to get the UEB (and therefore the segment size)
        # before we can figure out which segment to get. TODO: allow the
        # offset-table-guessing code (which starts by guessing the segsize)
        # to assist the offset>0 process.
        d = s.start()
        def _done(res):
            read_ev.finished(now())
            return res
        d.addBoth(_done)
        return d
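
    # Illustrative use (a sketch; external callers normally go through
    # CiphertextFileNode, which delegates here):
    #   d = node.read(consumer, 0, None)     # size=None reads to EOF
    #   d.addCallback(lambda consumer: ...)  # fires when the read finishes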

    def get_segment(self, segnum, logparent=None):
        """Begin downloading a segment. I return a tuple (d, c): 'd' is a
        Deferred that fires with (offset, data, decodetime) when the desired
        segment is available, and c is an object on which c.cancel() can be
        called to disavow interest in the segment (after which 'd' will
        never fire).

        You probably need to know the segment size before calling this,
        unless you want the first few bytes of the file. If you ask for a
        segment number which turns out to be too large, the Deferred will
        errback with BadSegmentNumberError.

        The Deferred fires with the offset of the first byte of the data
        segment, so that you can call get_segment() before knowing the
        segment size, and still know which data you received.

        The Deferred can also errback with other fatal problems, such as
        NotEnoughSharesError, NoSharesError, or BadCiphertextHashError.
        """
        lp = log.msg(format="imm Node(%(si)s).get_segment(%(segnum)d)",
                     si=base32.b2a(self._verifycap.storage_index)[:8],
                     segnum=segnum,
                     level=log.OPERATIONAL, parent=logparent, umid="UKFjDQ")
        seg_ev = self._download_status.add_segment_request(segnum, now())
        d = defer.Deferred()
        c = Cancel(self._cancel_request)
        self._segment_requests.append( (segnum, d, c, seg_ev, lp) )
        self._start_new_segment()
        return (d, c)
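
    # Illustrative use (a sketch, mirroring how Segmentation drives this):
    #   (d, c) = node.get_segment(segnum)
    #   d.addCallback(lambda (offset, data, decodetime): ...)
    #   # ...or call c.cancel() to disavow interest before 'd' fires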

    def get_segsize(self):
        """Return a Deferred that fires when we know the real segment size."""
        if self.segment_size:
            return defer.succeed(self.segment_size)
        # TODO: this downloads (and discards) the first segment of the file.
        # We could make this more efficient by writing
        # fetcher.SegmentSizeFetcher, with the job of finding a single valid
        # share and extracting the UEB. We'd add Share.get_UEB() to request
        # just the UEB.
        (d,c) = self.get_segment(0)
        # this ensures that an error during get_segment() will errback the
        # caller, so Repair won't wait forever on completely missing files
        d.addCallback(lambda ign: self._segsize_observers.when_fired())
        return d

    # things called by the Segmentation object used to transform
    # arbitrary-sized read() calls into quantized segment fetches

    def _start_new_segment(self):
        if self._active_segment is None and self._segment_requests:
            (segnum, d, c, seg_ev, lp) = self._segment_requests[0]
            k = self._verifycap.needed_shares
            log.msg(format="%(node)s._start_new_segment: segnum=%(segnum)d",
                    node=repr(self), segnum=segnum,
                    level=log.NOISY, parent=lp, umid="wAlnHQ")
            self._active_segment = fetcher = SegmentFetcher(self, segnum, k, lp)
            seg_ev.activate(now())
            active_shares = [s for s in self._shares if s.is_alive()]
            fetcher.add_shares(active_shares) # this triggers the loop


    # called by our child ShareFinder
    def got_shares(self, shares):
        self._shares.update(shares)
        if self._active_segment:
            self._active_segment.add_shares(shares)
    def no_more_shares(self):
        self._no_more_shares = True
        if self._active_segment:
            self._active_segment.no_more_shares()

    # things called by our Share instances

    def validate_and_store_UEB(self, UEB_s):
        log.msg("validate_and_store_UEB",
                level=log.OPERATIONAL, parent=self._lp, umid="7sTrPw")
        h = hashutil.uri_extension_hash(UEB_s)
        if h != self._verifycap.uri_extension_hash:
            raise BadHashError
        self._parse_and_store_UEB(UEB_s) # sets self._stuff
        # TODO: a malformed (but authentic) UEB could throw an assertion in
        # _parse_and_store_UEB, and we should abandon the download.
        self.have_UEB = True

        # inform the ShareFinder about our correct number of segments. This
        # will update the block-hash-trees in all existing CommonShare
        # instances, and will populate new ones with the correct value.
        self._sharefinder.update_num_segments()

    def _parse_and_store_UEB(self, UEB_s):
        # Note: the UEB contains needed_shares and total_shares. These are
        # redundant and inferior (the filecap contains the authoritative
        # values). However, because it is possible to encode the same file in
        # multiple ways, and the encoders might choose (poorly) to use the
        # same key for both (therefore getting the same SI), we might
        # encounter shares for both types. The UEB hashes will be different,
        # however, and we'll disregard the "other" encoding's shares as
        # corrupted.

        # therefore, we ignore d['total_shares'] and d['needed_shares'].

        d = uri.unpack_extension(UEB_s)

        log.msg(format="UEB=%(ueb)s, vcap=%(vcap)s",
                ueb=repr(uri.unpack_extension_readable(UEB_s)),
                vcap=self._verifycap.to_string(),
                level=log.NOISY, parent=self._lp, umid="cVqZnA")

        k, N = self._verifycap.needed_shares, self._verifycap.total_shares

        self.segment_size = d['segment_size']
        self._segsize_observers.fire(self.segment_size)

        r = self._calculate_sizes(self.segment_size)
        self.tail_segment_size = r["tail_segment_size"]
        self.tail_segment_padded = r["tail_segment_padded"]
        self.num_segments = r["num_segments"]
        self.block_size = r["block_size"]
        self.tail_block_size = r["tail_block_size"]
        log.msg("actual sizes: %s" % (r,),
                level=log.NOISY, parent=self._lp, umid="PY6P5Q")
        if (self.segment_size == self.guessed_segment_size
            and self.num_segments == self.guessed_num_segments):
            log.msg("my guess was right!",
                    level=log.NOISY, parent=self._lp, umid="x340Ow")
        else:
            log.msg("my guess was wrong! Extra round trips for me.",
                    level=log.NOISY, parent=self._lp, umid="tb7RJw")

        # zfec.Decode() instantiation is fast, but still, let's use the same
        # codec instance for all but the last segment. 3-of-10 takes 15us on
        # my laptop, 25-of-100 is 900us, 3-of-255 is 97us, 25-of-255 is
        # 2.5ms, worst-case 254-of-255 is 9.3ms
        self._codec = CRSDecoder()
        self._codec.set_params(self.segment_size, k, N)


        # Ciphertext hash tree root is mandatory, so that there is at most
        # one ciphertext that matches this read-cap or verify-cap. The
        # integrity check on the shares is not sufficient to prevent the
        # original encoder from creating some shares of file A and other
        # shares of file B. self.ciphertext_hash_tree was a guess before:
        # this is where we create it for real.
        self.ciphertext_hash_tree = IncompleteHashTree(self.num_segments)
        self.ciphertext_hash_tree_leaves = self.num_segments
        self.ciphertext_hash_tree.set_hashes({0: d['crypttext_root_hash']})

        self.share_hash_tree.set_hashes({0: d['share_root_hash']})

        # Our job is a fast download, not verification, so we ignore any
        # redundant fields. The Verifier uses a different code path which
        # does not ignore them.

    def _calculate_sizes(self, segment_size):
        # segments of ciphertext
        size = self._verifycap.size
        k = self._verifycap.needed_shares

        # this assert matches the one in encode.py:127 inside
        # Encoded._got_all_encoding_parameters, where the UEB is constructed
        assert segment_size % k == 0

        # the last segment is usually short. We don't store a whole segsize,
        # but we do pad the segment up to a multiple of k, because the
        # encoder requires that.
        tail_segment_size = size % segment_size
        if tail_segment_size == 0:
            tail_segment_size = segment_size
        padded = mathutil.next_multiple(tail_segment_size, k)
        tail_segment_padded = padded

        num_segments = mathutil.div_ceil(size, segment_size)

        # each segment is turned into N blocks. All but the last are of size
        # block_size, and the last is of size tail_block_size
        block_size = segment_size / k
        tail_block_size = tail_segment_padded / k

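        # Example with illustrative numbers: size=1000000,
        # segment_size=131073, k=3 gives:
        #   tail_segment_size   = 1000000 % 131073 = 82489
        #   tail_segment_padded = next_multiple(82489, 3) = 82491
        #   num_segments        = div_ceil(1000000, 131073) = 8
        #   block_size          = 131073 / 3 = 43691
        #   tail_block_size     = 82491 / 3 = 27497
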
        return { "tail_segment_size": tail_segment_size,
                 "tail_segment_padded": tail_segment_padded,
                 "num_segments": num_segments,
                 "block_size": block_size,
                 "tail_block_size": tail_block_size,
                 }


    def process_share_hashes(self, share_hashes):
        for hashnum in share_hashes:
            if hashnum >= len(self.share_hash_tree):
                # "BadHashError" is normally for e.g. a corrupt block. We
                # sort of abuse it here to mean a badly numbered hash (which
                # indicates corruption in the number bytes, rather than in
                # the data bytes).
                raise BadHashError("hashnum %d doesn't fit in hashtree(%d)"
                                   % (hashnum, len(self.share_hash_tree)))
        self.share_hash_tree.set_hashes(share_hashes)

    def get_desired_ciphertext_hashes(self, segnum):
        if segnum < self.ciphertext_hash_tree_leaves:
            return self.ciphertext_hash_tree.needed_hashes(segnum,
                                                           include_leaf=True)
        return []
    def get_needed_ciphertext_hashes(self, segnum):
        cht = self.ciphertext_hash_tree
        return cht.needed_hashes(segnum, include_leaf=True)

    def process_ciphertext_hashes(self, hashes):
        assert self.num_segments is not None
        # this may raise BadHashError or NotEnoughHashesError
        self.ciphertext_hash_tree.set_hashes(hashes)


    # called by our child SegmentFetcher

    def want_more_shares(self):
        self._sharefinder.hungry()

    def fetch_failed(self, sf, f):
        assert sf is self._active_segment
        # deliver error upwards
        for (d,c,seg_ev) in self._extract_requests(sf.segnum):
            seg_ev.error(now())
            eventually(self._deliver, d, c, f)
        self._active_segment = None
        self._start_new_segment()

    def process_blocks(self, segnum, blocks):
        start = now()
        d = defer.maybeDeferred(self._decode_blocks, segnum, blocks)
        d.addCallback(self._check_ciphertext_hash, segnum)
        def _deliver(result):
            log.msg(format="delivering segment(%(segnum)d)",
                    segnum=segnum,
                    level=log.OPERATIONAL, parent=self._lp,
                    umid="j60Ojg")
            when = now()
            if isinstance(result, Failure):
                # this catches failures in decode or ciphertext hash
                for (d,c,seg_ev) in self._extract_requests(segnum):
                    seg_ev.error(when)
                    eventually(self._deliver, d, c, result)
            else:
                (offset, segment, decodetime) = result
                for (d,c,seg_ev) in self._extract_requests(segnum):
                    # when we have two requests for the same segment, the
                    # second one will not be "activated" before the data is
                    # delivered, so to allow the status-reporting code to see
                    # consistent behavior, we activate them all now. The
                    # SegmentEvent will ignore duplicate activate() calls.
                    # Note that this will result in an inaccurate "receive
                    # speed" for the second request.
                    seg_ev.activate(when)
                    seg_ev.deliver(when, offset, len(segment), decodetime)
                    eventually(self._deliver, d, c, result)
            self._download_status.add_misc_event("process_block", start, now())
            self._active_segment = None
            self._start_new_segment()
        d.addBoth(_deliver)
        d.addErrback(log.err, "unhandled error during process_blocks",
                     level=log.WEIRD, parent=self._lp, umid="MkEsCg")

    def _decode_blocks(self, segnum, blocks):
        start = now()
        tail = (segnum == self.num_segments-1)
        codec = self._codec
        block_size = self.block_size
        decoded_size = self.segment_size
        if tail:
            # account for the padding in the last segment
            codec = CRSDecoder()
            k, N = self._verifycap.needed_shares, self._verifycap.total_shares
            codec.set_params(self.tail_segment_padded, k, N)
            block_size = self.tail_block_size
            decoded_size = self.tail_segment_padded

        shares = []
        shareids = []
        for (shareid, share) in blocks.iteritems():
            assert len(share) == block_size
            shareids.append(shareid)
            shares.append(share)
        del blocks

        d = codec.decode(shares, shareids)   # segment
        del shares
        def _process(buffers):
            decodetime = now() - start
            segment = "".join(buffers)
            assert len(segment) == decoded_size
            del buffers
            if tail:
                segment = segment[:self.tail_segment_size]
            self._download_status.add_misc_event("decode", start, now())
            return (segment, decodetime)
        d.addCallback(_process)
        return d
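
    # (Continuing the illustrative numbers above: the tail segment decodes
    # k=3 blocks of tail_block_size=27497 bytes into 82491 padded bytes,
    # then truncates to the true tail_segment_size of 82489.)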

    def _check_ciphertext_hash(self, (segment, decodetime), segnum):
        start = now()
        assert self._active_segment.segnum == segnum
        assert self.segment_size is not None
        offset = segnum * self.segment_size

        h = hashutil.crypttext_segment_hash(segment)
        try:
            self.ciphertext_hash_tree.set_hashes(leaves={segnum: h})
            self._download_status.add_misc_event("CThash", start, now())
            return (offset, segment, decodetime)
        except (BadHashError, NotEnoughHashesError):
            format = ("hash failure in ciphertext_hash_tree:"
                      " segnum=%(segnum)d, SI=%(si)s")
            log.msg(format=format, segnum=segnum, si=self._si_prefix,
                    failure=Failure(),
                    level=log.WEIRD, parent=self._lp, umid="MTwNnw")
            # this is especially weird, because we made it past the share
            # hash tree. It implies that we're using the wrong encoding, or
            # that the uploader deliberately constructed a bad UEB.
            msg = format % {"segnum": segnum, "si": self._si_prefix}
            raise BadCiphertextHashError(msg)

    def _deliver(self, d, c, result):
        # this method exists to handle cancel() that occurs between
        # _got_segment and _deliver
        if c.active:
            c.active = False # it is now too late to cancel
            d.callback(result) # might actually be an errback

    def _extract_requests(self, segnum):
        """Remove matching requests and return their (d,c,seg_ev) tuples so
        that the caller can retire them."""
        retire = [(d,c,seg_ev)
                  for (segnum0,d,c,seg_ev,lp) in self._segment_requests
                  if segnum0 == segnum]
        self._segment_requests = [t for t in self._segment_requests
                                  if t[0] != segnum]
        return retire

    def _cancel_request(self, cancel):
        self._segment_requests = [t for t in self._segment_requests
                                  if t[2] != cancel]
        segnums = [segnum for (segnum,d,c,seg_ev,lp) in self._segment_requests]
        # self._active_segment might be None in rare circumstances, so make
        # sure we tolerate it
        if self._active_segment and self._active_segment.segnum not in segnums:
            self._active_segment.stop()
            self._active_segment = None
            self._start_new_segment()

    # called by ShareFinder to choose hashtree sizes in CommonShares, and by
    # SegmentFetcher to tell if it is still fetching a valid segnum.
    def get_num_segments(self):
        # returns (best_num_segments, authoritative)
        if self.num_segments is None:
            return (self.guessed_num_segments, False)
        return (self.num_segments, True)
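
    # (e.g. SegmentFetcher may ask before the UEB has been fetched and get
    # (guessed_num_segments, False); once _parse_and_store_UEB has run, it
    # gets the authoritative (num_segments, True).)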