import time
now = time.time

from twisted.python.failure import Failure
from twisted.internet import defer
from foolscap.api import eventually
from allmydata import uri
from allmydata.codec import CRSDecoder
from allmydata.util import base32, log, hashutil, mathutil, observer
from allmydata.interfaces import DEFAULT_MAX_SEGMENT_SIZE
from allmydata.hashtree import IncompleteHashTree, BadHashError, \
     NotEnoughHashesError

from finder import ShareFinder
from fetcher import SegmentFetcher
from segmentation import Segmentation
from common import BadCiphertextHashError

class Cancel:
    def __init__(self, f):
        self._f = f
        self.active = True
    def cancel(self):
        # fire the cancellation callback at most once
        if self.active:
            self.active = False
            self._f(self)
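
# Usage sketch (illustrative): get_segment() below hands a Cancel to each
# caller so that interest in a segment can be disavowed:
#     (d, c) = node.get_segment(3)
#     c.cancel()    # 'd' will now never fire (see the get_segment docstring)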
30 """Internal class which manages downloads and holds state. External
31 callers use CiphertextFileNode instead."""

    # Share._node points to me
    def __init__(self, verifycap, storage_broker, secret_holder,
                 terminator, history, download_status):
        assert isinstance(verifycap, uri.CHKFileVerifierURI)
        self._verifycap = verifycap
        self._storage_broker = storage_broker
        self._si_prefix = base32.b2a_l(verifycap.storage_index[:8], 60)
        self.running = True
        if terminator:
            terminator.register(self) # calls self.stop() at stopService()
        # the rules are:
        # 1: Only send network requests if you're active (self.running is True)
        # 2: Use TimerService, not reactor.callLater
        # 3: You can do eventual-sends any time.
        # These rules should mean that once
        # stopService()+flushEventualQueue() fires, everything will be done.
        self._secret_holder = secret_holder
        self._history = history
        self._download_status = download_status

        k, N = self._verifycap.needed_shares, self._verifycap.total_shares
        self.share_hash_tree = IncompleteHashTree(N)

        # we guess the segment size, so Segmentation can pull non-initial
        # segments in a single roundtrip. This populates
        # .guessed_segment_size, .guessed_num_segments, and
        # .ciphertext_hash_tree (with a dummy, to let us guess which hashes
        # we'll need)
        self._build_guessed_tables(DEFAULT_MAX_SEGMENT_SIZE)

        # filled in when we parse a valid UEB
        self.segment_size = None
        self.tail_segment_size = None
        self.tail_segment_padded = None
        self.num_segments = None
        self.block_size = None
        self.tail_block_size = None

        # things to track callers that want data

        # _segment_requests can have duplicates
        self._segment_requests = [] # (segnum, d, cancel_handle, logparent)
        self._active_segment = None # a SegmentFetcher, with .segnum

        self._segsize_observers = observer.OneShotObserverList()

        # we create one top-level logparent for this _Node, and another one
        # for each read() call. Segmentation and get_segment() messages are
        # associated with the read() call, everything else is tied to the
        # _Node's log entry.
        lp = log.msg(format="Immutable.DownloadNode(%(si)s) created:"
                     " size=%(size)d,"
                     " guessed_segsize=%(guessed_segsize)d,"
                     " guessed_numsegs=%(guessed_numsegs)d",
                     si=self._si_prefix, size=verifycap.size,
                     guessed_segsize=self.guessed_segment_size,
                     guessed_numsegs=self.guessed_num_segments,
                     level=log.OPERATIONAL, umid="uJ0zAQ")
        self._lp = lp

        self._sharefinder = ShareFinder(storage_broker, verifycap, self,
                                        self._download_status, lp)
        self._shares = set()

    def _build_guessed_tables(self, max_segment_size):
        size = min(self._verifycap.size, max_segment_size)
        s = mathutil.next_multiple(size, self._verifycap.needed_shares)
        self.guessed_segment_size = s
        r = self._calculate_sizes(self.guessed_segment_size)
        self.guessed_num_segments = r["num_segments"]
        # as with CommonShare, our ciphertext_hash_tree is a stub until we
        # get the real num_segments
        self.ciphertext_hash_tree = IncompleteHashTree(self.guessed_num_segments)
        self.ciphertext_hash_tree_leaves = self.guessed_num_segments
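
    # Worked example (illustrative numbers, assuming DEFAULT_MAX_SEGMENT_SIZE
    # is 128KiB = 131072, k = 3, and a 1MB file):
    #     size = min(1000000, 131072) = 131072
    #     guessed_segment_size = next_multiple(131072, 3) = 131073
    #     guessed_num_segments = div_ceil(1000000, 131073) = 8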
110 return "ImmutableDownloadNode(%s)" % (self._si_prefix,)

    def stop(self):
        # called by the Terminator at shutdown, mostly for tests
        if self._active_segment:
            self._active_segment.stop()
            self._active_segment = None
        self._sharefinder.stop()

    # things called by outside callers, via CiphertextFileNode. get_segment()
    # may also be called by Segmentation.

    def read(self, consumer, offset=0, size=None, read_ev=None):
        """I am the main entry point, from which FileNode.read() can get
        data. I feed the consumer with the desired range of ciphertext. I
        return a Deferred that fires (with the consumer) when the read is
        finished.

        Note that there is no notion of a 'file pointer': each call to read()
        uses an independent offset= value."""
        # for concurrent operations: each gets its own Segmentation manager
        if size is None:
            size = self._verifycap.size
        # ignore overruns: clip size so offset+size does not go past EOF, and
        # so size is not negative (which indicates that offset >= EOF)
        size = max(0, min(size, self._verifycap.size-offset))
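        # e.g. with a 1000-byte file: read(consumer, offset=900, size=200)
        # clips to size=100, and read(consumer, offset=1200) clips to size=0
        # (offset is past EOF)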
        if read_ev is None:
            read_ev = self._download_status.add_read_event(offset, size, now())

        lp = log.msg(format="imm Node(%(si)s).read(%(offset)d, %(size)d)",
                     si=base32.b2a(self._verifycap.storage_index)[:8],
                     offset=offset, size=size,
                     level=log.OPERATIONAL, parent=self._lp, umid="l3j3Ww")
        if self._history:
            sp = self._history.stats_provider
            sp.count("downloader.files_downloaded", 1) # really read() calls
            sp.count("downloader.bytes_downloaded", size)
        if size == 0:
            read_ev.finished(now())
            # no data, so no producer, so no register/unregisterProducer
            return defer.succeed(consumer)
        s = Segmentation(self, offset, size, consumer, read_ev, lp)
        # this raises an interesting question: what segments to fetch? if
        # offset=0, always fetch the first segment, and then allow
        # Segmentation to be responsible for pulling the subsequent ones if
        # the first wasn't large enough. If offset>0, we're going to need an
        # extra roundtrip to get the UEB (and therefore the segment size)
        # before we can figure out which segment to get. TODO: allow the
        # offset-table-guessing code (which starts by guessing the segsize)
        # to assist the offset>0 process.
        d = s.start()
        def _done(res):
            read_ev.finished(now())
            return res
        d.addBoth(_done)
        return d

    def get_segment(self, segnum, logparent=None):
        """Begin downloading a segment. I return a tuple (d, c): 'd' is a
        Deferred that fires with (offset,data) when the desired segment is
        available, and c is an object on which c.cancel() can be called to
        disavow interest in the segment (after which 'd' will never fire).

        You probably need to know the segment size before calling this,
        unless you want the first few bytes of the file. If you ask for a
        segment number which turns out to be too large, the Deferred will
        errback with BadSegmentNumberError.

        The Deferred fires with the offset of the first byte of the data
        segment, so that you can call get_segment() before knowing the
        segment size, and still know which data you received.

        The Deferred can also errback with other fatal problems, such as
        NotEnoughSharesError, NoSharesError, or BadCiphertextHashError.
        """
        lp = log.msg(format="imm Node(%(si)s).get_segment(%(segnum)d)",
                     si=base32.b2a(self._verifycap.storage_index)[:8],
                     segnum=segnum,
                     level=log.OPERATIONAL, parent=logparent, umid="UKFjDQ")
        self._download_status.add_segment_request(segnum, now())
        d = defer.Deferred()
        c = Cancel(self._cancel_request)
        self._segment_requests.append( (segnum, d, c, lp) )
        self._start_new_segment()
        return (d, c)

    def get_segsize(self):
        """Return a Deferred that fires when we know the real segment size."""
        if self.segment_size:
            return defer.succeed(self.segment_size)
        # TODO: this downloads (and discards) the first segment of the file.
        # We could make this more efficient by writing
        # fetcher.SegmentSizeFetcher, with the job of finding a single valid
        # share and extracting the UEB. We'd add Share.get_UEB() to request
        # just the UEB.
        (d,c) = self.get_segment(0)
        # this ensures that an error during get_segment() will errback the
        # caller, so Repair won't wait forever on completely missing files
        d.addCallback(lambda ign: self._segsize_observers.when_fired())
        return d
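
    # Illustrative usage: a caller (e.g. Repair) that only needs the real
    # segment size can do
    #     d = node.get_segsize()
    #     d.addCallback(lambda segsize: ...)
    # at the cost of fetching (and discarding) segment 0 when the UEB is not
    # yet known.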

    # things called by the Segmentation object used to transform
    # arbitrary-sized read() calls into quantized segment fetches

    def _start_new_segment(self):
        if self._active_segment is None and self._segment_requests:
            segnum = self._segment_requests[0][0]
            k = self._verifycap.needed_shares
            lp = self._segment_requests[0][3]
            log.msg(format="%(node)s._start_new_segment: segnum=%(segnum)d",
                    node=repr(self), segnum=segnum,
                    level=log.NOISY, parent=lp, umid="wAlnHQ")
            self._active_segment = fetcher = SegmentFetcher(self, segnum, k, lp)
            active_shares = [s for s in self._shares if s.is_alive()]
            fetcher.add_shares(active_shares) # this triggers the loop

    # called by our child ShareFinder
    def got_shares(self, shares):
        self._shares.update(shares)
        if self._active_segment:
            self._active_segment.add_shares(shares)
    def no_more_shares(self):
        self._no_more_shares = True
        if self._active_segment:
            self._active_segment.no_more_shares()

    # things called by our Share instances

    def validate_and_store_UEB(self, UEB_s):
        log.msg("validate_and_store_UEB",
                level=log.OPERATIONAL, parent=self._lp, umid="7sTrPw")
        h = hashutil.uri_extension_hash(UEB_s)
        if h != self._verifycap.uri_extension_hash:
            raise BadHashError
        self._parse_and_store_UEB(UEB_s) # sets self._stuff
        # TODO: a malformed (but authentic) UEB could throw an assertion in
        # _parse_and_store_UEB, and we should abandon the download.

        # inform the ShareFinder about our correct number of segments. This
        # will update the block-hash-trees in all existing CommonShare
        # instances, and will populate new ones with the correct value.
        self._sharefinder.update_num_segments()

    def _parse_and_store_UEB(self, UEB_s):
        # Note: the UEB contains needed_shares and total_shares. These are
        # redundant and inferior (the filecap contains the authoritative
        # values). However, because it is possible to encode the same file in
        # multiple ways, and the encoders might choose (poorly) to use the
        # same key for both (therefore getting the same SI), we might
        # encounter shares for both types. The UEB hashes will be different,
        # however, and we'll disregard the "other" encoding's shares as
        # corrupted.

        # therefore, we ignore d['total_shares'] and d['needed_shares'].

        d = uri.unpack_extension(UEB_s)

        log.msg(format="UEB=%(ueb)s, vcap=%(vcap)s",
                ueb=repr(uri.unpack_extension_readable(UEB_s)),
                vcap=self._verifycap.to_string(),
                level=log.NOISY, parent=self._lp, umid="cVqZnA")

        k, N = self._verifycap.needed_shares, self._verifycap.total_shares

        self.segment_size = d['segment_size']
        self._segsize_observers.fire(self.segment_size)

        r = self._calculate_sizes(self.segment_size)
        self.tail_segment_size = r["tail_segment_size"]
        self.tail_segment_padded = r["tail_segment_padded"]
        self.num_segments = r["num_segments"]
        self.block_size = r["block_size"]
        self.tail_block_size = r["tail_block_size"]
        log.msg("actual sizes: %s" % (r,),
                level=log.NOISY, parent=self._lp, umid="PY6P5Q")
        if (self.segment_size == self.guessed_segment_size
            and self.num_segments == self.guessed_num_segments):
            log.msg("my guess was right!",
                    level=log.NOISY, parent=self._lp, umid="x340Ow")
        else:
            log.msg("my guess was wrong! Extra round trips for me.",
                    level=log.NOISY, parent=self._lp, umid="tb7RJw")

        # zfec.Decode() instantiation is fast, but still, let's use the same
        # codec instance for all but the last segment. 3-of-10 takes 15us on
        # my laptop, 25-of-100 is 900us, 3-of-255 is 97us, 25-of-255 is
        # 2.5ms, worst-case 254-of-255 is 9.3ms
        self._codec = CRSDecoder()
        self._codec.set_params(self.segment_size, k, N)

        # Ciphertext hash tree root is mandatory, so that there is at most
        # one ciphertext that matches this read-cap or verify-cap. The
        # integrity check on the shares is not sufficient to prevent the
        # original encoder from creating some shares of file A and other
        # shares of file B. self.ciphertext_hash_tree was a guess before:
        # this is where we create it for real.
        self.ciphertext_hash_tree = IncompleteHashTree(self.num_segments)
        self.ciphertext_hash_tree_leaves = self.num_segments
        self.ciphertext_hash_tree.set_hashes({0: d['crypttext_root_hash']})

        self.share_hash_tree.set_hashes({0: d['share_root_hash']})

        # Our job is a fast download, not verification, so we ignore any
        # redundant fields. The Verifier uses a different code path which
        # does not ignore them.
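
    # Sketch of the IncompleteHashTree contract relied upon here (these are
    # the calls this file actually makes; see allmydata.hashtree):
    #     cht = IncompleteHashTree(num_segments)       # all nodes unknown
    #     cht.set_hashes({0: root})                    # pin the root (node 0)
    #     cht.needed_hashes(segnum, include_leaf=True) # which hashes to fetch
    #     cht.set_hashes(hashes, leaves={segnum: h})   # raises BadHashError /
    #                                                  # NotEnoughHashesError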

    def _calculate_sizes(self, segment_size):
        # segments of ciphertext
        size = self._verifycap.size
        k = self._verifycap.needed_shares

        # this assert matches the one in encode.py:127 inside
        # Encoded._got_all_encoding_parameters, where the UEB is constructed
        assert segment_size % k == 0

        # the last segment is usually short. We don't store a whole segsize,
        # but we do pad the segment up to a multiple of k, because the
        # encoder requires that.
        tail_segment_size = size % segment_size
        if tail_segment_size == 0:
            tail_segment_size = segment_size
        padded = mathutil.next_multiple(tail_segment_size, k)
        tail_segment_padded = padded

        num_segments = mathutil.div_ceil(size, segment_size)

        # each segment is turned into N blocks. All but the last are of size
        # block_size, and the last is of size tail_block_size
        block_size = segment_size / k
        tail_block_size = tail_segment_padded / k

        return { "tail_segment_size": tail_segment_size,
                 "tail_segment_padded": tail_segment_padded,
                 "num_segments": num_segments,
                 "block_size": block_size,
                 "tail_block_size": tail_block_size,
                 }

    def process_share_hashes(self, share_hashes):
        for hashnum in share_hashes:
            if hashnum >= len(self.share_hash_tree):
                # "BadHashError" is normally for e.g. a corrupt block. We
                # sort of abuse it here to mean a badly numbered hash (which
                # indicates corruption in the number bytes, rather than in
                # the data bytes).
                raise BadHashError("hashnum %d doesn't fit in hashtree(%d)"
                                   % (hashnum, len(self.share_hash_tree)))
        self.share_hash_tree.set_hashes(share_hashes)

    def get_desired_ciphertext_hashes(self, segnum):
        if segnum < self.ciphertext_hash_tree_leaves:
            return self.ciphertext_hash_tree.needed_hashes(segnum,
                                                           include_leaf=True)
        return []
    def get_needed_ciphertext_hashes(self, segnum):
        cht = self.ciphertext_hash_tree
        return cht.needed_hashes(segnum, include_leaf=True)

    def process_ciphertext_hashes(self, hashes):
        assert self.num_segments is not None
        # this may raise BadHashError or NotEnoughHashesError
        self.ciphertext_hash_tree.set_hashes(hashes)

    # called by our child SegmentFetcher

    def want_more_shares(self):
        self._sharefinder.hungry()

    def fetch_failed(self, sf, f):
        assert sf is self._active_segment
        # deliver error upwards
        for (d,c) in self._extract_requests(sf.segnum):
            eventually(self._deliver, d, c, f)
        self._active_segment = None
        self._start_new_segment()

    def process_blocks(self, segnum, blocks):
        d = defer.maybeDeferred(self._decode_blocks, segnum, blocks)
        d.addCallback(self._check_ciphertext_hash, segnum)
        def _deliver(result):
            ds = self._download_status
            if isinstance(result, Failure):
                ds.add_segment_error(segnum, now())
            else:
                (offset, segment, decodetime) = result
                ds.add_segment_delivery(segnum, now(),
                                        offset, len(segment), decodetime)
            log.msg(format="delivering segment(%(segnum)d)",
                    segnum=segnum,
                    level=log.OPERATIONAL, parent=self._lp)
            for (d,c) in self._extract_requests(segnum):
                eventually(self._deliver, d, c, result)
            self._active_segment = None
            self._start_new_segment()
        d.addBoth(_deliver)
        d.addErrback(lambda f:
                     log.err("unhandled error during process_blocks",
                             failure=f, level=log.WEIRD,
                             parent=self._lp, umid="MkEsCg"))

    def _decode_blocks(self, segnum, blocks):
        tail = (segnum == self.num_segments-1)
        codec = self._codec
        block_size = self.block_size
        decoded_size = self.segment_size
        if tail:
            # account for the padding in the last segment
            codec = CRSDecoder()
            k, N = self._verifycap.needed_shares, self._verifycap.total_shares
            codec.set_params(self.tail_segment_padded, k, N)
            block_size = self.tail_block_size
            decoded_size = self.tail_segment_padded

        shares = []
        shareids = []
        for (shareid, share) in blocks.iteritems():
            assert len(share) == block_size
            shareids.append(shareid)
            shares.append(share)

        start = now()
        d = codec.decode(shares, shareids) # fires with a list of buffers
        def _process(buffers):
            decodetime = now() - start
            segment = "".join(buffers)
            assert len(segment) == decoded_size
            if tail:
                segment = segment[:self.tail_segment_size]
            return (segment, decodetime)
        d.addCallback(_process)
        return d
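
    # Illustrative numbers (assuming 3-of-10 encoding and the sizes from the
    # example above): a non-tail segment arrives as k=3 blocks of 43691 bytes
    # each, and codec.decode() yields 3 buffers whose concatenation is the
    # 131073-byte segment. The tail segment decodes to tail_segment_padded =
    # 82491 bytes and is then trimmed to tail_segment_size = 82489.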

    def _check_ciphertext_hash(self, (segment, decodetime), segnum):
        assert self._active_segment.segnum == segnum
        assert self.segment_size is not None
        offset = segnum * self.segment_size

        h = hashutil.crypttext_segment_hash(segment)
        try:
            self.ciphertext_hash_tree.set_hashes(leaves={segnum: h})
            return (offset, segment, decodetime)
        except (BadHashError, NotEnoughHashesError):
            format = ("hash failure in ciphertext_hash_tree:"
                      " segnum=%(segnum)d, SI=%(si)s")
            log.msg(format=format, segnum=segnum, si=self._si_prefix,
                    level=log.WEIRD, parent=self._lp, umid="MTwNnw")
            # this is especially weird, because we made it past the share
            # hash tree. It implies that we're using the wrong encoding, or
            # that the uploader deliberately constructed a bad UEB.
            msg = format % {"segnum": segnum, "si": self._si_prefix}
            raise BadCiphertextHashError(msg)

    def _deliver(self, d, c, result):
        # this method exists to handle cancel() that occurs between
        # _got_segment and _deliver
        if c.active:
            c.active = False # it is now too late to cancel
            d.callback(result) # might actually be an errback

    def _extract_requests(self, segnum):
        """Remove matching requests and return their (d,c) tuples so that the
        caller can retire them."""
        retire = [(d,c) for (segnum0, d, c, lp) in self._segment_requests
                  if segnum0 == segnum]
        self._segment_requests = [t for t in self._segment_requests
                                  if t[0] != segnum]
        return retire

    def _cancel_request(self, c):
        self._segment_requests = [t for t in self._segment_requests
                                  if t[2] != c]
        segnums = [segnum for (segnum,d,c,lp) in self._segment_requests]
        # self._active_segment might be None in rare circumstances, so make
        # sure we tolerate it
        if self._active_segment and self._active_segment.segnum not in segnums:
            self._active_segment.stop()
            self._active_segment = None
            self._start_new_segment()

    # called by ShareFinder to choose hashtree sizes in CommonShares, and by
    # SegmentFetcher to tell if it is still fetching a valid segnum.
    def get_num_segments(self):
        # returns (best_num_segments, authoritative)
        if self.num_segments is None:
            return (self.guessed_num_segments, False)
        return (self.num_segments, True)
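
    # For example: before the UEB has been parsed this returns
    # (guessed_num_segments, False), so callers must treat the value as
    # provisional; once the UEB is known it returns (num_segments, True).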