import time
now = time.time

from zope.interface import Interface
from twisted.python.failure import Failure
from twisted.internet import defer
from foolscap.api import eventually
from allmydata import uri
from allmydata.codec import CRSDecoder
from allmydata.util import base32, log, hashutil, mathutil, observer
from allmydata.interfaces import DEFAULT_MAX_SEGMENT_SIZE
from allmydata.hashtree import IncompleteHashTree, BadHashError, \
     NotEnoughHashesError

# local imports
from finder import ShareFinder
from fetcher import SegmentFetcher
from segmentation import Segmentation
from common import BadCiphertextHashError

class IDownloadStatusHandlingConsumer(Interface):
    def set_download_status_read_event(read_ev):
        """Record the DownloadStatus 'read event', to be updated with the
        time it takes to decrypt each chunk of data."""

class Cancel:
    def __init__(self, f):
        self._f = f
        self.active = True

    def cancel(self):
        if self.active:
            self.active = False
            self._f(self)

class DownloadNode:
    """Internal class which manages downloads and holds state. External
    callers use CiphertextFileNode instead."""

    # Share._node points to me
    def __init__(self, verifycap, storage_broker, secret_holder,
                 terminator, history, download_status):
        assert isinstance(verifycap, uri.CHKFileVerifierURI)
        self._verifycap = verifycap
        self._storage_broker = storage_broker
        self._si_prefix = base32.b2a_l(verifycap.storage_index[:8], 60)
        self.running = True
        if terminator:
            terminator.register(self) # calls self.stop() at stopService()
        # the rules are:
        # 1: Only send network requests if you're active (self.running is True)
        # 2: Use TimerService, not reactor.callLater
        # 3: You can do eventual-sends any time.
        # These rules should mean that once
        # stopService()+flushEventualQueue() fires, everything will be done.
        self._secret_holder = secret_holder
        self._history = history
        self._download_status = download_status

        k, N = self._verifycap.needed_shares, self._verifycap.total_shares
        self.share_hash_tree = IncompleteHashTree(N)

        # we guess the segment size, so Segmentation can pull non-initial
        # segments in a single roundtrip. This populates
        # .guessed_segment_size, .guessed_num_segments, and
        # .ciphertext_hash_tree (with a dummy, to let us guess which hashes
        # we'll need)
        self._build_guessed_tables(DEFAULT_MAX_SEGMENT_SIZE)

        # filled in when we parse a valid UEB
        self.segment_size = None
        self.tail_segment_size = None
        self.tail_segment_padded = None
        self.num_segments = None
        self.block_size = None
        self.tail_block_size = None

        # things to track callers that want data

        # _segment_requests can have duplicates
        self._segment_requests = [] # (segnum, d, cancel_handle, seg_ev, lp)
        self._active_segment = None # a SegmentFetcher, with .segnum

        self._segsize_observers = observer.OneShotObserverList()

        # we create one top-level logparent for this _Node, and another one
        # for each read() call. Segmentation and get_segment() messages are
        # associated with the read() call, everything else is tied to the
        # _Node's log entry.
        lp = log.msg(format="Immutable.DownloadNode(%(si)s) created:"
                     " size=%(size)d,"
                     " guessed_segsize=%(guessed_segsize)d,"
                     " guessed_numsegs=%(guessed_numsegs)d",
                     si=self._si_prefix, size=verifycap.size,
                     guessed_segsize=self.guessed_segment_size,
                     guessed_numsegs=self.guessed_num_segments,
                     level=log.OPERATIONAL, umid="uJ0zAQ")
        self._lp = lp

        self._sharefinder = ShareFinder(storage_broker, verifycap, self,
                                        self._download_status, lp)
        self._shares = set()

    def _build_guessed_tables(self, max_segment_size):
        size = min(self._verifycap.size, max_segment_size)
        s = mathutil.next_multiple(size, self._verifycap.needed_shares)
        self.guessed_segment_size = s
        r = self._calculate_sizes(self.guessed_segment_size)
        self.guessed_num_segments = r["num_segments"]
        # as with CommonShare, our ciphertext_hash_tree is a stub until we
        # get the real num_segments
        self.ciphertext_hash_tree = IncompleteHashTree(self.guessed_num_segments)
        self.ciphertext_hash_tree_leaves = self.guessed_num_segments
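
    # Illustrative example (hypothetical numbers, not taken from this file):
    # with needed_shares k=3 and a 10000-byte file, size=min(10000, max)=10000,
    # so the guessed segment size is next_multiple(10000, 3) = 10002, and
    # _calculate_sizes() then guesses num_segments = div_ceil(10000, 10002) = 1.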

    def __repr__(self):
        return "ImmutableDownloadNode(%s)" % (self._si_prefix,)

    def stop(self):
        # called by the Terminator at shutdown, mostly for tests
        if self._active_segment:
            self._active_segment.stop()
            self._active_segment = None
        self._sharefinder.stop()

    # things called by outside callers, via CiphertextFileNode. get_segment()
    # may also be called by Segmentation.

    def read(self, consumer, offset, size):
        """I am the main entry point, from which FileNode.read() can get
        data. I feed the consumer with the desired range of ciphertext. I
        return a Deferred that fires (with the consumer) when the read is
        finished.

        Note that there is no notion of a 'file pointer': each call to
        read() uses an independent offset= value.
        """
        # for concurrent operations: each gets its own Segmentation manager
        if size is None:
            size = self._verifycap.size
        # ignore overruns: clip size so offset+size does not go past EOF, and
        # so size is not negative (which indicates that offset >= EOF)
        size = max(0, min(size, self._verifycap.size-offset))

        read_ev = self._download_status.add_read_event(offset, size, now())
        if IDownloadStatusHandlingConsumer.providedBy(consumer):
            consumer.set_download_status_read_event(read_ev)

        lp = log.msg(format="imm Node(%(si)s).read(%(offset)d, %(size)d)",
                     si=base32.b2a(self._verifycap.storage_index)[:8],
                     offset=offset, size=size,
                     level=log.OPERATIONAL, parent=self._lp, umid="l3j3Ww")
        if self._history:
            sp = self._history.stats_provider
            sp.count("downloader.files_downloaded", 1) # really read() calls
            sp.count("downloader.bytes_downloaded", size)
        if size == 0:
            read_ev.finished(now())
            # no data, so no producer, so no register/unregisterProducer
            return defer.succeed(consumer)

        # for concurrent operations, each read() gets its own Segmentation
        # manager
        s = Segmentation(self, offset, size, consumer, read_ev, lp)

        # this raises an interesting question: what segments to fetch? if
        # offset=0, always fetch the first segment, and then allow
        # Segmentation to be responsible for pulling the subsequent ones if
        # the first wasn't large enough. If offset>0, we're going to need an
        # extra roundtrip to get the UEB (and therefore the segment size)
        # before we can figure out which segment to get. TODO: allow the
        # offset-table-guessing code (which starts by guessing the segsize)
        # to assist the offset>0 process.
        d = s.start()
        def _done(res):
            read_ev.finished(now())
            return res
        d.addBoth(_done)
        return d
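
    # Illustrative caller (hypothetical; real callers go through
    # CiphertextFileNode / FileNode.read()):
    #   d = node.read(consumer, 0, 1024)
    #   d.addCallback(lambda consumer: ...)  # fires with the consumer when done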

    def get_segment(self, segnum, logparent=None):
        """Begin downloading a segment. I return a tuple (d, c): 'd' is a
        Deferred that fires with (offset,data) when the desired segment is
        available, and c is an object on which c.cancel() can be called to
        disavow interest in the segment (after which 'd' will never fire).

        You probably need to know the segment size before calling this,
        unless you want the first few bytes of the file. If you ask for a
        segment number which turns out to be too large, the Deferred will
        errback with BadSegmentNumberError.

        The Deferred fires with the offset of the first byte of the data
        segment, so that you can call get_segment() before knowing the
        segment size, and still know which data you received.

        The Deferred can also errback with other fatal problems, such as
        NotEnoughSharesError, NoSharesError, or BadCiphertextHashError.
        """
        lp = log.msg(format="imm Node(%(si)s).get_segment(%(segnum)d)",
                     si=base32.b2a(self._verifycap.storage_index)[:8],
                     segnum=segnum,
                     level=log.OPERATIONAL, parent=logparent, umid="UKFjDQ")
        seg_ev = self._download_status.add_segment_request(segnum, now())
        d = defer.Deferred()
        c = Cancel(self._cancel_request)
        self._segment_requests.append( (segnum, d, c, seg_ev, lp) )
        self._start_new_segment()
        return (d, c)
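
    # Illustrative caller (hypothetical):
    #   (d, c) = node.get_segment(0)
    #   d.addCallback(lambda result: ...)   # result carries the segment data
    #   # ...or call c.cancel() to disavow interest before it fires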

    def get_segsize(self):
        """Return a Deferred that fires when we know the real segment size."""
        if self.segment_size:
            return defer.succeed(self.segment_size)
        # TODO: this downloads (and discards) the first segment of the file.
        # We could make this more efficient by writing
        # fetcher.SegmentSizeFetcher, with the job of finding a single valid
        # share and extracting the UEB. We'd add Share.get_UEB() to request
        # just the UEB.
        (d,c) = self.get_segment(0)
        # this ensures that an error during get_segment() will errback the
        # caller, so Repair won't wait forever on completely missing files
        d.addCallback(lambda ign: self._segsize_observers.when_fired())
        return d

    # things called by the Segmentation object used to transform
    # arbitrary-sized read() calls into quantized segment fetches

    def _start_new_segment(self):
        if self._active_segment is None and self._segment_requests:
            (segnum, d, c, seg_ev, lp) = self._segment_requests[0]
            k = self._verifycap.needed_shares
            log.msg(format="%(node)s._start_new_segment: segnum=%(segnum)d",
                    node=repr(self), segnum=segnum,
                    level=log.NOISY, parent=lp, umid="wAlnHQ")
            self._active_segment = fetcher = SegmentFetcher(self, segnum, k, lp)
            seg_ev.activate(now())
            active_shares = [s for s in self._shares if s.is_alive()]
            fetcher.add_shares(active_shares) # this triggers the loop

    # called by our child ShareFinder
    def got_shares(self, shares):
        self._shares.update(shares)
        if self._active_segment:
            self._active_segment.add_shares(shares)

    def no_more_shares(self):
        self._no_more_shares = True
        if self._active_segment:
            self._active_segment.no_more_shares()

    # things called by our Share instances

    def validate_and_store_UEB(self, UEB_s):
        log.msg("validate_and_store_UEB",
                level=log.OPERATIONAL, parent=self._lp, umid="7sTrPw")
        h = hashutil.uri_extension_hash(UEB_s)
        if h != self._verifycap.uri_extension_hash:
            raise BadHashError
        self._parse_and_store_UEB(UEB_s) # sets self._stuff
        # TODO: a malformed (but authentic) UEB could throw an assertion in
        # _parse_and_store_UEB, and we should abandon the download.

        # inform the ShareFinder about our correct number of segments. This
        # will update the block-hash-trees in all existing CommonShare
        # instances, and will populate new ones with the correct value.
        self._sharefinder.update_num_segments()

    def _parse_and_store_UEB(self, UEB_s):
        # Note: the UEB contains needed_shares and total_shares. These are
        # redundant and inferior (the filecap contains the authoritative
        # values). However, because it is possible to encode the same file in
        # multiple ways, and the encoders might choose (poorly) to use the
        # same key for both (therefore getting the same SI), we might
        # encounter shares for both types. The UEB hashes will be different,
        # however, and we'll disregard the "other" encoding's shares as
        # corrupted.

        # therefore, we ignore d['total_shares'] and d['needed_shares'].

        d = uri.unpack_extension(UEB_s)

        log.msg(format="UEB=%(ueb)s, vcap=%(vcap)s",
                ueb=repr(uri.unpack_extension_readable(UEB_s)),
                vcap=self._verifycap.to_string(),
                level=log.NOISY, parent=self._lp, umid="cVqZnA")

        k, N = self._verifycap.needed_shares, self._verifycap.total_shares

        self.segment_size = d['segment_size']
        self._segsize_observers.fire(self.segment_size)

        r = self._calculate_sizes(self.segment_size)
        self.tail_segment_size = r["tail_segment_size"]
        self.tail_segment_padded = r["tail_segment_padded"]
        self.num_segments = r["num_segments"]
        self.block_size = r["block_size"]
        self.tail_block_size = r["tail_block_size"]
        log.msg("actual sizes: %s" % (r,),
                level=log.NOISY, parent=self._lp, umid="PY6P5Q")
        if (self.segment_size == self.guessed_segment_size
            and self.num_segments == self.guessed_num_segments):
            log.msg("my guess was right!",
                    level=log.NOISY, parent=self._lp, umid="x340Ow")
        else:
            log.msg("my guess was wrong! Extra round trips for me.",
                    level=log.NOISY, parent=self._lp, umid="tb7RJw")

        # zfec.Decode() instantiation is fast, but still, let's use the same
        # codec instance for all but the last segment. 3-of-10 takes 15us on
        # my laptop, 25-of-100 is 900us, 3-of-255 is 97us, 25-of-255 is
        # 2.5ms, worst-case 254-of-255 is 9.3ms
        self._codec = CRSDecoder()
        self._codec.set_params(self.segment_size, k, N)

        # Ciphertext hash tree root is mandatory, so that there is at most
        # one ciphertext that matches this read-cap or verify-cap. The
        # integrity check on the shares is not sufficient to prevent the
        # original encoder from creating some shares of file A and other
        # shares of file B. self.ciphertext_hash_tree was a guess before:
        # this is where we create it for real.
        self.ciphertext_hash_tree = IncompleteHashTree(self.num_segments)
        self.ciphertext_hash_tree_leaves = self.num_segments
        self.ciphertext_hash_tree.set_hashes({0: d['crypttext_root_hash']})

        self.share_hash_tree.set_hashes({0: d['share_root_hash']})

        # Our job is a fast download, not verification, so we ignore any
        # redundant fields. The Verifier uses a different code path which
        # does not ignore them.
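
    # (Descriptive note: index 0 in an IncompleteHashTree is the Merkle root,
    # seeded here from the UEB; the per-segment leaf hashes arrive later, via
    # process_ciphertext_hashes() and _check_ciphertext_hash() below.)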

    def _calculate_sizes(self, segment_size):
        # segments of ciphertext
        size = self._verifycap.size
        k = self._verifycap.needed_shares

        # this assert matches the one in encode.py:127 inside
        # Encoded._got_all_encoding_parameters, where the UEB is constructed
        assert segment_size % k == 0

        # the last segment is usually short. We don't store a whole segsize,
        # but we do pad the segment up to a multiple of k, because the
        # encoder requires that.
        tail_segment_size = size % segment_size
        if tail_segment_size == 0:
            tail_segment_size = segment_size
        padded = mathutil.next_multiple(tail_segment_size, k)
        tail_segment_padded = padded

        num_segments = mathutil.div_ceil(size, segment_size)

        # each segment is turned into N blocks. All but the last are of size
        # block_size, and the last is of size tail_block_size
        block_size = segment_size / k
        tail_block_size = tail_segment_padded / k

        return { "tail_segment_size": tail_segment_size,
                 "tail_segment_padded": tail_segment_padded,
                 "num_segments": num_segments,
                 "block_size": block_size,
                 "tail_block_size": tail_block_size,
                 }
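
    # Worked example (illustrative numbers only): a 1 MiB file with k=3 and
    # segment_size=131073 gives num_segments = div_ceil(1048576, 131073) = 8,
    # tail_segment_size = 1048576 % 131073 = 131065, padded up to 131067,
    # block_size = 131073/3 = 43691, and tail_block_size = 131067/3 = 43689.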

    def process_share_hashes(self, share_hashes):
        for hashnum in share_hashes:
            if hashnum >= len(self.share_hash_tree):
                # "BadHashError" is normally for e.g. a corrupt block. We
                # sort of abuse it here to mean a badly numbered hash (which
                # indicates corruption in the number bytes, rather than in
                # the data bytes).
                raise BadHashError("hashnum %d doesn't fit in hashtree(%d)"
                                   % (hashnum, len(self.share_hash_tree)))
        self.share_hash_tree.set_hashes(share_hashes)

    def get_desired_ciphertext_hashes(self, segnum):
        if segnum < self.ciphertext_hash_tree_leaves:
            return self.ciphertext_hash_tree.needed_hashes(segnum,
                                                           include_leaf=True)
        return []

    def get_needed_ciphertext_hashes(self, segnum):
        cht = self.ciphertext_hash_tree
        return cht.needed_hashes(segnum, include_leaf=True)

    def process_ciphertext_hashes(self, hashes):
        assert self.num_segments is not None
        # this may raise BadHashError or NotEnoughHashesError
        self.ciphertext_hash_tree.set_hashes(hashes)

    # called by our child SegmentFetcher

    def want_more_shares(self):
        self._sharefinder.hungry()

    def fetch_failed(self, sf, f):
        assert sf is self._active_segment
        # deliver error upwards
        for (d,c,seg_ev) in self._extract_requests(sf.segnum):
            seg_ev.error(now())
            eventually(self._deliver, d, c, f)
        self._active_segment = None
        self._start_new_segment()

    def process_blocks(self, segnum, blocks):
        d = defer.maybeDeferred(self._decode_blocks, segnum, blocks)
        d.addCallback(self._check_ciphertext_hash, segnum)
        def _deliver(result):
            log.msg(format="delivering segment(%(segnum)d)",
                    segnum=segnum,
                    level=log.OPERATIONAL, parent=self._lp)
            when = now()
            if isinstance(result, Failure):
                # this catches failures in decode or ciphertext hash
                for (d,c,seg_ev) in self._extract_requests(segnum):
                    seg_ev.error(when)
                    eventually(self._deliver, d, c, result)
            else:
                (offset, segment, decodetime) = result
                for (d,c,seg_ev) in self._extract_requests(segnum):
                    # when we have two requests for the same segment, the
                    # second one will not be "activated" before the data is
                    # delivered, so to allow the status-reporting code to see
                    # consistent behavior, we activate them all now. The
                    # SegmentEvent will ignore duplicate activate() calls.
                    # Note that this will result in an inaccurate "receive
                    # speed" for the second request.
                    seg_ev.activate(when)
                    seg_ev.deliver(when, offset, len(segment), decodetime)
                    eventually(self._deliver, d, c, result)
            self._active_segment = None
            self._start_new_segment()
        d.addBoth(_deliver)
        d.addErrback(log.err, "unhandled error during process_blocks",
                     level=log.WEIRD, parent=self._lp, umid="MkEsCg")

    def _decode_blocks(self, segnum, blocks):
        tail = (segnum == self.num_segments-1)
        codec = self._codec
        block_size = self.block_size
        decoded_size = self.segment_size
        if tail:
            # account for the padding in the last segment
            codec = CRSDecoder()
            k, N = self._verifycap.needed_shares, self._verifycap.total_shares
            codec.set_params(self.tail_segment_padded, k, N)
            block_size = self.tail_block_size
            decoded_size = self.tail_segment_padded

        shares = []
        shareids = []
        for (shareid, share) in blocks.iteritems():
            assert len(share) == block_size
            shareids.append(shareid)
            shares.append(share)

        start = now()
        d = codec.decode(shares, shareids) # segment
        def _process(buffers):
            decodetime = now() - start
            segment = "".join(buffers)
            assert len(segment) == decoded_size
            if tail:
                segment = segment[:self.tail_segment_size]
            return (segment, decodetime)
        d.addCallback(_process)
        return d
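
    # For example (illustrative, using the sizes worked out above): with k=3
    # and block_size=43691, decode() is handed 3 blocks of 43691 bytes each and
    # returns buffers that join into one 131073-byte segment; a tail segment is
    # then truncated from its padded length back down to tail_segment_size.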

    def _check_ciphertext_hash(self, (segment, decodetime), segnum):
        assert self._active_segment.segnum == segnum
        assert self.segment_size is not None
        offset = segnum * self.segment_size

        h = hashutil.crypttext_segment_hash(segment)
        try:
            self.ciphertext_hash_tree.set_hashes(leaves={segnum: h})
            return (offset, segment, decodetime)
        except (BadHashError, NotEnoughHashesError):
            format = ("hash failure in ciphertext_hash_tree:"
                      " segnum=%(segnum)d, SI=%(si)s")
            log.msg(format=format, segnum=segnum, si=self._si_prefix,
                    level=log.WEIRD, parent=self._lp, umid="MTwNnw")
            # this is especially weird, because we made it past the share
            # hash tree. It implies that we're using the wrong encoding, or
            # that the uploader deliberately constructed a bad UEB.
            msg = format % {"segnum": segnum, "si": self._si_prefix}
            raise BadCiphertextHashError(msg)

    def _deliver(self, d, c, result):
        # this method exists to handle cancel() that occurs between
        # _got_segment and _deliver
        if c.active:
            c.active = False # it is now too late to cancel
            d.callback(result) # might actually be an errback

    def _extract_requests(self, segnum):
        """Remove matching requests and return their (d,c) tuples so that the
        caller can retire them."""
        retire = [(d,c,seg_ev)
                  for (segnum0,d,c,seg_ev,lp) in self._segment_requests
                  if segnum0 == segnum]
        self._segment_requests = [t for t in self._segment_requests
                                  if t[0] != segnum]
        return retire

    def _cancel_request(self, c):
        self._segment_requests = [t for t in self._segment_requests
                                  if t[2] != c]
        segnums = [segnum for (segnum,d,c,seg_ev,lp) in self._segment_requests]
        # self._active_segment might be None in rare circumstances, so make
        # sure we tolerate it
        if self._active_segment and self._active_segment.segnum not in segnums:
            self._active_segment.stop()
            self._active_segment = None
            self._start_new_segment()

    # called by ShareFinder to choose hashtree sizes in CommonShares, and by
    # SegmentFetcher to tell if it is still fetching a valid segnum.
    def get_num_segments(self):
        # returns (best_num_segments, authoritative)
        if self.num_segments is None:
            return (self.guessed_num_segments, False)
        return (self.num_segments, True)