4 from zope.interface import Interface
5 from twisted.python.failure import Failure
6 from twisted.internet import defer
7 from foolscap.api import eventually
8 from allmydata import uri
9 from allmydata.codec import CRSDecoder
10 from allmydata.util import base32, log, hashutil, mathutil, observer
11 from allmydata.interfaces import DEFAULT_MAX_SEGMENT_SIZE
12 from allmydata.hashtree import IncompleteHashTree, BadHashError, \
16 from finder import ShareFinder
17 from fetcher import SegmentFetcher
18 from segmentation import Segmentation
19 from common import BadCiphertextHashError
class IDownloadStatusHandlingConsumer(Interface):
    # zope.interface: method signatures omit 'self' by convention.
    def set_download_status_read_event(read_ev):
        """Record the DownloadStatus 'read event', to be updated with the
        time it takes to decrypt each chunk of data."""
27 def __init__(self, f):
36 """Internal class which manages downloads and holds state. External
37 callers use CiphertextFileNode instead."""
39 # Share._node points to me
# DownloadNode constructor: wires this node to its collaborators
# (storage broker, terminator, history, download-status) and builds the
# guessed segment tables so the first roundtrip can be useful.
# NOTE(review): the fused numbers at the start of each line are original
# file line numbers; gaps in that numbering (46-47, 49, 58, 61, 66, 68,
# 70, 77, 79, 83, 85, 89, 91, 98-99) show lines elided from this excerpt.
40 def __init__(self, verifycap, storage_broker, secret_holder,
41 terminator, history, download_status):
# only CHK verify-caps are accepted by this download node
42 assert isinstance(verifycap, uri.CHKFileVerifierURI)
43 self._verifycap = verifycap
44 self._storage_broker = storage_broker
# short printable storage-index prefix, used in log messages and repr
45 self._si_prefix = base32.b2a_l(verifycap.storage_index[:8], 60)
48 terminator.register(self) # calls self.stop() at stopService()
50 # 1: Only send network requests if you're active (self.running is True)
51 # 2: Use TimerService, not reactor.callLater
52 # 3: You can do eventual-sends any time.
53 # These rules should mean that once
54 # stopService()+flushEventualQueue() fires, everything will be done.
55 self._secret_holder = secret_holder
56 self._history = history
57 self._download_status = download_status
# k-of-N erasure-coding parameters come from the authoritative verifycap
59 k, N = self._verifycap.needed_shares, self._verifycap.total_shares
60 self.share_hash_tree = IncompleteHashTree(N)
62 # we guess the segment size, so Segmentation can pull non-initial
63 # segments in a single roundtrip. This populates
64 # .guessed_segment_size, .guessed_num_segments, and
65 # .ciphertext_hash_tree (with a dummy, to let us guess which hashes
67 self._build_guessed_tables(DEFAULT_MAX_SEGMENT_SIZE)
69 # filled in when we parse a valid UEB
71 self.segment_size = None
72 self.tail_segment_size = None
73 self.tail_segment_padded = None
74 self.num_segments = None
75 self.block_size = None
76 self.tail_block_size = None
78 # things to track callers that want data
80 # _segment_requests can have duplicates
81 self._segment_requests = [] # (segnum, d, cancel_handle, seg_ev, lp)
82 self._active_segment = None # a SegmentFetcher, with .segnum
# fired (with the real segment size) once a valid UEB has been parsed;
# see _parse_and_store_UEB
84 self._segsize_observers = observer.OneShotObserverList()
86 # we create one top-level logparent for this _Node, and another one
87 # for each read() call. Segmentation and get_segment() messages are
88 # associated with the read() call, everything else is tied to the
90 lp = log.msg(format="Immutable.DownloadNode(%(si)s) created:"
92 " guessed_segsize=%(guessed_segsize)d,"
93 " guessed_numsegs=%(guessed_numsegs)d",
94 si=self._si_prefix, size=verifycap.size,
95 guessed_segsize=self.guessed_segment_size,
96 guessed_numsegs=self.guessed_num_segments,
97 level=log.OPERATIONAL, umid="uJ0zAQ")
# the ShareFinder locates shares on the grid and feeds got_shares()
100 self._sharefinder = ShareFinder(storage_broker, verifycap, self,
101 self._download_status, lp)
def _build_guessed_tables(self, max_segment_size):
    """Populate .guessed_segment_size, .guessed_num_segments, and a stub
    .ciphertext_hash_tree, before the real UEB has been parsed."""
    # small files fit inside a single (smaller) segment
    size = min(self._verifycap.size, max_segment_size)
    # segment sizes must be a multiple of k (needed_shares), so each
    # segment divides evenly into blocks
    s = mathutil.next_multiple(size, self._verifycap.needed_shares)
    self.guessed_segment_size = s
    r = self._calculate_sizes(self.guessed_segment_size)
    self.guessed_num_segments = r["num_segments"]
    # as with CommonShare, our ciphertext_hash_tree is a stub until we
    # get the real num_segments
    self.ciphertext_hash_tree = IncompleteHashTree(self.guessed_num_segments)
    self.ciphertext_hash_tree_leaves = self.guessed_num_segments
def __repr__(self):
    # NOTE(review): the `def` line was dropped by the garbled listing;
    # reconstructed — the lone %-format return is the node's repr.
    return "ImmutableDownloadNode(%s)" % (self._si_prefix,)
def stop(self):
    # called by the Terminator at shutdown, mostly for tests
    # NOTE(review): the `def stop(self):` header was dropped by the
    # garbled listing; reconstructed from the terminator.register()
    # comment ("calls self.stop() at stopService()").
    if self._active_segment:
        # halt any in-flight segment fetch before shutting down
        self._active_segment.stop()
        self._active_segment = None
    self._sharefinder.stop()
125 # things called by outside callers, via CiphertextFileNode. get_segment()
126 # may also be called by Segmentation.
128 def read(self, consumer, offset, size):
129 """I am the main entry point, from which FileNode.read() can get
130 data. I feed the consumer with the desired range of ciphertext. I
131 return a Deferred that fires (with the consumer) when the read is
134 Note that there is no notion of a 'file pointer': each call to read()
135 uses an independent offset= value.
137 # for concurrent operations: each gets its own Segmentation manager
# NOTE(review): numbering gaps (132-133, 136, 138, 143, 148, 153, 157,
# 161, 163, 165, 174-175, 177-180) show elided lines, including the
# guard conditions around several of the statements below and the
# method's final return.
139 size = self._verifycap.size
140 # ignore overruns: clip size so offset+size does not go past EOF, and
141 # so size is not negative (which indicates that offset >= EOF)
142 size = max(0, min(size, self._verifycap.size-offset))
# record this read in the DownloadStatus; status-aware consumers get a
# handle on the same event so they can update it during decryption
144 read_ev = self._download_status.add_read_event(offset, size, now())
145 if IDownloadStatusHandlingConsumer.providedBy(consumer):
146 consumer.set_download_status_read_event(read_ev)
147 consumer.set_download_status(self._download_status)
149 lp = log.msg(format="imm Node(%(si)s).read(%(offset)d, %(size)d)",
150 si=base32.b2a(self._verifycap.storage_index)[:8],
151 offset=offset, size=size,
152 level=log.OPERATIONAL, parent=self._lp, umid="l3j3Ww")
# best-effort download statistics (read() calls and byte counts)
154 sp = self._history.stats_provider
155 sp.count("downloader.files_downloaded", 1) # really read() calls
156 sp.count("downloader.bytes_downloaded", size)
# no-data path (presumably guarded by a size check on elided line 157)
158 read_ev.finished(now())
159 # no data, so no producer, so no register/unregisterProducer
160 return defer.succeed(consumer)
162 # for concurrent operations, each read() gets its own Segmentation
164 s = Segmentation(self, offset, size, consumer, read_ev, lp)
166 # this raises an interesting question: what segments to fetch? if
167 # offset=0, always fetch the first segment, and then allow
168 # Segmentation to be responsible for pulling the subsequent ones if
169 # the first wasn't large enough. If offset>0, we're going to need an
170 # extra roundtrip to get the UEB (and therefore the segment size)
171 # before we can figure out which segment to get. TODO: allow the
172 # offset-table-guessing code (which starts by guessing the segsize)
173 # to assist the offset>0 process.
176 read_ev.finished(now())
181 def get_segment(self, segnum, logparent=None):
182 """Begin downloading a segment. I return a tuple (d, c): 'd' is a
183 Deferred that fires with (offset,data) when the desired segment is
184 available, and c is an object on which c.cancel() can be called to
185 disavow interest in the segment (after which 'd' will never fire).
187 You probably need to know the segment size before calling this,
188 unless you want the first few bytes of the file. If you ask for a
189 segment number which turns out to be too large, the Deferred will
190 errback with BadSegmentNumberError.
192 The Deferred fires with the offset of the first byte of the data
193 segment, so that you can call get_segment() before knowing the
194 segment size, and still know which data you received.
196 The Deferred can also errback with other fatal problems, such as
197 NotEnoughSharesError, NoSharesError, or BadCiphertextHashError.
# NOTE(review): numbering gaps (186, 191, 195, 198, 201, 204, 208-209)
# show elided lines, including the creation of the Deferred 'd' that is
# appended below and the final 'return (d, c)'.
199 lp = log.msg(format="imm Node(%(si)s).get_segment(%(segnum)d)",
200 si=base32.b2a(self._verifycap.storage_index)[:8],
202 level=log.OPERATIONAL, parent=logparent, umid="UKFjDQ")
# record the request in the status display; it is 'activated' later,
# when a SegmentFetcher actually starts working on it
203 seg_ev = self._download_status.add_segment_request(segnum, now())
205 c = Cancel(self._cancel_request)
206 self._segment_requests.append( (segnum, d, c, seg_ev, lp) )
# kick the fetch loop; a no-op if a fetcher is already running
207 self._start_new_segment()
210 def get_segsize(self):
211 """Return a Deferred that fires when we know the real segment size."""
# fast path: the UEB has already been parsed
212 if self.segment_size:
213 return defer.succeed(self.segment_size)
214 # TODO: this downloads (and discards) the first segment of the file.
215 # We could make this more efficient by writing
216 # fetcher.SegmentSizeFetcher, with the job of finding a single valid
217 # share and extracting the UEB. We'd add Share.get_UEB() to request
# downloading segment 0 forces UEB retrieval, which fires
# _segsize_observers as a side effect
219 (d,c) = self.get_segment(0)
220 # this ensures that an error during get_segment() will errback the
221 # caller, so Repair won't wait forever on completely missing files
222 d.addCallback(lambda ign: self._segsize_observers.when_fired())
# NOTE(review): the trailing 'return d' (elided line 223) is missing
# from this excerpt.
225 # things called by the Segmentation object used to transform
226 # arbitrary-sized read() calls into quantized segment fetches
def _start_new_segment(self):
    # Launch a SegmentFetcher for the oldest outstanding request, but
    # only when no fetcher is currently running. Called whenever a
    # request is queued or the active fetcher retires.
    if self._active_segment is None and self._segment_requests:
        (segnum, d, c, seg_ev, lp) = self._segment_requests[0]
        k = self._verifycap.needed_shares
        log.msg(format="%(node)s._start_new_segment: segnum=%(segnum)d",
                node=repr(self), segnum=segnum,
                level=log.NOISY, parent=lp, umid="wAlnHQ")
        self._active_segment = fetcher = SegmentFetcher(self, segnum, k, lp)
        seg_ev.activate(now())
        # hand the fetcher every share that is still usable
        active_shares = [s for s in self._shares if s.is_alive()]
        fetcher.add_shares(active_shares) # this triggers the loop
241 # called by our child ShareFinder
def got_shares(self, shares):
    """Called by our child ShareFinder with newly located shares."""
    # _shares is the cumulative collection; set.update absorbs duplicates
    self._shares.update(shares)
    if self._active_segment:
        # the running fetcher can start using the new shares immediately
        self._active_segment.add_shares(shares)
def no_more_shares(self):
    """Called by our child ShareFinder when the share search is exhausted."""
    self._no_more_shares = True
    if self._active_segment:
        # let the running fetcher decide whether it can still succeed
        self._active_segment.no_more_shares()
251 # things called by our Share instances
253 def validate_and_store_UEB(self, UEB_s):
# Called by a Share once it has fetched the URI Extension Block.
# Checks the UEB string against the verifycap's uri_extension_hash
# before trusting any of its contents.
254 log.msg("validate_and_store_UEB",
255 level=log.OPERATIONAL, parent=self._lp, umid="7sTrPw")
256 h = hashutil.uri_extension_hash(UEB_s)
257 if h != self._verifycap.uri_extension_hash:
# NOTE(review): the body of this mismatch branch (elided line 258,
# presumably a raise) is missing from this excerpt.
259 self._parse_and_store_UEB(UEB_s) # sets self._stuff
260 # TODO: a malformed (but authentic) UEB could throw an assertion in
261 # _parse_and_store_UEB, and we should abandon the download.
264 # inform the ShareFinder about our correct number of segments. This
265 # will update the block-hash-trees in all existing CommonShare
266 # instances, and will populate new ones with the correct value.
267 self._sharefinder.update_num_segments()
269 def _parse_and_store_UEB(self, UEB_s):
# Parse an already-validated UEB string and install the authoritative
# segment/block sizes, codec, and hash trees on this node.
# NOTE(review): numbering gaps (277-278, 280, 282, 287, 289, 292, 305,
# 308, 315-316, 326, 328, 332) show elided lines, including the 'else:'
# that introduces the "guess was wrong" branch below.
270 # Note: the UEB contains needed_shares and total_shares. These are
271 # redundant and inferior (the filecap contains the authoritative
272 # values). However, because it is possible to encode the same file in
273 # multiple ways, and the encoders might choose (poorly) to use the
274 # same key for both (therefore getting the same SI), we might
275 # encounter shares for both types. The UEB hashes will be different,
276 # however, and we'll disregard the "other" encoding's shares as
279 # therefore, we ignore d['total_shares'] and d['needed_shares'].
281 d = uri.unpack_extension(UEB_s)
283 log.msg(format="UEB=%(ueb)s, vcap=%(vcap)s",
284 ueb=repr(uri.unpack_extension_readable(UEB_s)),
285 vcap=self._verifycap.to_string(),
286 level=log.NOISY, parent=self._lp, umid="cVqZnA")
288 k, N = self._verifycap.needed_shares, self._verifycap.total_shares
# the authoritative segment size: everything guessed earlier is now
# recomputed from this value
290 self.segment_size = d['segment_size']
# wake up any get_segsize() callers waiting on the real size
291 self._segsize_observers.fire(self.segment_size)
293 r = self._calculate_sizes(self.segment_size)
294 self.tail_segment_size = r["tail_segment_size"]
295 self.tail_segment_padded = r["tail_segment_padded"]
296 self.num_segments = r["num_segments"]
297 self.block_size = r["block_size"]
298 self.tail_block_size = r["tail_block_size"]
299 log.msg("actual sizes: %s" % (r,),
300 level=log.NOISY, parent=self._lp, umid="PY6P5Q")
301 if (self.segment_size == self.guessed_segment_size
302 and self.num_segments == self.guessed_num_segments):
303 log.msg("my guess was right!",
304 level=log.NOISY, parent=self._lp, umid="x340Ow")
306 log.msg("my guess was wrong! Extra round trips for me.",
307 level=log.NOISY, parent=self._lp, umid="tb7RJw")
309 # zfec.Decode() instantiation is fast, but still, let's use the same
310 # codec instance for all but the last segment. 3-of-10 takes 15us on
311 # my laptop, 25-of-100 is 900us, 3-of-255 is 97us, 25-of-255 is
312 # 2.5ms, worst-case 254-of-255 is 9.3ms
313 self._codec = CRSDecoder()
314 self._codec.set_params(self.segment_size, k, N)
317 # Ciphertext hash tree root is mandatory, so that there is at most
318 # one ciphertext that matches this read-cap or verify-cap. The
319 # integrity check on the shares is not sufficient to prevent the
320 # original encoder from creating some shares of file A and other
321 # shares of file B. self.ciphertext_hash_tree was a guess before:
322 # this is where we create it for real.
323 self.ciphertext_hash_tree = IncompleteHashTree(self.num_segments)
324 self.ciphertext_hash_tree_leaves = self.num_segments
# seed both trees with their authoritative root hashes from the UEB
325 self.ciphertext_hash_tree.set_hashes({0: d['crypttext_root_hash']})
327 self.share_hash_tree.set_hashes({0: d['share_root_hash']})
329 # Our job is a fast download, not verification, so we ignore any
330 # redundant fields. The Verifier uses a different code path which
331 # does not ignore them.
def _calculate_sizes(self, segment_size):
    """Return a dict of sizes derived from segment_size:
    tail_segment_size, tail_segment_padded, num_segments, block_size,
    tail_block_size."""
    # segments of ciphertext
    size = self._verifycap.size
    k = self._verifycap.needed_shares

    # this assert matches the one in encode.py:127 inside
    # Encoded._got_all_encoding_parameters, where the UEB is constructed
    assert segment_size % k == 0

    # the last segment is usually short. We don't store a whole segsize,
    # but we do pad the segment up to a multiple of k, because the
    # encoder requires that.
    tail_segment_size = size % segment_size
    if tail_segment_size == 0:
        tail_segment_size = segment_size
    padded = mathutil.next_multiple(tail_segment_size, k)
    tail_segment_padded = padded

    num_segments = mathutil.div_ceil(size, segment_size)

    # each segment is turned into N blocks. All but the last are of size
    # block_size, and the last is of size tail_block_size
    # (Python 2 division; both dividends are exact multiples of k)
    block_size = segment_size / k
    tail_block_size = tail_segment_padded / k

    return { "tail_segment_size": tail_segment_size,
             "tail_segment_padded": tail_segment_padded,
             "num_segments": num_segments,
             "block_size": block_size,
             "tail_block_size": tail_block_size,
             }
def process_share_hashes(self, share_hashes):
    """Validate the numbering of, then store, share-hash-tree nodes
    received from a share. May raise BadHashError."""
    for hashnum in share_hashes:
        if hashnum >= len(self.share_hash_tree):
            # "BadHashError" is normally for e.g. a corrupt block. We
            # sort of abuse it here to mean a badly numbered hash (which
            # indicates corruption in the number bytes, rather than in
            # the data itself)
            raise BadHashError("hashnum %d doesn't fit in hashtree(%d)"
                               % (hashnum, len(self.share_hash_tree)))
    self.share_hash_tree.set_hashes(share_hashes)
377 def get_desired_ciphertext_hashes(self, segnum):
# only segments inside the (possibly still guessed) tree can be queried
378 if segnum < self.ciphertext_hash_tree_leaves:
379 return self.ciphertext_hash_tree.needed_hashes(segnum,
# NOTE(review): the continuation of this call and the fallback return
# for out-of-range segnums (elided lines 380-381) are missing from this
# excerpt.
def get_needed_ciphertext_hashes(self, segnum):
    """Return the ciphertext-hash-tree node numbers still needed to
    validate segment `segnum`, including the leaf itself."""
    cht = self.ciphertext_hash_tree
    return cht.needed_hashes(segnum, include_leaf=True)
def process_ciphertext_hashes(self, hashes):
    """Store ciphertext-hash-tree nodes; only valid once the real
    num_segments is known (i.e. after the UEB has been parsed)."""
    assert self.num_segments is not None
    # this may raise BadHashError or NotEnoughHashesError
    self.ciphertext_hash_tree.set_hashes(hashes)
392 # called by our child SegmentFetcher
def want_more_shares(self):
    # called by our child SegmentFetcher when it needs more shares;
    # tell the ShareFinder to resume searching
    self._sharefinder.hungry()
397 def fetch_failed(self, sf, f):
# called by the active SegmentFetcher when it cannot complete; 'f' is
# the Failure to propagate to every queued request for that segment
398 assert sf is self._active_segment
399 # deliver error upwards
400 for (d,c,seg_ev) in self._extract_requests(sf.segnum):
# NOTE(review): one line inside this loop (elided line 401) is missing
# from this excerpt.
402 eventually(self._deliver, d, c, f)
# retire the failed fetcher and move on to the next queued request
403 self._active_segment = None
404 self._start_new_segment()
406 def process_blocks(self, segnum, blocks):
# called by the SegmentFetcher once it has enough validated blocks to
# decode segment 'segnum': decode, check the ciphertext hash, then
# deliver (offset, segment, decodetime) to every queued request
# NOTE(review): numbering gaps (407, 412, 414-415, 419, 421, 437) show
# elided lines; 'start' and 'when' used below must be bound on some of
# them.
408 d = defer.maybeDeferred(self._decode_blocks, segnum, blocks)
409 d.addCallback(self._check_ciphertext_hash, segnum)
410 def _deliver(result):
411 log.msg(format="delivering segment(%(segnum)d)",
413 level=log.OPERATIONAL, parent=self._lp,
416 if isinstance(result, Failure):
417 # this catches failures in decode or ciphertext hash
418 for (d,c,seg_ev) in self._extract_requests(segnum):
420 eventually(self._deliver, d, c, result)
422 (offset, segment, decodetime) = result
423 for (d,c,seg_ev) in self._extract_requests(segnum):
424 # when we have two requests for the same segment, the
425 # second one will not be "activated" before the data is
426 # delivered, so to allow the status-reporting code to see
427 # consistent behavior, we activate them all now. The
428 # SegmentEvent will ignore duplicate activate() calls.
429 # Note that this will result in an inaccurate "receive
430 # speed" for the second request.
431 seg_ev.activate(when)
432 seg_ev.deliver(when, offset, len(segment), decodetime)
433 eventually(self._deliver, d, c, result)
434 self._download_status.add_misc_event("process_block", start, now())
# retire the fetcher and start on the next queued request
435 self._active_segment = None
436 self._start_new_segment()
438 d.addErrback(log.err, "unhandled error during process_blocks",
439 level=log.WEIRD, parent=self._lp, umid="MkEsCg")
441 def _decode_blocks(self, segnum, blocks):
# erasure-decode segment 'segnum' from a dict of {shareid: block};
# eventually produces the (segment, decodetime) pair
# NOTE(review): numbering gaps (442, 444, 447, 449, 454-456, 460-462,
# 464, 469-470, 475-476) show elided lines, including the tail-segment
# 'if' guards, the shares/shareids accumulation, and the final
# 'return d'.
443 tail = (segnum == self.num_segments-1)
445 block_size = self.block_size
446 decoded_size = self.segment_size
448 # account for the padding in the last segment
# tail branch: configure a codec for the padded tail-segment size
450 k, N = self._verifycap.needed_shares, self._verifycap.total_shares
451 codec.set_params(self.tail_segment_padded, k, N)
452 block_size = self.tail_block_size
453 decoded_size = self.tail_segment_padded
457 for (shareid, share) in blocks.iteritems():
458 assert len(share) == block_size
459 shareids.append(shareid)
463 d = codec.decode(shares, shareids) # segment
465 def _process(buffers):
466 decodetime = now() - start
467 segment = "".join(buffers)
468 assert len(segment) == decoded_size
# strip the encoder's padding from the tail segment
471 segment = segment[:self.tail_segment_size]
472 self._download_status.add_misc_event("decode", start, now())
473 return (segment, decodetime)
474 d.addCallback(_process)
477 def _check_ciphertext_hash(self, (segment, decodetime), segnum):
# validate the decoded segment against the ciphertext hash tree, and
# convert it into the (offset, segment, decodetime) delivery tuple.
# (Python 2 tuple-unpacking parameter syntax.)
# NOTE(review): numbering gaps (478, 482, 484, 492) show elided lines,
# including the 'try:' that pairs with the except clause below.
479 assert self._active_segment.segnum == segnum
480 assert self.segment_size is not None
481 offset = segnum * self.segment_size
483 h = hashutil.crypttext_segment_hash(segment)
# set_hashes raises BadHashError/NotEnoughHashesError on mismatch
485 self.ciphertext_hash_tree.set_hashes(leaves={segnum: h})
486 self._download_status.add_misc_event("CThash", start, now())
487 return (offset, segment, decodetime)
488 except (BadHashError, NotEnoughHashesError):
489 format = ("hash failure in ciphertext_hash_tree:"
490 " segnum=%(segnum)d, SI=%(si)s")
491 log.msg(format=format, segnum=segnum, si=self._si_prefix,
493 level=log.WEIRD, parent=self._lp, umid="MTwNnw")
494 # this is especially weird, because we made it past the share
495 # hash tree. It implies that we're using the wrong encoding, or
496 # that the uploader deliberately constructed a bad UEB.
497 msg = format % {"segnum": segnum, "si": self._si_prefix}
498 raise BadCiphertextHashError(msg)
500 def _deliver(self, d, c, result):
501 # this method exists to handle cancel() that occurs between
502 # _got_segment and _deliver
# NOTE(review): the guard introducing these two lines (elided line 503,
# presumably testing c.active) is missing from this excerpt.
504 c.active = False # it is now too late to cancel
505 d.callback(result) # might actually be an errback
507 def _extract_requests(self, segnum):
508 """Remove matching requests and return their (d,c) tuples so that the
509 caller can retire them."""
510 retire = [(d,c,seg_ev)
511 for (segnum0,d,c,seg_ev,lp) in self._segment_requests
512 if segnum0 == segnum]
513 self._segment_requests = [t for t in self._segment_requests
# NOTE(review): the filter condition of this list rebuild and the final
# 'return retire' (elided lines 514-515) are missing from this excerpt.
517 def _cancel_request(self, cancel):
# invoked via the Cancel handle when a caller disavows interest in a
# queued segment request
518 self._segment_requests = [t for t in self._segment_requests
# NOTE(review): the filter condition of this rebuild (elided line 519)
# is missing from this excerpt.
520 segnums = [segnum for (segnum,d,c,seg_ev,lp) in self._segment_requests]
521 # self._active_segment might be None in rare circumstances, so make
522 # sure we tolerate it
# if nobody still wants the segment being fetched, stop the fetcher and
# move on to whatever is queued next
523 if self._active_segment and self._active_segment.segnum not in segnums:
524 self._active_segment.stop()
525 self._active_segment = None
526 self._start_new_segment()
528 # called by ShareFinder to choose hashtree sizes in CommonShares, and by
529 # SegmentFetcher to tell if it is still fetching a valid segnum.
def get_num_segments(self):
    """Return (best_num_segments, authoritative).

    Called by ShareFinder to choose hashtree sizes in CommonShares, and
    by SegmentFetcher to tell if it is still fetching a valid segnum."""
    # before the UEB is parsed we only have the guess
    if self.num_segments is None:
        return (self.guessed_num_segments, False)
    return (self.num_segments, True)