4 from zope.interface import Interface
5 from twisted.python.failure import Failure
6 from twisted.internet import defer
7 from foolscap.api import eventually
8 from allmydata import uri
9 from allmydata.codec import CRSDecoder
10 from allmydata.util import base32, log, hashutil, mathutil, observer
11 from allmydata.interfaces import DEFAULT_MAX_SEGMENT_SIZE
12 from allmydata.hashtree import IncompleteHashTree, BadHashError, \
16 from finder import ShareFinder
17 from fetcher import SegmentFetcher
18 from segmentation import Segmentation
19 from common import BadCiphertextHashError
class IDownloadStatusHandlingConsumer(Interface):
    # Marker interface for consumers that want DownloadStatus updates.
    # zope.interface method declarations intentionally omit 'self'.
    def set_download_status_read_event(read_ev):
        """Record the DownloadStatus 'read event', to be updated with the
        time it takes to decrypt each chunk of data."""
27 def __init__(self, f):
36 """Internal class which manages downloads and holds state. External
37 callers use CiphertextFileNode instead."""
39 # Share._node points to me
def __init__(self, verifycap, storage_broker, secret_holder,
             terminator, history, download_status):
    """Build the download-side state for one immutable file.

    'verifycap' must be a CHKFileVerifierURI. 'terminator' gets a
    register() call so our stop() runs at service shutdown.
    'download_status' is used for status/event reporting.
    """
    assert isinstance(verifycap, uri.CHKFileVerifierURI)
    self._verifycap = verifycap
    self._storage_broker = storage_broker
    # abbreviated storage-index prefix, used in log messages
    self._si_prefix = base32.b2a_l(verifycap.storage_index[:8], 60)
    # NOTE(review): two source lines are elided here in this copy
    # (original lines 46-47); confirm against the full file.
    terminator.register(self) # calls self.stop() at stopService()
    # 1: Only send network requests if you're active (self.running is True)
    # 2: Use TimerService, not reactor.callLater
    # 3: You can do eventual-sends any time.
    # These rules should mean that once
    # stopService()+flushEventualQueue() fires, everything will be done.
    self._secret_holder = secret_holder
    self._history = history
    self._download_status = download_status

    # Merkle tree over the shares, sized by total_shares from the cap
    self.share_hash_tree = IncompleteHashTree(self._verifycap.total_shares)

    # we guess the segment size, so Segmentation can pull non-initial
    # segments in a single roundtrip. This populates
    # .guessed_segment_size, .guessed_num_segments, and
    # .ciphertext_hash_tree (with a dummy, to let us guess which hashes
    # we will need)
    self._build_guessed_tables(DEFAULT_MAX_SEGMENT_SIZE)

    # filled in when we parse a valid UEB
    self.segment_size = None
    self.tail_segment_size = None
    self.tail_segment_padded = None
    self.num_segments = None
    self.block_size = None
    self.tail_block_size = None

    # things to track callers that want data

    # _segment_requests can have duplicates
    self._segment_requests = [] # (segnum, d, cancel_handle, seg_ev, lp)
    self._active_segment = None # a SegmentFetcher, with .segnum

    # fires (with the real segment size) once the UEB has been parsed
    self._segsize_observers = observer.OneShotObserverList()

    # we create one top-level logparent for this _Node, and another one
    # for each read() call. Segmentation and get_segment() messages are
    # associated with the read() call, everything else is tied to the
    # node-level parent.
    lp = log.msg(format="Immutable.DownloadNode(%(si)s) created:"
                 # NOTE(review): one format-string fragment is elided
                 # here in this copy (original line 90).
                 " guessed_segsize=%(guessed_segsize)d,"
                 " guessed_numsegs=%(guessed_numsegs)d",
                 si=self._si_prefix, size=verifycap.size,
                 guessed_segsize=self.guessed_segment_size,
                 guessed_numsegs=self.guessed_num_segments,
                 level=log.OPERATIONAL, umid="uJ0zAQ")
    # NOTE(review): lines elided here in this copy (original lines
    # 97-98) — presumably 'lp' is stored as self._lp, which later
    # methods read; confirm against the full file.
    self._sharefinder = ShareFinder(storage_broker, verifycap, self,
                                    self._download_status, lp)
def _build_guessed_tables(self, max_segment_size):
    """Populate the guessed segment-size/count fields and a stub
    ciphertext hash tree, assuming segments no larger than
    'max_segment_size'."""
    clipped = min(self._verifycap.size, max_segment_size)
    guess = mathutil.next_multiple(clipped, self._verifycap.needed_shares)
    self.guessed_segment_size = guess
    sizes = self._calculate_sizes(guess)
    numsegs = sizes["num_segments"]
    self.guessed_num_segments = numsegs
    # Like CommonShare, we hold a stub ciphertext_hash_tree until the
    # real num_segments arrives (from the UEB).
    self.ciphertext_hash_tree = IncompleteHashTree(numsegs)
    self.ciphertext_hash_tree_leaves = numsegs
115 return "ImmutableDownloadNode(%s)" % (self._si_prefix,)
118 # called by the Terminator at shutdown, mostly for tests
119 if self._active_segment:
120 self._active_segment.stop()
121 self._active_segment = None
122 self._sharefinder.stop()
124 # things called by outside callers, via CiphertextFileNode. get_segment()
125 # may also be called by Segmentation.
def read(self, consumer, offset, size):
    """I am the main entry point, from which FileNode.read() can get
    data. I feed the consumer with the desired range of ciphertext. I
    return a Deferred that fires (with the consumer) when the read is
    finished.

    Note that there is no notion of a 'file pointer': each call to read()
    uses an independent offset= value.
    """
    # for concurrent operations: each gets its own Segmentation manager
    # NOTE(review): a source line is elided here in this copy (original
    # line 137) — from the context, a 'size is None' guard for the
    # whole-file default below; confirm against the full file.
    size = self._verifycap.size
    # ignore overruns: clip size so offset+size does not go past EOF, and
    # so size is not negative (which indicates that offset >= EOF)
    size = max(0, min(size, self._verifycap.size-offset))

    read_ev = self._download_status.add_read_event(offset, size, now())
    if IDownloadStatusHandlingConsumer.providedBy(consumer):
        consumer.set_download_status_read_event(read_ev)
        consumer.set_download_status(self._download_status)

    lp = log.msg(format="imm Node(%(si)s).read(%(offset)d, %(size)d)",
                 si=base32.b2a(self._verifycap.storage_index)[:8],
                 offset=offset, size=size,
                 level=log.OPERATIONAL, parent=self._lp, umid="l3j3Ww")
    # NOTE(review): a guard line is elided here in this copy (original
    # line 152); stats are presumably only counted when self._history
    # is set — confirm against the full file.
    sp = self._history.stats_provider
    sp.count("downloader.files_downloaded", 1) # really read() calls
    sp.count("downloader.bytes_downloaded", size)

    # NOTE(review): a guard line is elided here in this copy (original
    # line 156) — the early exit below only makes sense for size == 0.
    read_ev.finished(now())
    # no data, so no producer, so no register/unregisterProducer
    return defer.succeed(consumer)

    # for concurrent operations, each read() gets its own Segmentation
    s = Segmentation(self, offset, size, consumer, read_ev, lp)

    # this raises an interesting question: what segments to fetch? if
    # offset=0, always fetch the first segment, and then allow
    # Segmentation to be responsible for pulling the subsequent ones if
    # the first wasn't large enough. If offset>0, we're going to need an
    # extra roundtrip to get the UEB (and therefore the segment size)
    # before we can figure out which segment to get. TODO: allow the
    # offset-table-guessing code (which starts by guessing the segsize)
    # to assist the offset>0 process.
    # NOTE(review): the tail of this method (original lines 173-179,
    # which start the Segmentation and return its Deferred) is elided
    # in this copy; the line below is inside that elided callback.
    read_ev.finished(now())
def get_segment(self, segnum, logparent=None):
    """Begin downloading a segment. I return a tuple (d, c): 'd' is a
    Deferred that fires with (offset,data) when the desired segment is
    available, and c is an object on which c.cancel() can be called to
    disavow interest in the segment (after which 'd' will never fire).

    You probably need to know the segment size before calling this,
    unless you want the first few bytes of the file. If you ask for a
    segment number which turns out to be too large, the Deferred will
    errback with BadSegmentNumberError.

    The Deferred fires with the offset of the first byte of the data
    segment, so that you can call get_segment() before knowing the
    segment size, and still know which data you received.

    The Deferred can also errback with other fatal problems, such as
    NotEnoughSharesError, NoSharesError, or BadCiphertextHashError.
    """
    lp = log.msg(format="imm Node(%(si)s).get_segment(%(segnum)d)",
                 si=base32.b2a(self._verifycap.storage_index)[:8],
                 # NOTE(review): the 'segnum=' keyword line is elided
                 # here in this copy (original line 200).
                 level=log.OPERATIONAL, parent=logparent, umid="UKFjDQ")
    seg_ev = self._download_status.add_segment_request(segnum, now())
    # NOTE(review): the creation of Deferred 'd' is elided here in
    # this copy (original line 203).
    c = Cancel(self._cancel_request)
    self._segment_requests.append( (segnum, d, c, seg_ev, lp) )
    self._start_new_segment()
    # NOTE(review): the 'return (d, c)' promised by the docstring is
    # elided in this copy (original lines 207-208).
def get_segsize(self):
    """Return a Deferred that fires when we know the real segment size."""
    if self.segment_size:
        return defer.succeed(self.segment_size)
    # TODO: this downloads (and discards) the first segment of the file.
    # We could make this more efficient by writing
    # fetcher.SegmentSizeFetcher, with the job of finding a single valid
    # share and extracting the UEB. We'd add Share.get_UEB() to request
    # just the UEB.
    (d,c) = self.get_segment(0)
    # this ensures that an error during get_segment() will errback the
    # caller, so Repair won't wait forever on completely missing files
    d.addCallback(lambda ign: self._segsize_observers.when_fired())
    # NOTE(review): the final 'return d' is elided in this copy
    # (original line 222).
224 # things called by the Segmentation object used to transform
225 # arbitrary-sized read() calls into quantized segment fetches
def _start_new_segment(self):
    """If no fetch is in flight and requests are queued, start a
    SegmentFetcher for the oldest pending segment request."""
    if self._active_segment is not None:
        return
    if not self._segment_requests:
        return
    segnum, d, c, seg_ev, lp = self._segment_requests[0]
    k = self._verifycap.needed_shares
    log.msg(format="%(node)s._start_new_segment: segnum=%(segnum)d",
            node=repr(self), segnum=segnum,
            level=log.NOISY, parent=lp, umid="wAlnHQ")
    fetcher = SegmentFetcher(self, segnum, k, lp)
    self._active_segment = fetcher
    seg_ev.activate(now())
    live = [share for share in self._shares if share.is_alive()]
    fetcher.add_shares(live) # this triggers the loop
240 # called by our child ShareFinder
def got_shares(self, shares):
    """Called by our child ShareFinder: absorb newly located shares and
    offer them to any fetch currently in progress."""
    self._shares.update(shares)
    fetcher = self._active_segment
    if fetcher:
        fetcher.add_shares(shares)
def no_more_shares(self):
    """Called by our child ShareFinder when its search is exhausted:
    remember that fact and pass it on to any active fetch."""
    self._no_more_shares = True
    fetcher = self._active_segment
    if fetcher:
        fetcher.no_more_shares()
250 # things called by our Share instances
def validate_and_store_UEB(self, UEB_s):
    # Called by our Share instances: check the URI-extension block
    # against the verifycap's hash, then parse and store its contents.
    log.msg("validate_and_store_UEB",
            level=log.OPERATIONAL, parent=self._lp, umid="7sTrPw")
    h = hashutil.uri_extension_hash(UEB_s)
    if h != self._verifycap.uri_extension_hash:
        # NOTE(review): the mismatch handler is elided here in this
        # copy (original line 257) — presumably a raise of
        # BadHashError; confirm against the full file.
    self._parse_and_store_UEB(UEB_s) # sets self._stuff
    # TODO: a malformed (but authentic) UEB could throw an assertion in
    # _parse_and_store_UEB, and we should abandon the download.

    # inform the ShareFinder about our correct number of segments. This
    # will update the block-hash-trees in all existing CommonShare
    # instances, and will populate new ones with the correct value.
    self._sharefinder.update_num_segments()
def _parse_and_store_UEB(self, UEB_s):
    # Parse the (already hash-validated) UEB string and record the real
    # encoding parameters: segment sizes and count, the codec, and the
    # roots of the ciphertext and share hash trees.
    # Note: the UEB contains needed_shares and total_shares. These are
    # redundant and inferior (the filecap contains the authoritative
    # values). However, because it is possible to encode the same file in
    # multiple ways, and the encoders might choose (poorly) to use the
    # same key for both (therefore getting the same SI), we might
    # encounter shares for both types. The UEB hashes will be different,
    # however, and we'll disregard the other encoding's shares.

    # therefore, we ignore d['total_shares'] and d['needed_shares'].

    d = uri.unpack_extension(UEB_s)

    log.msg(format="UEB=%(ueb)s, vcap=%(vcap)s",
            ueb=repr(uri.unpack_extension_readable(UEB_s)),
            vcap=self._verifycap.to_string(),
            level=log.NOISY, parent=self._lp, umid="cVqZnA")

    # the filecap's k and N are the authoritative values (see above)
    k, N = self._verifycap.needed_shares, self._verifycap.total_shares

    self.segment_size = d['segment_size']
    # wake up anyone blocked in get_segsize()
    self._segsize_observers.fire(self.segment_size)

    r = self._calculate_sizes(self.segment_size)
    self.tail_segment_size = r["tail_segment_size"]
    self.tail_segment_padded = r["tail_segment_padded"]
    self.num_segments = r["num_segments"]
    self.block_size = r["block_size"]
    self.tail_block_size = r["tail_block_size"]
    log.msg("actual sizes: %s" % (r,),
            level=log.NOISY, parent=self._lp, umid="PY6P5Q")
    if (self.segment_size == self.guessed_segment_size
        and self.num_segments == self.guessed_num_segments):
        log.msg("my guess was right!",
                level=log.NOISY, parent=self._lp, umid="x340Ow")
    # NOTE(review): an 'else:' line is elided here in this copy
    # (original line 304); the log.msg below is the other branch.
        log.msg("my guess was wrong! Extra round trips for me.",
                level=log.NOISY, parent=self._lp, umid="tb7RJw")

    # zfec.Decode() instantiation is fast, but still, let's use the same
    # codec instance for all but the last segment. 3-of-10 takes 15us on
    # my laptop, 25-of-100 is 900us, 3-of-255 is 97us, 25-of-255 is
    # 2.5ms, worst-case 254-of-255 is 9.3ms
    self._codec = CRSDecoder()
    self._codec.set_params(self.segment_size, k, N)

    # Ciphertext hash tree root is mandatory, so that there is at most
    # one ciphertext that matches this read-cap or verify-cap. The
    # integrity check on the shares is not sufficient to prevent the
    # original encoder from creating some shares of file A and other
    # shares of file B. self.ciphertext_hash_tree was a guess before:
    # this is where we create it for real.
    self.ciphertext_hash_tree = IncompleteHashTree(self.num_segments)
    self.ciphertext_hash_tree_leaves = self.num_segments
    self.ciphertext_hash_tree.set_hashes({0: d['crypttext_root_hash']})

    self.share_hash_tree.set_hashes({0: d['share_root_hash']})

    # Our job is a fast download, not verification, so we ignore any
    # redundant fields. The Verifier uses a different code path which
    # does not ignore them.
def _calculate_sizes(self, segment_size):
    """Given a segment size, compute the sizes derived from it.

    Returns a dict with keys: tail_segment_size, tail_segment_padded,
    num_segments, block_size, tail_block_size.
    """
    # segments of ciphertext
    size = self._verifycap.size
    k = self._verifycap.needed_shares

    # this assert matches the one in encode.py:127 inside
    # Encoded._got_all_encoding_parameters, where the UEB is constructed
    assert segment_size % k == 0

    # the last segment is usually short. We don't store a whole segsize,
    # but we do pad the segment up to a multiple of k, because the
    # encoder requires that.
    tail_segment_size = size % segment_size
    if tail_segment_size == 0:
        tail_segment_size = segment_size
    padded = mathutil.next_multiple(tail_segment_size, k)
    tail_segment_padded = padded

    num_segments = mathutil.div_ceil(size, segment_size)

    # each segment is turned into N blocks. All but the last are of size
    # block_size, and the last is of size tail_block_size. Both divisions
    # are exact (asserted/padded above), so use '//' to keep the result
    # an int even under 'from __future__ import division' or Python 3.
    block_size = segment_size // k
    tail_block_size = tail_segment_padded // k

    return { "tail_segment_size": tail_segment_size,
             "tail_segment_padded": tail_segment_padded,
             "num_segments": num_segments,
             "block_size": block_size,
             "tail_block_size": tail_block_size,
             }
def process_share_hashes(self, share_hashes):
    """Validate incoming share-hash numbers, then merge the hashes into
    our share hash tree."""
    tree_size = len(self.share_hash_tree)
    for hashnum in share_hashes:
        if hashnum < tree_size:
            continue
        # "BadHashError" normally means e.g. a corrupt block; we reuse
        # it here for a badly *numbered* hash, which points at
        # corruption in the number bytes rather than in the hash value.
        raise BadHashError("hashnum %d doesn't fit in hashtree(%d)"
                           % (hashnum, tree_size))
    self.share_hash_tree.set_hashes(share_hashes)
def get_desired_ciphertext_hashes(self, segnum):
    """Return the ciphertext hash-tree node numbers we would like for
    'segnum' (leaf included), or an empty list for segment numbers
    beyond the tree we currently hold.

    NOTE(review): the tail of this call (keyword argument and fallback)
    was elided in this copy of the source; restored here — confirm
    against the full file.
    """
    if segnum < self.ciphertext_hash_tree_leaves:
        return self.ciphertext_hash_tree.needed_hashes(segnum,
                                                       include_leaf=True)
    return []
def get_needed_ciphertext_hashes(self, segnum):
    """Return the ciphertext hash-tree nodes (leaf included) that are
    still required to verify 'segnum'."""
    return self.ciphertext_hash_tree.needed_hashes(segnum,
                                                   include_leaf=True)
def process_ciphertext_hashes(self, hashes):
    """Merge received ciphertext hashes into the (real, post-UEB) tree.

    May raise BadHashError or NotEnoughHashesError."""
    # only valid once the real tree exists (after _parse_and_store_UEB)
    assert self.num_segments is not None
    self.ciphertext_hash_tree.set_hashes(hashes)
391 # called by our child SegmentFetcher
def want_more_shares(self):
    """Called by our child SegmentFetcher: ask the ShareFinder to go
    look for additional shares."""
    self._sharefinder.hungry()
def fetch_failed(self, sf, f):
    # Called by our child SegmentFetcher when the fetch fails fatally:
    # 'sf' is the fetcher, 'f' is a Failure. Errback every pending
    # request for this segment, then move on to the next one.
    assert sf is self._active_segment
    # deliver error upwards
    for (d,c,seg_ev) in self._extract_requests(sf.segnum):
        # NOTE(review): one source line is elided here in this copy
        # (original line 400) — likely a status update on seg_ev;
        # confirm against the full file.
        eventually(self._deliver, d, c, f)
    self._active_segment = None
    self._start_new_segment()
def process_blocks(self, segnum, blocks):
    # Called by our child SegmentFetcher with validated blocks for one
    # segment: decode, check the ciphertext hash, then deliver the
    # segment to every pending request for this segnum.
    # NOTE(review): a timestamp line (original line 406) is elided in
    # this copy; 'start' is used near the bottom of _deliver.
    d = defer.maybeDeferred(self._decode_blocks, segnum, blocks)
    d.addCallback(self._check_ciphertext_hash, segnum)

    def _deliver(result):
        log.msg(format="delivering segment(%(segnum)d)",
                level=log.OPERATIONAL, parent=self._lp,
                # NOTE(review): remaining log.msg keyword lines
                # (original lines 411, 413) are elided in this copy,
                # as is a timestamp line (original line 414) that
                # presumably sets 'when', used below.
        if isinstance(result, Failure):
            # this catches failures in decode or ciphertext hash
            for (d,c,seg_ev) in self._extract_requests(segnum):
                # NOTE(review): a seg_ev update line (original line
                # 418) is elided in this copy.
                eventually(self._deliver, d, c, result)
        # NOTE(review): an 'else:' line (original line 420) is elided;
        # the success path below unpacks the decoded result.
            (offset, segment, decodetime) = result
            for (d,c,seg_ev) in self._extract_requests(segnum):
                # when we have two requests for the same segment, the
                # second one will not be "activated" before the data is
                # delivered, so to allow the status-reporting code to see
                # consistent behavior, we activate them all now. The
                # SegmentEvent will ignore duplicate activate() calls.
                # Note that this will result in an inaccurate "receive
                # speed" for the second request.
                seg_ev.activate(when)
                seg_ev.deliver(when, offset, len(segment), decodetime)
                eventually(self._deliver, d, c, result)
        self._download_status.add_misc_event("process_block", start, now())
        self._active_segment = None
        self._start_new_segment()
    # NOTE(review): the 'd.addCallback(_deliver)' hookup (original
    # line 436) is elided in this copy.
    d.addErrback(log.err, "unhandled error during process_blocks",
                 level=log.WEIRD, parent=self._lp, umid="MkEsCg")
def _decode_blocks(self, segnum, blocks):
    # Erasure-decode one segment's worth of validated blocks back into
    # ciphertext; the Deferred fires with (segment, decodetime).
    # NOTE(review): a timestamp line (original line 441) is elided in
    # this copy; 'start' is used in _process below.
    tail = (segnum == self.num_segments-1)
    # NOTE(review): the default 'codec' assignment (original line 446)
    # appears to be elided; the tail branch below overrides it.
    block_size = self.block_size
    decoded_size = self.segment_size
    # NOTE(review): an 'if tail:' guard (original line 448) is elided;
    # the overrides below apply only to the final (padded) segment.
    # account for the padding in the last segment
    k, N = self._verifycap.needed_shares, self._verifycap.total_shares
    codec.set_params(self.tail_segment_padded, k, N)
    block_size = self.tail_block_size
    decoded_size = self.tail_segment_padded

    # NOTE(review): the initializations of 'shareids' and 'shares'
    # (original lines ~453-455) are elided in this copy.
    for (shareid, share) in blocks.iteritems():
        assert len(share) == block_size
        shareids.append(shareid)
        # NOTE(review): the matching 'shares.append(share)' (original
        # line 459) is elided in this copy.

    d = codec.decode(shares, shareids) # segment

    def _process(buffers):
        decodetime = now() - start
        segment = "".join(buffers)
        assert len(segment) == decoded_size
        # NOTE(review): an 'if tail:' guard (original lines 468-469)
        # is elided; the truncation below strips tail padding only.
        segment = segment[:self.tail_segment_size]
        self._download_status.add_misc_event("decode", start, now())
        return (segment, decodetime)
    d.addCallback(_process)
    # NOTE(review): the final 'return d' (original line 474) is elided.
def _check_ciphertext_hash(self, (segment, decodetime), segnum):
    # Verify the decoded segment against the ciphertext hash tree; on
    # success return (offset, segment, decodetime), otherwise raise
    # BadCiphertextHashError. (Python 2 tuple-parameter syntax.)
    # NOTE(review): a timestamp line (original line 477) is elided in
    # this copy; 'start' is used below.
    assert self._active_segment.segnum == segnum
    assert self.segment_size is not None
    offset = segnum * self.segment_size

    # NOTE(review): a 'try:' line (original line 483) is elided in
    # this copy; the 'except' clause below pairs with it.
    h = hashutil.crypttext_segment_hash(segment)
    self.ciphertext_hash_tree.set_hashes(leaves={segnum: h})
    self._download_status.add_misc_event("CThash", start, now())
    return (offset, segment, decodetime)
    except (BadHashError, NotEnoughHashesError):
        format = ("hash failure in ciphertext_hash_tree:"
                  " segnum=%(segnum)d, SI=%(si)s")
        log.msg(format=format, segnum=segnum, si=self._si_prefix,
                level=log.WEIRD, parent=self._lp, umid="MTwNnw")
        # this is especially weird, because we made it past the share
        # hash tree. It implies that we're using the wrong encoding, or
        # that the uploader deliberately constructed a bad UEB.
        msg = format % {"segnum": segnum, "si": self._si_prefix}
        raise BadCiphertextHashError(msg)
499 def _deliver(self, d, c, result):
500 # this method exists to handle cancel() that occurs between
501 # _got_segment and _deliver
503 c.active = False # it is now too late to cancel
504 d.callback(result) # might actually be an errback
506 def _extract_requests(self, segnum):
507 """Remove matching requests and return their (d,c) tuples so that the
508 caller can retire them."""
509 retire = [(d,c,seg_ev)
510 for (segnum0,d,c,seg_ev,lp) in self._segment_requests
511 if segnum0 == segnum]
512 self._segment_requests = [t for t in self._segment_requests
516 def _cancel_request(self, cancel):
517 self._segment_requests = [t for t in self._segment_requests
519 segnums = [segnum for (segnum,d,c,seg_ev,lp) in self._segment_requests]
520 # self._active_segment might be None in rare circumstances, so make
521 # sure we tolerate it
522 if self._active_segment and self._active_segment.segnum not in segnums:
523 self._active_segment.stop()
524 self._active_segment = None
525 self._start_new_segment()
527 # called by ShareFinder to choose hashtree sizes in CommonShares, and by
528 # SegmentFetcher to tell if it is still fetching a valid segnum.
def get_num_segments(self):
    """Return (best_num_segments, authoritative): the guessed count
    until the UEB has been parsed, the real count afterwards.

    Used by ShareFinder to size CommonShare hashtrees, and by
    SegmentFetcher to tell if it is still fetching a valid segnum."""
    known = self.num_segments
    if known is not None:
        return (known, True)
    return (self.guessed_num_segments, False)