from twisted.python.failure import Failure
from foolscap.api import eventually
from allmydata.util import base32, log, hashutil, mathutil
from allmydata.util.spans import Spans, DataSpans
from allmydata.interfaces import HASH_SIZE
from allmydata.hashtree import IncompleteHashTree, BadHashError, \
     NotEnoughHashesError
from allmydata.immutable.layout import make_write_bucket_proxy
from allmydata.util.observer import EventStreamObserver
from common import COMPLETE, CORRUPT, DEAD, BADSEGNUM


class LayoutInvalid(Exception):
    pass

class DataUnavailable(Exception):
    pass

class Share:
    """I represent a single instance of a single share (e.g. I reference the
    shnum2 for share SI=abcde on server xy12t, not the one on server ab45q).
    I am associated with a CommonShare that remembers data that is held in
    common among e.g. SI=abcde/shnum2 across all servers. I am also
    associated with a CiphertextFileNode for e.g. SI=abcde (all shares, all
    servers).
    """
    # this is a specific implementation of IShare for tahoe's native storage
    # servers. A different backend would use a different class.
    def __init__(self, rref, server_version, verifycap, commonshare, node,
                 download_status, peerid, shnum, dyhb_rtt, logparent):
        self._rref = rref
        self._server_version = server_version
        self._node = node # holds share_hash_tree and UEB
        self.actual_segment_size = node.segment_size # might still be None
        # XXX change node.guessed_segment_size to
        # node.best_guess_segment_size(), which should give us the real ones
        # if known, else its guess.
        self._guess_offsets(verifycap, node.guessed_segment_size)
        self.actual_offsets = None
        self._UEB_length = None
        self._commonshare = commonshare # holds block_hash_tree
        self._download_status = download_status
        self._peerid = peerid
        self._peerid_s = base32.b2a(peerid)[:5]
        self._storage_index = verifycap.storage_index
        self._si_prefix = base32.b2a(verifycap.storage_index)[:8]
        self._shnum = shnum
        self._dyhb_rtt = dyhb_rtt
        # self._alive becomes False upon fatal corruption or server error
        self._alive = True
        self._lp = log.msg(format="%(share)s created", share=repr(self),
                           level=log.NOISY, parent=logparent, umid="P7hv2w")

        self._pending = Spans() # request sent but no response received yet
        self._received = DataSpans() # ACK response received, with data
        self._unavailable = Spans() # NAK response received, no data
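
        # Illustrative sketch (comments only, not executed): Spans is a set
        # of byte ranges and DataSpans maps byte ranges to data; the
        # behavior assumed here is inferred from how this file uses them:
        #   s = Spans(); s.add(10, 5)         # s now covers [10:15)
        #   s.add(15, 5)                      # adjacent ranges merge: [10:20)
        #   d = DataSpans(); d.add(0, "abcd") # store bytes at offset 0
        #   d.get(0, 4)                       # -> "abcd", data retained
        #   d.pop(0, 4)                       # -> "abcd", data removed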

        # any given byte of the share can be in one of four states:
        #  in: _wanted, _requested, _received
        #      FALSE    FALSE       FALSE : don't care about it at all
        #      TRUE     FALSE       FALSE : want it, haven't yet asked for it
        #      TRUE     TRUE        FALSE : request is in-flight
        #                                   or didn't get it
        #      FALSE    TRUE        TRUE  : got it, haven't used it yet
        #      FALSE    TRUE        FALSE : got it and used it
        #      FALSE    FALSE       FALSE : block consumed, ready to ask again

        # when we request data and get a NAK, we leave it in _requested
        # to remind ourselves to not ask for it again. We don't explicitly
        # remove it from anything (maybe this should change).
        #
        # We retain the hashtrees in the Node, so we leave those spans in
        # _requested (and never ask for them again, as long as the Node is
        # alive). But we don't retain data blocks (too big), so when we
        # consume a data block, we remove it from _requested, so a later
        # download can re-fetch it.

        self._requested_blocks = [] # (segnum, set(observer2..))
        ver = server_version["http://allmydata.org/tahoe/protocols/storage/v1"]
        self._overrun_ok = ver["tolerates-immutable-read-overrun"]
        # If _overrun_ok and we guess the offsets correctly, we can get
        # everything in one RTT. If _overrun_ok and we guess wrong, we might
        # need two RTT (but we could get lucky and do it in one). If overrun
        # is *not* ok (tahoe-1.3.0 or earlier), we need four RTT: 1=version,
        # 2=offset table, 3=UEB_length and everything else (hashes, block),
        # 4=UEB.

        self.had_corruption = False # for unit tests
97 return "Share(sh%d-on-%s)" % (self._shnum, self._peerid_s)

    def is_alive(self):
        # XXX: reconsider. If the share sees a single error, should it remain
        # dead for all time? Or should the next segment try again? This DEAD
        # state is stored elsewhere too (SegmentFetcher per-share states?)
        # and needs to be consistent. We clear _alive in self._fail(), which
        # is called upon a network error, or layout failure, or hash failure
        # in the UEB or a hash tree. We do not _fail() for a hash failure in
        # a block, but of course we still tell our callers about
        # state=CORRUPT so they'll find a different share.
        return self._alive

    def _guess_offsets(self, verifycap, guessed_segment_size):
        self.guessed_segment_size = guessed_segment_size
        size = verifycap.size
        k = verifycap.needed_shares
        N = verifycap.total_shares
        r = self._node._calculate_sizes(guessed_segment_size)
        # r provides num_segments, block_size/tail_block_size, and
        # guessed_segment_size/tail_segment_size/tail_segment_padded
        share_size = mathutil.div_ceil(size, k)
        # share_size is the amount of block data that will be put into each
        # share, summed over all segments. It does not include hashes, the
        # UEB, or other overhead.

        # use the upload-side code to get this as accurate as possible
        ht = IncompleteHashTree(N)
        num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))
        wbp = make_write_bucket_proxy(None, share_size, r["block_size"],
                                      r["num_segments"], num_share_hashes, 0,
                                      None)
        self._fieldsize = wbp.fieldsize
        self._fieldstruct = wbp.fieldstruct
        self.guessed_offsets = wbp._offsets
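
    # Worked example (hedged, hypothetical numbers): for a 1 MiB file with
    # k=3 needed shares, share_size = mathutil.div_ceil(1048576, 3) = 349526
    # bytes of block data per share, since div_ceil rounds up
    # (3*349525 = 1048575 falls one byte short).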

    # called by our client, the SegmentFetcher
    def get_block(self, segnum):
        """Add a block number to the list of requests. This will eventually
        result in a fetch of the data necessary to validate the block, then
        the block itself. The fetch order is generally
        first-come-first-served, but requests may be answered out-of-order if
        data becomes available sooner.

        I return an EventStreamObserver, which has two uses. The first is to
        call o.subscribe(), which gives me a place to send state changes and
        eventually the data block. The second is o.cancel(), which removes
        the request (if it is still active).

        I will distribute the following events through my EventStreamObserver:
         - state=OVERDUE: ?? I believe I should have had an answer by now.
                          You may want to ask another share instead.
         - state=BADSEGNUM: the segnum you asked for is too large. I must
                            fetch a valid UEB before I can determine this,
                            so the notification is asynchronous
         - state=COMPLETE, block=data: here is a valid block
         - state=CORRUPT: this share contains corrupted data
         - state=DEAD, f=Failure: the server reported an error, this share
                                  is unusable
        """
        log.msg("%s.get_block(%d)" % (repr(self), segnum),
                level=log.NOISY, parent=self._lp, umid="RTo9MQ")
        o = EventStreamObserver()
        o.set_canceler(self, "_cancel_block_request")
        for i,(segnum0,observers) in enumerate(self._requested_blocks):
            if segnum0 == segnum:
                observers.add(o)
                break
        else:
            self._requested_blocks.append( (segnum, set([o])) )
        eventually(self.loop)
        return o
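
    # Illustrative caller sketch (comments only, not executed; the real
    # client is SegmentFetcher). Based on the docstring above:
    #   o = share.get_block(segnum)
    #   o.subscribe(got_event)   # got_event(state=..., block=..., f=...)
    #   ...
    #   o.cancel()               # withdraw the request if still pending
    # where got_event sees state in (COMPLETE, CORRUPT, DEAD, BADSEGNUM,
    # OVERDUE), with block= accompanying COMPLETE and f= accompanying DEAD.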

    def _cancel_block_request(self, o):
        new_requests = []
        for e in self._requested_blocks:
            (segnum0, observers) = e
            observers.discard(o)
            if observers:
                new_requests.append(e)
        self._requested_blocks = new_requests

    def _active_segnum_and_observers(self):
        if self._requested_blocks:
            # we only retrieve information for one segment at a time, to
            # minimize alacrity (first come, first served)
            return self._requested_blocks[0]
        return None, []

    def loop(self):
        try:
            # if any exceptions occur here, kill the download
            log.msg("%s.loop, reqs=[%s], pending=%s, received=%s,"
                    " unavailable=%s" %
                    (repr(self),
                     ",".join([str(req[0]) for req in self._requested_blocks]),
                     self._pending.dump(), self._received.dump(),
                     self._unavailable.dump() ),
                    level=log.NOISY, parent=self._lp, umid="BaL1zw")
            self._do_loop()
            # all exception cases call self._fail(), which clears self._alive
        except (BadHashError, NotEnoughHashesError, LayoutInvalid) as e:
            # Abandon this share. We do this if we see corruption in the
            # offset table, the UEB, or a hash tree. We don't abandon the
            # whole share if we see corruption in a data block (we abandon
            # just the one block, and still try to get data from other blocks
            # on the same server). In theory, we could get good data from a
            # share with a corrupt UEB (by first getting the UEB from some
            # other share), or corrupt hash trees, but the logic to decide
            # when this is safe is non-trivial. So for now, give up at the
            # first sign of corruption.
            #
            # _satisfy_*() code which detects corruption should first call
            # self._signal_corruption(), and then raise the exception.
            log.msg(format="corruption detected in %(share)s",
                    share=repr(self),
                    level=log.UNUSUAL, parent=self._lp, umid="gWspVw")
            self._fail(Failure(e), log.UNUSUAL)
        except DataUnavailable as e:
            # Abandon this share.
            log.msg(format="need data that will never be available"
                          " from %s: pending=%s, received=%s, unavailable=%s" %
                          (repr(self),
                           self._pending.dump(), self._received.dump(),
                           self._unavailable.dump() ),
                    level=log.UNUSUAL, parent=self._lp, umid="F7yJnQ")
            self._fail(Failure(e), log.UNUSUAL)
        except BaseException:
            self._fail(Failure())
            raise
        log.msg("%s.loop done, reqs=[%s], pending=%s, received=%s,"
                " unavailable=%s" %
                (repr(self),
                 ",".join([str(req[0]) for req in self._requested_blocks]),
                 self._pending.dump(), self._received.dump(),
                 self._unavailable.dump() ),
                level=log.NOISY, parent=self._lp, umid="9lRaRA")

    def _do_loop(self):
        # we are (eventually) called after all state transitions:
        #  new segments added to self._requested_blocks
        #  new data received from servers (responses to our read() calls)
        #  impatience timer fires (server appears slow)

        # First, consume all of the information that we currently have, for
        # all the segments people currently want.
        while self._get_satisfaction():
            pass

        # When we get no satisfaction (from the data we've received so far),
        # we determine what data we desire (to satisfy more requests). The
        # number of segments is finite, so I can't get no satisfaction
        # forever.
        wanted, needed = self._desire()

        # Finally, send out requests for whatever we need (desire minus
        # have). You can't always get what you want, but if you try
        # sometimes, you just might find, you get what you need.
        self._send_requests(wanted + needed)

        # and sometimes you can't even get what you need
        disappointment = needed & self._unavailable
        if disappointment.len():
            self.had_corruption = True
            raise DataUnavailable("need %s but will never get it" %
                                  disappointment.dump())

    def _get_satisfaction(self):
        # return True if we retired a data block, and should therefore be
        # called again. Return False if we don't retire a data block (even if
        # we do retire some other data, like hash chains).

        if self.actual_offsets is None:
            if not self._satisfy_offsets():
                # can't even look at anything without the offset table
                return False

        if not self._node.have_UEB:
            if not self._satisfy_UEB():
                # can't check any hashes without the UEB
                return False
            # the call to _satisfy_UEB() will immediately set the
            # authoritative num_segments in all our CommonShares. If we
            # guessed wrong, we might still be working on a bogus segnum
            # (beyond the real range). We catch this and signal BADSEGNUM
            # before invoking any further code that touches hashtrees.
        self.actual_segment_size = self._node.segment_size # might be updated
        assert self.actual_segment_size is not None

        # knowing the UEB means knowing num_segments
        assert self._node.num_segments is not None

        segnum, observers = self._active_segnum_and_observers()
        # if segnum is None, we don't really need to do anything (we have no
        # outstanding readers right now), but we'll fill in the bits that
        # aren't tied to any particular segment.

        if segnum is not None and segnum >= self._node.num_segments:
            for o in observers:
                o.notify(state=BADSEGNUM)
            self._requested_blocks.pop(0)
            return False

        if self._node.share_hash_tree.needed_hashes(self._shnum):
            if not self._satisfy_share_hash_tree():
                # can't check block_hash_tree without a root
                return False

        if self._commonshare.need_block_hash_root():
            block_hash_root = self._node.share_hash_tree.get_leaf(self._shnum)
            self._commonshare.set_block_hash_root(block_hash_root)

        if segnum is None:
            return False # we don't want any particular segment right now

        # block_hash_tree
        needed_hashes = self._commonshare.get_needed_block_hashes(segnum)
        if needed_hashes:
            if not self._satisfy_block_hash_tree(needed_hashes):
                # can't check block without block_hash_tree
                return False

        # ciphertext_hash_tree
        needed_hashes = self._node.get_needed_ciphertext_hashes(segnum)
        if needed_hashes:
            if not self._satisfy_ciphertext_hash_tree(needed_hashes):
                # can't check decoded blocks without ciphertext_hash_tree
                return False

        # data blocks
        return self._satisfy_data_block(segnum, observers)

    def _satisfy_offsets(self):
        version_s = self._received.get(0, 4)
        if version_s is None:
            return False
        (version,) = struct.unpack(">L", version_s)
        if version == 1:
            table_start = 0x0c
            self._fieldsize = 0x4
            self._fieldstruct = "L"
        elif version == 2:
            table_start = 0x14
            self._fieldsize = 0x8
            self._fieldstruct = "Q"
        else:
            self.had_corruption = True
            raise LayoutInvalid("unknown version %d (I understand 1 and 2)"
                                % version)
        offset_table_size = 6 * self._fieldsize
        table_s = self._received.pop(table_start, offset_table_size)
        if table_s is None:
            return False
        fields = struct.unpack(">"+6*self._fieldstruct, table_s)
        offsets = {}
        for i,field in enumerate(['data',
                                  'plaintext_hash_tree', # UNUSED
                                  'crypttext_hash_tree',
                                  'block_hashes',
                                  'share_hashes',
                                  'uri_extension',
                                  ]):
            offsets[field] = fields[i]
        self.actual_offsets = offsets
        log.msg("actual offsets: data=%d, plaintext_hash_tree=%d,"
                " crypttext_hash_tree=%d, block_hashes=%d, share_hashes=%d,"
                " uri_extension=%d" % tuple(fields),
                level=log.NOISY, parent=self._lp, umid="jedQcw")
        self._received.remove(0, 4) # don't need this anymore

        # validate the offsets a bit
        share_hashes_size = offsets["uri_extension"] - offsets["share_hashes"]
        if share_hashes_size < 0 or share_hashes_size % (2+HASH_SIZE) != 0:
            # the share hash chain is stored as (hashnum,hash) pairs
            self.had_corruption = True
            raise LayoutInvalid("share hashes malformed -- should be a"
                                " multiple of %d bytes -- not %d" %
                                (2+HASH_SIZE, share_hashes_size))
        block_hashes_size = offsets["share_hashes"] - offsets["block_hashes"]
        if block_hashes_size < 0 or block_hashes_size % HASH_SIZE != 0:
            # the block hash tree is stored as a list of hashes
            self.had_corruption = True
            raise LayoutInvalid("block hashes malformed -- should be a"
                                " multiple of %d bytes -- not %d" %
                                (HASH_SIZE, block_hashes_size))
        # we only look at 'crypttext_hash_tree' if the UEB says we're
        # actually using it. Same with 'plaintext_hash_tree'. This gives us
        # some wiggle room: a place to stash data for later extensions.

        return True
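
    # Header layout sketch, as parsed above (hedged: field widths come from
    # this method; the two header fields between the version and the offset
    # table are not interpreted by this class):
    #   v1: [0:4) version (">L"), [4:12) two 4-byte fields,
    #       [12:36) offset table of six ">L" values -- i.e. within [0x0,0x24)
    #   v2: [0:4) version, [4:20) two 8-byte fields,
    #       [20:68) offset table of six ">Q" values -- i.e. within [0x0,0x44)
    # matching the extents quoted in _desire_offsets below.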

    def _satisfy_UEB(self):
        o = self.actual_offsets
        fsize = self._fieldsize
        UEB_length_s = self._received.get(o["uri_extension"], fsize)
        if not UEB_length_s:
            return False
        (UEB_length,) = struct.unpack(">"+self._fieldstruct, UEB_length_s)
        UEB_s = self._received.pop(o["uri_extension"]+fsize, UEB_length)
        if not UEB_s:
            return False
        self._received.remove(o["uri_extension"], fsize)

        try:
            self._node.validate_and_store_UEB(UEB_s)
            return True
        except (LayoutInvalid, BadHashError) as e:
            # TODO: if this UEB was bad, we'll keep trying to validate it
            # over and over again. Only log.err on the first one, or better
            # yet skip all but the first
            f = Failure(e)
            self._signal_corruption(f, o["uri_extension"], fsize+UEB_length)
            self.had_corruption = True
            raise

    def _satisfy_share_hash_tree(self):
        # the share hash chain is stored as (hashnum,hash) tuples, so you
        # can't fetch just the pieces you need, because you don't know
        # exactly where they are. So fetch everything, and parse the results
        # later.
        o = self.actual_offsets
        hashlen = o["uri_extension"] - o["share_hashes"]
        assert hashlen % (2+HASH_SIZE) == 0
        hashdata = self._received.get(o["share_hashes"], hashlen)
        if not hashdata:
            return False
        share_hashes = {}
        for i in range(0, hashlen, 2+HASH_SIZE):
            (hashnum,) = struct.unpack(">H", hashdata[i:i+2])
            hashvalue = hashdata[i+2:i+2+HASH_SIZE]
            share_hashes[hashnum] = hashvalue
        # TODO: if they give us an empty set of hashes,
        # process_share_hashes() won't fail. We must ensure that this
        # situation doesn't allow unverified shares through. Manual testing
        # shows that set_block_hash_root() throws an assert because an
        # internal node is None instead of an actual hash, but we want
        # something better. It's probably best to add a method to
        # IncompleteHashTree which takes a leaf number and raises an
        # exception unless that leaf is present and fully validated.
        try:
            self._node.process_share_hashes(share_hashes)
            # adds to self._node.share_hash_tree
        except (BadHashError, NotEnoughHashesError) as e:
            f = Failure(e)
            self._signal_corruption(f, o["share_hashes"], hashlen)
            self.had_corruption = True
            raise
        self._received.remove(o["share_hashes"], hashlen)
        return True
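
    # Size arithmetic example (hedged; HASH_SIZE is 32 in
    # allmydata.interfaces): a chain of five (hashnum,hash) pairs occupies
    # 5*(2+32) = 170 bytes, so the assert above would reject a 171-byte
    # share_hashes region.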

    def _signal_corruption(self, f, start, offset):
        # there was corruption somewhere in the given range
        reason = "corruption in share[%d-%d): %s" % (start, start+offset,
                                                     str(f.value))
        self._rref.callRemoteOnly("advise_corrupt_share", reason)

    def _satisfy_block_hash_tree(self, needed_hashes):
        o_bh = self.actual_offsets["block_hashes"]
        block_hashes = {}
        for hashnum in needed_hashes:
            hashdata = self._received.get(o_bh+hashnum*HASH_SIZE, HASH_SIZE)
            if hashdata:
                block_hashes[hashnum] = hashdata
            else:
                return False # missing some hashes
        # note that we don't submit any hashes to the block_hash_tree until
        # we've gotten them all, because the hash tree will throw an
        # exception if we only give it a partial set (which it therefore
        # cannot validate)
        try:
            self._commonshare.process_block_hashes(block_hashes)
        except (BadHashError, NotEnoughHashesError) as e:
            f = Failure(e)
            hashnums = ",".join([str(n) for n in sorted(block_hashes.keys())])
            log.msg(format="hash failure in block_hashes=(%(hashnums)s),"
                           " from %(share)s",
                    hashnums=hashnums, shnum=self._shnum, share=repr(self),
                    failure=f, level=log.WEIRD, parent=self._lp, umid="yNyFdA")
            hsize = max(0, max(needed_hashes)) * HASH_SIZE
            self._signal_corruption(f, o_bh, hsize)
            self.had_corruption = True
            raise
        for hashnum in needed_hashes:
            self._received.remove(o_bh+hashnum*HASH_SIZE, HASH_SIZE)
        return True

    def _satisfy_ciphertext_hash_tree(self, needed_hashes):
        start = self.actual_offsets["crypttext_hash_tree"]
        hashes = {}
        for hashnum in needed_hashes:
            hashdata = self._received.get(start+hashnum*HASH_SIZE, HASH_SIZE)
            if hashdata:
                hashes[hashnum] = hashdata
            else:
                return False # missing some hashes
        # we don't submit any hashes to the ciphertext_hash_tree until we've
        # gotten them all
        try:
            self._node.process_ciphertext_hashes(hashes)
        except (BadHashError, NotEnoughHashesError) as e:
            f = Failure(e)
            hashnums = ",".join([str(n) for n in sorted(hashes.keys())])
            log.msg(format="hash failure in ciphertext_hashes=(%(hashnums)s),"
                           " from %(share)s",
                    hashnums=hashnums, share=repr(self), failure=f,
                    level=log.WEIRD, parent=self._lp, umid="iZI0TA")
            hsize = max(0, max(needed_hashes))*HASH_SIZE
            self._signal_corruption(f, start, hsize)
            self.had_corruption = True
            raise
        for hashnum in needed_hashes:
            self._received.remove(start+hashnum*HASH_SIZE, HASH_SIZE)
        return True

    def _satisfy_data_block(self, segnum, observers):
        tail = (segnum == self._node.num_segments-1)
        datastart = self.actual_offsets["data"]
        blockstart = datastart + segnum * self._node.block_size
        blocklen = self._node.block_size
        if tail:
            blocklen = self._node.tail_block_size

        block = self._received.pop(blockstart, blocklen)
        if not block:
            log.msg("no data for block %s (want [%d:+%d])" % (repr(self),
                                                              blockstart,
                                                              blocklen),
                    level=log.NOISY, parent=self._lp, umid="aK0RFw")
            return False
        log.msg(format="%(share)s._satisfy_data_block [%(start)d:+%(length)d]",
                share=repr(self), start=blockstart, length=blocklen,
                level=log.NOISY, parent=self._lp, umid="uTDNZg")
        # this block is being retired, either as COMPLETE or CORRUPT, since
        # no further data reads will help
        assert self._requested_blocks[0][0] == segnum
        try:
            self._commonshare.check_block(segnum, block)
            # hurrah, we have a valid block. Deliver it.
            for o in observers:
                # goes to SegmentFetcher._block_request_activity
                o.notify(state=COMPLETE, block=block)
            # now clear our received data, to dodge the #1170 spans.py
            # complexity bug
            self._received = DataSpans()
        except (BadHashError, NotEnoughHashesError) as e:
            # rats, we have a corrupt block. Notify our clients that they
            # need to look elsewhere, and advise the server. Unlike
            # corruption in other parts of the share, this doesn't cause us
            # to abandon the whole share.
            f = Failure(e)
            log.msg(format="hash failure in block %(segnum)d, from %(share)s",
                    segnum=segnum, share=repr(self), failure=f,
                    level=log.WEIRD, parent=self._lp, umid="mZjkqA")
            for o in observers:
                o.notify(state=CORRUPT)
            self._signal_corruption(f, blockstart, blocklen)
            self.had_corruption = True
        # in either case, we've retired this block
        self._requested_blocks.pop(0)
        # popping the request keeps us from turning around and wanting the
        # block again right away
        return True # got satisfaction
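
    # Offset arithmetic example (hedged, hypothetical numbers): with 128 KiB
    # segments erasure-coded at k=3, block_size = div_ceil(131072, 3) = 43691,
    # so segment 2's block occupies [data+87382 : data+131073); only the
    # final segment may use the shorter tail_block_size.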

    def _desire(self):
        segnum, observers = self._active_segnum_and_observers() # maybe None

        # 'want_it' is for data we merely want: we know that we don't really
        # need it. This includes speculative reads, like the first 1KB of the
        # share (for the offset table) and the first 2KB of the UEB.
        #
        # 'need_it' is for data that, if we have the real offset table, we'll
        # need. If we are only guessing at the offset table, it's merely
        # wanted. (The share is abandoned if we can't get data that we really
        # need.)
        #
        # 'gotta_gotta_have_it' is for data that we absolutely need,
        # independent of whether we're still guessing about the offset table:
        # the version number and the offset table itself.
        #
        # Mr. Popeil, I'm in trouble, need your assistance on the double. Aww..

        desire = Spans(), Spans(), Spans()
        (want_it, need_it, gotta_gotta_have_it) = desire

        self.actual_segment_size = self._node.segment_size # might be updated
        o = self.actual_offsets or self.guessed_offsets
        segsize = self.actual_segment_size or self.guessed_segment_size
        r = self._node._calculate_sizes(segsize)

        if not self.actual_offsets:
            # all _desire functions add bits to the three desire[] spans
            self._desire_offsets(desire)

        # we can use guessed offsets as long as this server tolerates
        # overrun. Otherwise, we must wait for the offsets to arrive before
        # we try to read anything else.
        if self.actual_offsets or self._overrun_ok:
            if not self._node.have_UEB:
                self._desire_UEB(desire, o)
            self._desire_share_hashes(desire, o)
            if segnum is not None:
                # They might be asking for a segment number that is beyond
                # what we guess the file contains, but _desire_block_hashes
                # and _desire_data will tolerate that.
                self._desire_block_hashes(desire, o, segnum)
                self._desire_data(desire, o, r, segnum, segsize)

        log.msg("end _desire: want_it=%s need_it=%s gotta=%s"
                % (want_it.dump(), need_it.dump(), gotta_gotta_have_it.dump()),
                level=log.NOISY, parent=self._lp, umid="IG7CgA")
        if self.actual_offsets:
            return (want_it, need_it+gotta_gotta_have_it)
        else:
            return (want_it+need_it, gotta_gotta_have_it)

    def _desire_offsets(self, desire):
        (want_it, need_it, gotta_gotta_have_it) = desire
        if self._overrun_ok:
            # easy! this includes version number, sizes, and offsets
            want_it.add(0, 1024)
            return

        # v1 has an offset table that lives [0x0,0x24). v2 lives [0x0,0x44).
        # To be conservative, only request the data that we know lives there,
        # even if that means more roundtrips.

        gotta_gotta_have_it.add(0, 4) # version number, always safe
        version_s = self._received.get(0, 4)
        if not version_s:
            return
        (version,) = struct.unpack(">L", version_s)
        # The code in _satisfy_offsets will have checked this version
        # already. There is no code path to get this far with version>2.
        assert 1 <= version <= 2, "can't get here, version=%d" % version
        if version == 1:
            table_start = 0x0c
            fieldsize = 0x4
        else:
            table_start = 0x14
            fieldsize = 0x8
        offset_table_size = 6 * fieldsize
        gotta_gotta_have_it.add(table_start, offset_table_size)
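
    # Request arithmetic for the conservative path above (hedged): v1 asks
    # for [0:4) plus [12:36) (0x0c + 6*4 bytes), v2 for [0:4) plus [20:68)
    # (0x14 + 6*8 bytes); both stay inside the [0x0,0x24) / [0x0,0x44)
    # extents quoted in the comment.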

    def _desire_UEB(self, desire, o):
        (want_it, need_it, gotta_gotta_have_it) = desire

        # UEB data is stored as (length,data).
        if self._overrun_ok:
            # We can pre-fetch 2kb, which should probably cover it. If it
            # turns out to be larger, we'll come back here later with a known
            # length and fetch the rest.
            want_it.add(o["uri_extension"], 2048)
            # now, while that is probably enough to fetch the whole UEB, it
            # might not be, so we need to do the next few steps as well. In
            # most cases, the following steps will not actually add anything
            # to need_it

        need_it.add(o["uri_extension"], self._fieldsize)
        # only use a length if we're sure it's correct, otherwise we'll
        # probably fetch a huge number
        if not self.actual_offsets:
            return
        UEB_length_s = self._received.get(o["uri_extension"], self._fieldsize)
        if UEB_length_s:
            (UEB_length,) = struct.unpack(">"+self._fieldstruct, UEB_length_s)
            # we know the length, so make sure we grab everything
            need_it.add(o["uri_extension"]+self._fieldsize, UEB_length)
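
    # Fetch-size example (hedged, hypothetical numbers): with a v1 share
    # (fieldsize=4) and the UEB at offset U, the conservative path needs
    # [U:U+4) first; once that length field decodes as, say, 1500, it adds
    # [U+4:U+1504). The speculative want_it.add(U, 2048) would have covered
    # both in a single read.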

    def _desire_share_hashes(self, desire, o):
        (want_it, need_it, gotta_gotta_have_it) = desire

        if self._node.share_hash_tree.needed_hashes(self._shnum):
            hashlen = o["uri_extension"] - o["share_hashes"]
            need_it.add(o["share_hashes"], hashlen)

    def _desire_block_hashes(self, desire, o, segnum):
        (want_it, need_it, gotta_gotta_have_it) = desire

        # block hash chain
        for hashnum in self._commonshare.get_desired_block_hashes(segnum):
            need_it.add(o["block_hashes"]+hashnum*HASH_SIZE, HASH_SIZE)

        # ciphertext hash chain
        for hashnum in self._node.get_desired_ciphertext_hashes(segnum):
            need_it.add(o["crypttext_hash_tree"]+hashnum*HASH_SIZE, HASH_SIZE)

    def _desire_data(self, desire, o, r, segnum, segsize):
        if segnum > r["num_segments"]:
            # they're asking for a segment that's beyond what we think is the
            # end of the file. We won't get here if we've already learned the
            # real UEB: _get_satisfaction() will notice the out-of-bounds and
            # terminate the loop. So we must still be guessing, which means
            # that they might be correct in asking for such a large segnum.
            # But if they're right, then our segsize/segnum guess is
            # certainly wrong, which means we don't know what data blocks to
            # ask for yet. So don't bother adding anything. When the UEB
            # comes back and we learn the correct segsize/segnums, we'll
            # either reject the request or have enough information to proceed
            # normally. This costs one roundtrip.
            log.msg("_desire_data: segnum(%d) looks wrong (numsegs=%d)"
                    % (segnum, r["num_segments"]),
                    level=log.UNUSUAL, parent=self._lp, umid="tuYRQQ")
            return

        (want_it, need_it, gotta_gotta_have_it) = desire
        tail = (segnum == r["num_segments"]-1)
        datastart = o["data"]
        blockstart = datastart + segnum * r["block_size"]
        blocklen = r["block_size"]
        if tail:
            blocklen = r["tail_block_size"]
        need_it.add(blockstart, blocklen)

    def _send_requests(self, desired):
        ask = desired - self._pending - self._received.get_spans()
        log.msg("%s._send_requests, desired=%s, pending=%s, ask=%s" %
                (repr(self), desired.dump(), self._pending.dump(), ask.dump()),
                level=log.NOISY, parent=self._lp, umid="E94CVA")
        # XXX At one time, this code distinguished between data blocks and
        # hashes, and made sure to send (small) requests for hashes before
        # sending (big) requests for blocks. The idea was to make sure that
        # all hashes arrive before the blocks, so the blocks can be consumed
        # and released in a single turn. I removed this for simplicity.
        # Reconsider the removal: maybe bring it back.
        ds = self._download_status

        for (start, length) in ask:
            # TODO: quantize to reasonably-large blocks
            self._pending.add(start, length)
            lp = log.msg(format="%(share)s._send_request"
                                " [%(start)d:+%(length)d]",
                         share=repr(self),
                         start=start, length=length,
                         level=log.NOISY, parent=self._lp, umid="sgVAyA")
            req_ev = ds.add_request_sent(self._peerid, self._shnum,
                                         start, length, now())
            d = self._send_request(start, length)
            d.addCallback(self._got_data, start, length, req_ev, lp)
            d.addErrback(self._got_error, start, length, req_ev, lp)
            d.addCallback(self._trigger_loop)
            d.addErrback(lambda f:
                         log.err(format="unhandled error during send_request",
                                 failure=f, parent=self._lp,
                                 level=log.WEIRD, umid="qZu0wg"))
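
    # Span subtraction example (hedged, hypothetical ranges): if
    # desired=[0:100), _pending=[0:30), and _received already holds
    # [80:100), then ask=[30:80) and the loop above issues a single
    # read(30, 50) to the server.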

    def _send_request(self, start, length):
        return self._rref.callRemote("read", start, length)

    def _got_data(self, data, start, length, req_ev, lp):
        req_ev.finished(len(data), now())
        if not self._alive:
            return
        log.msg(format="%(share)s._got_data [%(start)d:+%(length)d] -> %(datalen)d",
                share=repr(self), start=start, length=length, datalen=len(data),
                level=log.NOISY, parent=lp, umid="5Qn6VQ")
        self._pending.remove(start, length)
        self._received.add(start, data)

        # if we ask for [a:c], and we get back [a:b] (b<c), that means we're
        # never going to get [b:c]. If we really need that data, this block
        # will never complete. The easiest way to get into this situation is
        # to hit a share with a corrupted offset table, or one that's somehow
        # been truncated. On the other hand, when overrun_ok is true, we ask
        # for data beyond the end of the share all the time (it saves some
        # RTT when we don't know the length of the share ahead of time). So
        # not every asked-for-but-not-received byte is fatal.
        if len(data) < length:
            self._unavailable.add(start+len(data), length-len(data))
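
        # Short-read example (hedged, hypothetical numbers): if we asked for
        # [100:+50) and the server returned only 30 bytes, the tail [130:+20)
        # is recorded as unavailable; with overrun_ok that tail is usually
        # just past the end of the share and harmless.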

        # XXX if table corruption causes our sections to overlap, then one
        # consumer (i.e. block hash tree) will pop/remove the data that
        # another consumer (i.e. block data) mistakenly thinks it needs. It
        # won't ask for that data again, because the span is in
        # self._requested. But that span won't be in self._unavailable
        # because we got it back from the server. TODO: handle this properly
        # (raise DataUnavailable). Then add sanity-checking
        # no-overlaps-allowed tests to the offset-table unpacking code to
        # catch this earlier. XXX

        # accumulate a wanted/needed span (not as self._x, but passed into
        # desire* functions). manage a pending/in-flight list. when the
        # requests are sent out, empty/discard the wanted/needed span and
        # populate/augment the pending list. when the responses come back,
        # augment either received+data or unavailable.

        # if a corrupt offset table results in double-usage, we'll send
        # double requests.

        # the wanted/needed span is only "wanted" for the first pass. Once
        # the offset table arrives, it's all "needed".

    def _got_error(self, f, start, length, req_ev, lp):
        req_ev.finished("error", now())
        log.msg(format="error requesting %(start)d+%(length)d"
                       " from %(server)s for si %(si)s",
                start=start, length=length,
                server=self._peerid_s, si=self._si_prefix,
                failure=f, parent=lp, level=log.UNUSUAL, umid="BZgAJw")
        # retire our observers, assuming we won't be able to make any
        # further progress
        self._fail(f, log.UNUSUAL)

    def _trigger_loop(self, res):
        if self._alive:
            eventually(self.loop)
        return res

    def _fail(self, f, level=log.WEIRD):
        log.msg(format="abandoning %(share)s",
                share=repr(self), failure=f,
                level=level, parent=self._lp, umid="JKM2Og")
        self._alive = False
        for (segnum, observers) in self._requested_blocks:
            for o in observers:
                o.notify(state=DEAD, f=f)


class CommonShare:
    # TODO: defer creation of the hashtree until somebody uses us. There will
    # be a lot of unused shares, and we shouldn't spend the memory on a large
    # hashtree unless necessary.
    """I hold data that is common across all instances of a single share,
    like sh2 on both servers A and B. This is just the block hash tree.
    """
    def __init__(self, best_numsegs, si_prefix, shnum, logparent):
        self.si_prefix = si_prefix
        self.shnum = shnum

        # in the beginning, before we have the real UEB, we can only guess at
        # the number of segments. But we want to ask for block hashes early.
        # So if we're asked for which block hashes are needed before we know
        # numsegs for sure, we return a guess.
        self._block_hash_tree = IncompleteHashTree(best_numsegs)
        self._block_hash_tree_is_authoritative = False
        self._block_hash_tree_leaves = best_numsegs
        self._logparent = logparent

    def __repr__(self):
        return "CommonShare(%s-sh%d)" % (self.si_prefix, self.shnum)

    def set_authoritative_num_segments(self, numsegs):
        if self._block_hash_tree_leaves != numsegs:
            self._block_hash_tree = IncompleteHashTree(numsegs)
            self._block_hash_tree_leaves = numsegs
        self._block_hash_tree_is_authoritative = True

    def need_block_hash_root(self):
        return bool(not self._block_hash_tree[0])

    def set_block_hash_root(self, roothash):
        assert self._block_hash_tree_is_authoritative
        self._block_hash_tree.set_hashes({0: roothash})

    def get_desired_block_hashes(self, segnum):
        if segnum < self._block_hash_tree_leaves:
            return self._block_hash_tree.needed_hashes(segnum,
                                                       include_leaf=True)

        # the segnum might be out-of-bounds. Originally it was due to a race
        # between the receipt of the UEB on one share (from which we learn
        # the correct number of segments, update all hash trees to the right
        # size, and queue a BADSEGNUM to the SegmentFetcher) and the delivery
        # of a new Share to the SegmentFetcher while that BADSEGNUM was
        # queued (which sends out requests to the stale segnum, now larger
        # than the hash tree). I fixed that (by making SegmentFetcher.loop
        # check for a bad segnum at the start of each pass, instead of using
        # the queued BADSEGNUM or a flag it sets), but just in case this
        # still happens, I'm leaving the < in place. If it gets hit, there's
        # a potential lost-progress problem, but I'm pretty sure that it will
        # get cleared up on the following turn.
        return []

    def get_needed_block_hashes(self, segnum):
        assert self._block_hash_tree_is_authoritative
        # XXX: include_leaf=True needs thought: how did the old downloader do
        # it? I think it grabbed *all* block hashes and set them all at once.
        # Since we want to fetch less data, we either need to fetch the leaf
        # too, or wait to set the block hashes until we've also received the
        # block itself, so we can hash it too, and set the chain+leaf all at
        # the same time.
        return self._block_hash_tree.needed_hashes(segnum, include_leaf=True)

    def process_block_hashes(self, block_hashes):
        assert self._block_hash_tree_is_authoritative
        # this may raise BadHashError or NotEnoughHashesError
        self._block_hash_tree.set_hashes(block_hashes)

    def check_block(self, segnum, block):
        assert self._block_hash_tree_is_authoritative
        h = hashutil.block_hash(block)
        # this may raise BadHashError or NotEnoughHashesError
        self._block_hash_tree.set_hashes(leaves={segnum: h})
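
    # Illustrative lifecycle sketch (comments only, not executed), assuming
    # one CommonShare is shared by every Share with the same shnum:
    #   cs = CommonShare(best_numsegs=1, si_prefix=si, shnum=2, logparent=lp)
    #   cs.set_authoritative_num_segments(4)  # UEB arrived: rebuild the tree
    #   cs.set_block_hash_root(roothash)      # leaf from the share hash tree
    #   cs.process_block_hashes(hashes)       # interior nodes, all at once
    #   cs.check_block(segnum, block)         # raises BadHashError on a lie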

# TODO: maybe stop using EventStreamObserver: instead, use a Deferred and an
# auxiliary OVERDUE callback. Just make sure to get all the messages in the
# right order and on the right turns.

# TODO: we're asking for too much data. We probably don't need
# include_leaf=True in the block hash tree or ciphertext hash tree.

# TODO: we ask for ciphertext hash tree nodes from all shares (whenever
# _desire is called while we're missing those nodes), but we only consume it
# from the first response, leaving the rest of the data sitting in _received.
# This was ameliorated by clearing self._received after each block is
# retired.