From 00e9e4e6760021a16b17e91934e481fee108c6a2 Mon Sep 17 00:00:00 2001 From: Brian Warner Date: Tue, 31 Aug 2010 18:37:02 -0700 Subject: [PATCH] SegmentFetcher: use new diversity-seeking share-selection algorithm, and deliver all shares at once instead of feeding them out one-at-a-time. Also fix distribution of real-number-of-segments information: now all CommonShares (not just the ones used for the first segment) get a correctly-sized hashtree. Previously, the late ones might not, which would make them crash and get dropped (causing the download to fail if the initial set were insufficient, perhaps because one of their servers went away). Update tests, add some TODO notes, improve variable names and comments. Improve logging: add logparents, set more appropriate levels. --- src/allmydata/immutable/downloader/fetcher.py | 291 ++++++++------- src/allmydata/immutable/downloader/finder.py | 82 +++-- src/allmydata/immutable/downloader/node.py | 57 ++- src/allmydata/immutable/downloader/share.py | 137 ++++--- src/allmydata/test/test_cli.py | 4 +- src/allmydata/test/test_download.py | 342 +++++++++++++++++- src/allmydata/test/test_immutable.py | 2 +- src/allmydata/test/test_web.py | 23 +- 8 files changed, 691 insertions(+), 247 deletions(-) diff --git a/src/allmydata/immutable/downloader/fetcher.py b/src/allmydata/immutable/downloader/fetcher.py index e30ced85..e78d37e7 100644 --- a/src/allmydata/immutable/downloader/fetcher.py +++ b/src/allmydata/immutable/downloader/fetcher.py @@ -4,8 +4,8 @@ from foolscap.api import eventually from allmydata.interfaces import NotEnoughSharesError, NoSharesError from allmydata.util import log from allmydata.util.dictutil import DictOfSets -from common import AVAILABLE, PENDING, OVERDUE, COMPLETE, CORRUPT, DEAD, \ - BADSEGNUM, BadSegmentNumberError +from common import OVERDUE, COMPLETE, CORRUPT, DEAD, BADSEGNUM, \ + BadSegmentNumberError class SegmentFetcher: """I am responsible for acquiring blocks for a single segment. I will use @@ -22,35 +22,42 @@ class SegmentFetcher: will shut down and do no further work. My parent can also call my stop() method to have me shut down early.""" - def __init__(self, node, segnum, k): + def __init__(self, node, segnum, k, logparent): self._node = node # _Node self.segnum = segnum self._k = k - self._shares = {} # maps non-dead Share instance to a state, one of - # (AVAILABLE, PENDING, OVERDUE, COMPLETE, CORRUPT). - # State transition map is: - # AVAILABLE -(send-read)-> PENDING - # PENDING -(timer)-> OVERDUE - # PENDING -(rx)-> COMPLETE, CORRUPT, DEAD, BADSEGNUM - # OVERDUE -(rx)-> COMPLETE, CORRUPT, DEAD, BADSEGNUM - # If a share becomes DEAD, it is removed from the - # dict. If it becomes BADSEGNUM, the whole fetch is - # terminated. + self._shares = [] # unused Share instances, sorted by "goodness" + # (RTT), then shnum. This is populated when DYHB + # responses arrive, or (for later segments) at + # startup. We remove shares from it when we call + # sh.get_block() on them. + self._shares_from_server = DictOfSets() # maps serverid to set of + # Shares on that server for + # which we have outstanding + # get_block() calls. + self._max_shares_per_server = 1 # how many Shares we're allowed to + # pull from each server. This starts + # at 1 and grows if we don't have + # sufficient diversity. + self._active_share_map = {} # maps shnum to outstanding (and not + # OVERDUE) Share that provides it. + self._overdue_share_map = DictOfSets() # shares in the OVERDUE state + self._lp = logparent self._share_observers = {} # maps Share to EventStreamObserver for # active ones - self._shnums = DictOfSets() # maps shnum to the shares that provide it self._blocks = {} # maps shnum to validated block data self._no_more_shares = False - self._bad_segnum = False self._last_failure = None self._running = True def stop(self): log.msg("SegmentFetcher(%s).stop" % self._node._si_prefix, - level=log.NOISY, umid="LWyqpg") + level=log.NOISY, parent=self._lp, umid="LWyqpg") self._cancel_all_requests() self._running = False - self._shares.clear() # let GC work # ??? XXX + # help GC ??? XXX + del self._shares, self._shares_from_server, self._active_share_map + del self._share_observers # called by our parent _Node @@ -59,9 +66,8 @@ class SegmentFetcher: # called when ShareFinder locates a new share, and when a non-initial # segment fetch is started and we already know about shares from the # previous segment - for s in shares: - self._shares[s] = AVAILABLE - self._shnums.add(s._shnum, s) + self._shares.extend(shares) + self._shares.sort(key=lambda s: (s._dyhb_rtt, s._shnum) ) eventually(self.loop) def no_more_shares(self): @@ -71,15 +77,6 @@ class SegmentFetcher: # internal methods - def _count_shnums(self, *states): - """shnums for which at least one state is in the following list""" - shnums = [] - for shnum,shares in self._shnums.iteritems(): - matches = [s for s in shares if self._shares.get(s) in states] - if matches: - shnums.append(shnum) - return len(shnums) - def loop(self): try: # if any exception occurs here, kill the download @@ -92,7 +89,8 @@ class SegmentFetcher: k = self._k if not self._running: return - if self._bad_segnum: + numsegs, authoritative = self._node.get_num_segments() + if authoritative and self.segnum >= numsegs: # oops, we were asking for a segment number beyond the end of the # file. This is an error. self.stop() @@ -102,98 +100,125 @@ class SegmentFetcher: self._node.fetch_failed(self, f) return + #print "LOOP", self._blocks.keys(), "active:", self._active_share_map, "overdue:", self._overdue_share_map, "unused:", self._shares + # Should we sent out more requests? + while len(set(self._blocks.keys()) + | set(self._active_share_map.keys()) + ) < k: + # we don't have data or active requests for enough shares. Are + # there any unused shares we can start using? + (sent_something, want_more_diversity) = self._find_and_use_share() + if sent_something: + # great. loop back around in case we need to send more. + continue + if want_more_diversity: + # we could have sent something if we'd been allowed to pull + # more shares per server. Increase the limit and try again. + self._max_shares_per_server += 1 + log.msg("SegmentFetcher(%s) increasing diversity limit to %d" + % (self._node._si_prefix, self._max_shares_per_server), + level=log.NOISY, umid="xY2pBA") + # Also ask for more shares, in the hopes of achieving better + # diversity for the next segment. + self._ask_for_more_shares() + continue + # we need more shares than the ones in self._shares to make + # progress + self._ask_for_more_shares() + if self._no_more_shares: + # But there are no more shares to be had. If we're going to + # succeed, it will be with the shares we've already seen. + # Will they be enough? + if len(set(self._blocks.keys()) + | set(self._active_share_map.keys()) + | set(self._overdue_share_map.keys()) + ) < k: + # nope. bail. + self._no_shares_error() # this calls self.stop() + return + # our outstanding or overdue requests may yet work. + # more shares may be coming. Wait until then. + return + # are we done? - if self._count_shnums(COMPLETE) >= k: + if len(set(self._blocks.keys())) >= k: # yay! self.stop() self._node.process_blocks(self.segnum, self._blocks) return - # we may have exhausted everything - if (self._no_more_shares and - self._count_shnums(AVAILABLE, PENDING, OVERDUE, COMPLETE) < k): - # no more new shares are coming, and the remaining hopeful shares - # aren't going to be enough. boo! - - log.msg("share states: %r" % (self._shares,), - level=log.NOISY, umid="0ThykQ") - if self._count_shnums(AVAILABLE, PENDING, OVERDUE, COMPLETE) == 0: - format = ("no shares (need %(k)d)." - " Last failure: %(last_failure)s") - args = { "k": k, - "last_failure": self._last_failure } - error = NoSharesError - else: - format = ("ran out of shares: %(complete)d complete," - " %(pending)d pending, %(overdue)d overdue," - " %(unused)d unused, need %(k)d." - " Last failure: %(last_failure)s") - args = {"complete": self._count_shnums(COMPLETE), - "pending": self._count_shnums(PENDING), - "overdue": self._count_shnums(OVERDUE), - # 'unused' should be zero - "unused": self._count_shnums(AVAILABLE), - "k": k, - "last_failure": self._last_failure, - } - error = NotEnoughSharesError - log.msg(format=format, level=log.UNUSUAL, umid="1DsnTg", **args) - e = error(format % args) - f = Failure(e) - self.stop() - self._node.fetch_failed(self, f) - return + def _no_shares_error(self): + if not (self._shares or self._active_share_map or + self._overdue_share_map or self._blocks): + format = ("no shares (need %(k)d)." + " Last failure: %(last_failure)s") + args = { "k": self._k, + "last_failure": self._last_failure } + error = NoSharesError + else: + format = ("ran out of shares: complete=%(complete)s" + " pending=%(pending)s overdue=%(overdue)s" + " unused=%(unused)s need %(k)d." + " Last failure: %(last_failure)s") + def join(shnums): return ",".join(["sh%d" % shnum + for shnum in sorted(shnums)]) + pending_s = ",".join([str(sh) + for sh in self._active_share_map.values()]) + overdue = set() + for shares in self._overdue_share_map.values(): + overdue |= shares + overdue_s = ",".join([str(sh) for sh in overdue]) + args = {"complete": join(self._blocks.keys()), + "pending": pending_s, + "overdue": overdue_s, + # 'unused' should be zero + "unused": ",".join([str(sh) for sh in self._shares]), + "k": self._k, + "last_failure": self._last_failure, + } + error = NotEnoughSharesError + log.msg(format=format, + level=log.UNUSUAL, parent=self._lp, umid="1DsnTg", + **args) + e = error(format % args) + f = Failure(e) + self.stop() + self._node.fetch_failed(self, f) - # nope, not done. Are we "block-hungry" (i.e. do we want to send out - # more read requests, or do we think we have enough in flight - # already?) - while self._count_shnums(PENDING, COMPLETE) < k: - # we're hungry.. are there any unused shares? - sent = self._send_new_request() - if not sent: - break - - # ok, now are we "share-hungry" (i.e. do we have enough known shares - # to make us happy, or should we ask the ShareFinder to get us more?) - if self._count_shnums(AVAILABLE, PENDING, COMPLETE) < k: - # we're hungry for more shares - self._node.want_more_shares() - # that will trigger the ShareFinder to keep looking - - def _find_one(self, shares, state): - # TODO could choose fastest, or avoid servers already in use - for s in shares: - if self._shares[s] == state: - return s - # can never get here, caller has assert in case of code bug - - def _send_new_request(self): - # TODO: this is probably O(k^2), and we're called from a range(k) - # loop, so O(k^3) - - # this first loop prefers sh0, then sh1, sh2, etc - for shnum,shares in sorted(self._shnums.iteritems()): - states = [self._shares[s] for s in shares] - if COMPLETE in states or PENDING in states: - # don't send redundant requests + def _find_and_use_share(self): + sent_something = False + want_more_diversity = False + for sh in self._shares: # find one good share to fetch + shnum = sh._shnum ; serverid = sh._peerid + if shnum in self._blocks: + continue # don't request data we already have + if shnum in self._active_share_map: + # note: OVERDUE shares are removed from _active_share_map + # and added to _overdue_share_map instead. + continue # don't send redundant requests + sfs = self._shares_from_server + if len(sfs.get(serverid,set())) >= self._max_shares_per_server: + # don't pull too much from a single server + want_more_diversity = True continue - if AVAILABLE not in states: - # no candidates for this shnum, move on - continue - # here's a candidate. Send a request. - s = self._find_one(shares, AVAILABLE) - assert s - self._shares[s] = PENDING - self._share_observers[s] = o = s.get_block(self.segnum) - o.subscribe(self._block_request_activity, share=s, shnum=shnum) - # TODO: build up a list of candidates, then walk through the - # list, sending requests to the most desireable servers, - # re-checking our block-hunger each time. For non-initial segment - # fetches, this would let us stick with faster servers. - return True - # nothing was sent: don't call us again until you have more shares to - # work with, or one of the existing shares has been declared OVERDUE - return False + # ok, we can use this share + self._shares.remove(sh) + self._active_share_map[shnum] = sh + self._shares_from_server.add(serverid, sh) + self._start_share(sh, shnum) + sent_something = True + break + return (sent_something, want_more_diversity) + + def _start_share(self, share, shnum): + self._share_observers[share] = o = share.get_block(self.segnum) + o.subscribe(self._block_request_activity, share=share, shnum=shnum) + + def _ask_for_more_shares(self): + if not self._no_more_shares: + self._node.want_more_shares() + # that will trigger the ShareFinder to keep looking, and call our + # add_shares() or no_more_shares() later. def _cancel_all_requests(self): for o in self._share_observers.values(): @@ -207,27 +232,33 @@ class SegmentFetcher: log.msg("SegmentFetcher(%s)._block_request_activity:" " Share(sh%d-on-%s) -> %s" % (self._node._si_prefix, shnum, share._peerid_s, state), - level=log.NOISY, umid="vilNWA") - # COMPLETE, CORRUPT, DEAD, BADSEGNUM are terminal. + level=log.NOISY, parent=self._lp, umid="vilNWA") + # COMPLETE, CORRUPT, DEAD, BADSEGNUM are terminal. Remove the share + # from all our tracking lists. if state in (COMPLETE, CORRUPT, DEAD, BADSEGNUM): self._share_observers.pop(share, None) + self._shares_from_server.discard(shnum, share) + if self._active_share_map.get(shnum) is share: + del self._active_share_map[shnum] + self._overdue_share_map.discard(shnum, share) + if state is COMPLETE: - # 'block' is fully validated - self._shares[share] = COMPLETE + # 'block' is fully validated and complete self._blocks[shnum] = block - elif state is OVERDUE: - self._shares[share] = OVERDUE + + if state is OVERDUE: + # no longer active, but still might complete + del self._active_share_map[shnum] + self._overdue_share_map.add(shnum, share) # OVERDUE is not terminal: it will eventually transition to # COMPLETE, CORRUPT, or DEAD. - elif state is CORRUPT: - self._shares[share] = CORRUPT - elif state is DEAD: - del self._shares[share] - self._shnums[shnum].remove(share) - self._last_failure = f - elif state is BADSEGNUM: - self._shares[share] = BADSEGNUM # ??? - self._bad_segnum = True - eventually(self.loop) + if state is DEAD: + self._last_failure = f + if state is BADSEGNUM: + # our main loop will ask the DownloadNode each time for the + # number of segments, so we'll deal with this in the top of + # _do_loop + pass + eventually(self.loop) diff --git a/src/allmydata/immutable/downloader/finder.py b/src/allmydata/immutable/downloader/finder.py index 9adee99c..fa6204c7 100644 --- a/src/allmydata/immutable/downloader/finder.py +++ b/src/allmydata/immutable/downloader/finder.py @@ -35,11 +35,9 @@ class ShareFinder: self._storage_broker = storage_broker self.share_consumer = self.node = node self.max_outstanding_requests = max_outstanding_requests - self._hungry = False self._commonshares = {} # shnum to CommonShare instance - self.undelivered_shares = [] self.pending_requests = set() self.overdue_requests = set() # subset of pending_requests self.overdue_timers = {} @@ -52,6 +50,12 @@ class ShareFinder: si=self._si_prefix, level=log.NOISY, parent=logparent, umid="2xjj2A") + def update_num_segments(self): + (numsegs, authoritative) = self.node.get_num_segments() + assert authoritative + for cs in self._commonshares.values(): + cs.set_authoritative_num_segments(numsegs) + def start_finding_servers(self): # don't get servers until somebody uses us: creating the # ImmutableFileNode should not cause work to happen yet. Test case is @@ -83,30 +87,16 @@ class ShareFinder: # internal methods def loop(self): - undelivered_s = ",".join(["sh%d@%s" % - (s._shnum, idlib.shortnodeid_b2a(s._peerid)) - for s in self.undelivered_shares]) pending_s = ",".join([idlib.shortnodeid_b2a(rt.peerid) for rt in self.pending_requests]) # sort? self.log(format="ShareFinder loop: running=%(running)s" - " hungry=%(hungry)s, undelivered=%(undelivered)s," - " pending=%(pending)s", - running=self.running, hungry=self._hungry, - undelivered=undelivered_s, pending=pending_s, + " hungry=%(hungry)s, pending=%(pending)s", + running=self.running, hungry=self._hungry, pending=pending_s, level=log.NOISY, umid="kRtS4Q") if not self.running: return if not self._hungry: return - if self.undelivered_shares: - sh = self.undelivered_shares.pop(0) - # they will call hungry() again if they want more - self._hungry = False - self.log(format="delivering Share(shnum=%(shnum)d, server=%(peerid)s)", - shnum=sh._shnum, peerid=sh._peerid_s, - level=log.NOISY, umid="2n1qQw") - eventually(self.share_consumer.got_shares, [sh]) - return non_overdue = self.pending_requests - self.overdue_requests if len(non_overdue) >= self.max_outstanding_requests: @@ -146,14 +136,16 @@ class ShareFinder: lp = self.log(format="sending DYHB to [%(peerid)s]", peerid=idlib.shortnodeid_b2a(peerid), level=log.NOISY, umid="Io7pyg") - d_ev = self._download_status.add_dyhb_sent(peerid, now()) + time_sent = now() + d_ev = self._download_status.add_dyhb_sent(peerid, time_sent) # TODO: get the timer from a Server object, it knows best self.overdue_timers[req] = reactor.callLater(self.OVERDUE_TIMEOUT, self.overdue, req) d = rref.callRemote("get_buckets", self._storage_index) d.addBoth(incidentally, self._request_retired, req) d.addCallbacks(self._got_response, self._got_error, - callbackArgs=(rref.version, peerid, req, d_ev, lp), + callbackArgs=(rref.version, peerid, req, d_ev, + time_sent, lp), errbackArgs=(peerid, req, d_ev, lp)) d.addErrback(log.err, format="error in send_request", level=log.WEIRD, parent=lp, umid="rpdV0w") @@ -172,33 +164,37 @@ class ShareFinder: self.overdue_requests.add(req) eventually(self.loop) - def _got_response(self, buckets, server_version, peerid, req, d_ev, lp): + def _got_response(self, buckets, server_version, peerid, req, d_ev, + time_sent, lp): shnums = sorted([shnum for shnum in buckets]) - d_ev.finished(shnums, now()) - if buckets: - shnums_s = ",".join([str(shnum) for shnum in shnums]) - self.log(format="got shnums [%(shnums)s] from [%(peerid)s]", - shnums=shnums_s, peerid=idlib.shortnodeid_b2a(peerid), - level=log.NOISY, parent=lp, umid="0fcEZw") - else: + time_received = now() + d_ev.finished(shnums, time_received) + dyhb_rtt = time_received - time_sent + if not buckets: self.log(format="no shares from [%(peerid)s]", peerid=idlib.shortnodeid_b2a(peerid), level=log.NOISY, parent=lp, umid="U7d4JA") - if self.node.num_segments is None: - best_numsegs = self.node.guessed_num_segments - else: - best_numsegs = self.node.num_segments + return + shnums_s = ",".join([str(shnum) for shnum in shnums]) + self.log(format="got shnums [%(shnums)s] from [%(peerid)s]", + shnums=shnums_s, peerid=idlib.shortnodeid_b2a(peerid), + level=log.NOISY, parent=lp, umid="0fcEZw") + shares = [] for shnum, bucket in buckets.iteritems(): - self._create_share(best_numsegs, shnum, bucket, server_version, - peerid) + s = self._create_share(shnum, bucket, server_version, peerid, + dyhb_rtt) + shares.append(s) + self._deliver_shares(shares) - def _create_share(self, best_numsegs, shnum, bucket, server_version, - peerid): + def _create_share(self, shnum, bucket, server_version, peerid, dyhb_rtt): if shnum in self._commonshares: cs = self._commonshares[shnum] else: - cs = CommonShare(best_numsegs, self._si_prefix, shnum, + numsegs, authoritative = self.node.get_num_segments() + cs = CommonShare(numsegs, self._si_prefix, shnum, self._node_logparent) + if authoritative: + cs.set_authoritative_num_segments(numsegs) # Share._get_satisfaction is responsible for updating # CommonShare.set_numsegs after we know the UEB. Alternatives: # 1: d = self.node.get_num_segments() @@ -214,9 +210,17 @@ class ShareFinder: # Yuck. self._commonshares[shnum] = cs s = Share(bucket, server_version, self.verifycap, cs, self.node, - self._download_status, peerid, shnum, + self._download_status, peerid, shnum, dyhb_rtt, self._node_logparent) - self.undelivered_shares.append(s) + return s + + def _deliver_shares(self, shares): + # they will call hungry() again if they want more + self._hungry = False + shares_s = ",".join([str(sh) for sh in shares]) + self.log(format="delivering shares: %s" % shares_s, + level=log.NOISY, umid="2n1qQw") + eventually(self.share_consumer.got_shares, shares) def _got_error(self, f, peerid, req, d_ev, lp): d_ev.finished("error", now()) diff --git a/src/allmydata/immutable/downloader/node.py b/src/allmydata/immutable/downloader/node.py index 4c92dd84..33c16cfa 100644 --- a/src/allmydata/immutable/downloader/node.py +++ b/src/allmydata/immutable/downloader/node.py @@ -72,7 +72,7 @@ class DownloadNode: # things to track callers that want data # _segment_requests can have duplicates - self._segment_requests = [] # (segnum, d, cancel_handle) + self._segment_requests = [] # (segnum, d, cancel_handle, logparent) self._active_segment = None # a SegmentFetcher, with .segnum self._segsize_observers = observer.OneShotObserverList() @@ -81,7 +81,8 @@ class DownloadNode: # for each read() call. Segmentation and get_segment() messages are # associated with the read() call, everything else is tied to the # _Node's log entry. - lp = log.msg(format="Immutable _Node(%(si)s) created: size=%(size)d," + lp = log.msg(format="Immutable.DownloadNode(%(si)s) created:" + " size=%(size)d," " guessed_segsize=%(guessed_segsize)d," " guessed_numsegs=%(guessed_numsegs)d", si=self._si_prefix, size=verifycap.size, @@ -103,9 +104,10 @@ class DownloadNode: # as with CommonShare, our ciphertext_hash_tree is a stub until we # get the real num_segments self.ciphertext_hash_tree = IncompleteHashTree(self.guessed_num_segments) + self.ciphertext_hash_tree_leaves = self.guessed_num_segments def __repr__(self): - return "Imm_Node(%s)" % (self._si_prefix,) + return "ImmutableDownloadNode(%s)" % (self._si_prefix,) def stop(self): # called by the Terminator at shutdown, mostly for tests @@ -175,14 +177,14 @@ class DownloadNode: The Deferred can also errback with other fatal problems, such as NotEnoughSharesError, NoSharesError, or BadCiphertextHashError. """ - log.msg(format="imm Node(%(si)s).get_segment(%(segnum)d)", - si=base32.b2a(self._verifycap.storage_index)[:8], - segnum=segnum, - level=log.OPERATIONAL, parent=logparent, umid="UKFjDQ") + lp = log.msg(format="imm Node(%(si)s).get_segment(%(segnum)d)", + si=base32.b2a(self._verifycap.storage_index)[:8], + segnum=segnum, + level=log.OPERATIONAL, parent=logparent, umid="UKFjDQ") self._download_status.add_segment_request(segnum, now()) d = defer.Deferred() c = Cancel(self._cancel_request) - self._segment_requests.append( (segnum, d, c) ) + self._segment_requests.append( (segnum, d, c, lp) ) self._start_new_segment() return (d, c) @@ -208,10 +210,11 @@ class DownloadNode: if self._active_segment is None and self._segment_requests: segnum = self._segment_requests[0][0] k = self._verifycap.needed_shares + lp = self._segment_requests[0][3] log.msg(format="%(node)s._start_new_segment: segnum=%(segnum)d", node=repr(self), segnum=segnum, - level=log.NOISY, umid="wAlnHQ") - self._active_segment = fetcher = SegmentFetcher(self, segnum, k) + level=log.NOISY, parent=lp, umid="wAlnHQ") + self._active_segment = fetcher = SegmentFetcher(self, segnum, k, lp) active_shares = [s for s in self._shares if s.is_alive()] fetcher.add_shares(active_shares) # this triggers the loop @@ -234,13 +237,17 @@ class DownloadNode: h = hashutil.uri_extension_hash(UEB_s) if h != self._verifycap.uri_extension_hash: raise BadHashError - UEB_dict = uri.unpack_extension(UEB_s) - self._parse_and_store_UEB(UEB_dict) # sets self._stuff + self._parse_and_store_UEB(UEB_s) # sets self._stuff # TODO: a malformed (but authentic) UEB could throw an assertion in # _parse_and_store_UEB, and we should abandon the download. self.have_UEB = True - def _parse_and_store_UEB(self, d): + # inform the ShareFinder about our correct number of segments. This + # will update the block-hash-trees in all existing CommonShare + # instances, and will populate new ones with the correct value. + self._sharefinder.update_num_segments() + + def _parse_and_store_UEB(self, UEB_s): # Note: the UEB contains needed_shares and total_shares. These are # redundant and inferior (the filecap contains the authoritative # values). However, because it is possible to encode the same file in @@ -252,8 +259,11 @@ class DownloadNode: # therefore, we ignore d['total_shares'] and d['needed_shares']. + d = uri.unpack_extension(UEB_s) + log.msg(format="UEB=%(ueb)s, vcap=%(vcap)s", - ueb=repr(d), vcap=self._verifycap.to_string(), + ueb=repr(uri.unpack_extension_readable(UEB_s)), + vcap=self._verifycap.to_string(), level=log.NOISY, parent=self._lp, umid="cVqZnA") k, N = self._verifycap.needed_shares, self._verifycap.total_shares @@ -292,6 +302,7 @@ class DownloadNode: # shares of file B. self.ciphertext_hash_tree was a guess before: # this is where we create it for real. self.ciphertext_hash_tree = IncompleteHashTree(self.num_segments) + self.ciphertext_hash_tree_leaves = self.num_segments self.ciphertext_hash_tree.set_hashes({0: d['crypttext_root_hash']}) self.share_hash_tree.set_hashes({0: d['share_root_hash']}) @@ -344,9 +355,15 @@ class DownloadNode: % (hashnum, len(self.share_hash_tree))) self.share_hash_tree.set_hashes(share_hashes) + def get_desired_ciphertext_hashes(self, segnum): + if segnum < self.ciphertext_hash_tree_leaves: + return self.ciphertext_hash_tree.needed_hashes(segnum, + include_leaf=True) + return [] def get_needed_ciphertext_hashes(self, segnum): cht = self.ciphertext_hash_tree return cht.needed_hashes(segnum, include_leaf=True) + def process_ciphertext_hashes(self, hashes): assert self.num_segments is not None # this may raise BadHashError or NotEnoughHashesError @@ -457,7 +474,7 @@ class DownloadNode: def _extract_requests(self, segnum): """Remove matching requests and return their (d,c) tuples so that the caller can retire them.""" - retire = [(d,c) for (segnum0, d, c) in self._segment_requests + retire = [(d,c) for (segnum0, d, c, lp) in self._segment_requests if segnum0 == segnum] self._segment_requests = [t for t in self._segment_requests if t[0] != segnum] @@ -466,10 +483,18 @@ class DownloadNode: def _cancel_request(self, c): self._segment_requests = [t for t in self._segment_requests if t[2] != c] - segnums = [segnum for (segnum,d,c) in self._segment_requests] + segnums = [segnum for (segnum,d,c,lp) in self._segment_requests] # self._active_segment might be None in rare circumstances, so make # sure we tolerate it if self._active_segment and self._active_segment.segnum not in segnums: self._active_segment.stop() self._active_segment = None self._start_new_segment() + + # called by ShareFinder to choose hashtree sizes in CommonShares, and by + # SegmentFetcher to tell if it is still fetching a valid segnum. + def get_num_segments(self): + # returns (best_num_segments, authoritative) + if self.num_segments is None: + return (self.guessed_num_segments, False) + return (self.num_segments, True) diff --git a/src/allmydata/immutable/downloader/share.py b/src/allmydata/immutable/downloader/share.py index 413f9077..78cce8ed 100644 --- a/src/allmydata/immutable/downloader/share.py +++ b/src/allmydata/immutable/downloader/share.py @@ -33,7 +33,7 @@ class Share: # servers. A different backend would use a different class. def __init__(self, rref, server_version, verifycap, commonshare, node, - download_status, peerid, shnum, logparent): + download_status, peerid, shnum, dyhb_rtt, logparent): self._rref = rref self._server_version = server_version self._node = node # holds share_hash_tree and UEB @@ -51,6 +51,7 @@ class Share: self._storage_index = verifycap.storage_index self._si_prefix = base32.b2a(verifycap.storage_index)[:8] self._shnum = shnum + self._dyhb_rtt = dyhb_rtt # self._alive becomes False upon fatal corruption or server error self._alive = True self._lp = log.msg(format="%(share)s created", share=repr(self), @@ -278,15 +279,16 @@ class Share: if not self._satisfy_UEB(): # can't check any hashes without the UEB return False + # the call to _satisfy_UEB() will immediately set the + # authoritative num_segments in all our CommonShares. If we + # guessed wrong, we might stil be working on a bogus segnum + # (beyond the real range). We catch this and signal BADSEGNUM + # before invoking any further code that touches hashtrees. self.actual_segment_size = self._node.segment_size # might be updated assert self.actual_segment_size is not None - # knowing the UEB means knowing num_segments. Despite the redundancy, - # this is the best place to set this. CommonShare.set_numsegs will - # ignore duplicate calls. + # knowing the UEB means knowing num_segments assert self._node.num_segments is not None - cs = self._commonshare - cs.set_numsegs(self._node.num_segments) segnum, observers = self._active_segnum_and_observers() # if segnum is None, we don't really need to do anything (we have no @@ -304,9 +306,9 @@ class Share: # can't check block_hash_tree without a root return False - if cs.need_block_hash_root(): + if self._commonshare.need_block_hash_root(): block_hash_root = self._node.share_hash_tree.get_leaf(self._shnum) - cs.set_block_hash_root(block_hash_root) + self._commonshare.set_block_hash_root(block_hash_root) if segnum is None: return False # we don't want any particular segment right now @@ -360,7 +362,8 @@ class Share: ] ): offsets[field] = fields[i] self.actual_offsets = offsets - log.msg("actual offsets: data=%d, plaintext_hash_tree=%d, crypttext_hash_tree=%d, block_hashes=%d, share_hashes=%d, uri_extension=%d" % tuple(fields)) + log.msg("actual offsets: data=%d, plaintext_hash_tree=%d, crypttext_hash_tree=%d, block_hashes=%d, share_hashes=%d, uri_extension=%d" % tuple(fields), + level=log.NOISY, parent=self._lp, umid="jedQcw") self._received.remove(0, 4) # don't need this anymore # validate the offsets a bit @@ -517,7 +520,8 @@ class Share: block = self._received.pop(blockstart, blocklen) if not block: log.msg("no data for block %s (want [%d:+%d])" % (repr(self), - blockstart, blocklen)) + blockstart, blocklen), + level=log.NOISY, parent=self._lp, umid="aK0RFw") return False log.msg(format="%(share)s._satisfy_data_block [%(start)d:+%(length)d]", share=repr(self), start=blockstart, length=blocklen, @@ -589,29 +593,17 @@ class Share: if self.actual_offsets or self._overrun_ok: if not self._node.have_UEB: self._desire_UEB(desire, o) - # They might ask for a segment that doesn't look right. - # _satisfy() will catch+reject bad segnums once we know the UEB - # (and therefore segsize and numsegs), so we'll only fail this - # test if we're still guessing. We want to avoid asking the - # hashtrees for needed_hashes() for bad segnums. So don't enter - # _desire_hashes or _desire_data unless the segnum looks - # reasonable. - if segnum < r["num_segments"]: - # XXX somehow we're getting here for sh5. we don't yet know - # the actual_segment_size, we're still working off the guess. - # the ciphertext_hash_tree has been corrected, but the - # commonshare._block_hash_tree is still in the guessed state. - self._desire_share_hashes(desire, o) - if segnum is not None: - self._desire_block_hashes(desire, o, segnum) - self._desire_data(desire, o, r, segnum, segsize) - else: - log.msg("_desire: segnum(%d) looks wrong (numsegs=%d)" - % (segnum, r["num_segments"]), - level=log.UNUSUAL, parent=self._lp, umid="tuYRQQ") + self._desire_share_hashes(desire, o) + if segnum is not None: + # They might be asking for a segment number that is beyond + # what we guess the file contains, but _desire_block_hashes + # and _desire_data will tolerate that. + self._desire_block_hashes(desire, o, segnum) + self._desire_data(desire, o, r, segnum, segsize) log.msg("end _desire: want_it=%s need_it=%s gotta=%s" - % (want_it.dump(), need_it.dump(), gotta_gotta_have_it.dump())) + % (want_it.dump(), need_it.dump(), gotta_gotta_have_it.dump()), + level=log.NOISY, parent=self._lp, umid="IG7CgA") if self.actual_offsets: return (want_it, need_it+gotta_gotta_have_it) else: @@ -681,14 +673,30 @@ class Share: (want_it, need_it, gotta_gotta_have_it) = desire # block hash chain - for hashnum in self._commonshare.get_needed_block_hashes(segnum): + for hashnum in self._commonshare.get_desired_block_hashes(segnum): need_it.add(o["block_hashes"]+hashnum*HASH_SIZE, HASH_SIZE) # ciphertext hash chain - for hashnum in self._node.get_needed_ciphertext_hashes(segnum): + for hashnum in self._node.get_desired_ciphertext_hashes(segnum): need_it.add(o["crypttext_hash_tree"]+hashnum*HASH_SIZE, HASH_SIZE) def _desire_data(self, desire, o, r, segnum, segsize): + if segnum > r["num_segments"]: + # they're asking for a segment that's beyond what we think is the + # end of the file. We won't get here if we've already learned the + # real UEB: _get_satisfaction() will notice the out-of-bounds and + # terminate the loop. So we must still be guessing, which means + # that they might be correct in asking for such a large segnum. + # But if they're right, then our segsize/segnum guess is + # certainly wrong, which means we don't know what data blocks to + # ask for yet. So don't bother adding anything. When the UEB + # comes back and we learn the correct segsize/segnums, we'll + # either reject the request or have enough information to proceed + # normally. This costs one roundtrip. + log.msg("_desire_data: segnum(%d) looks wrong (numsegs=%d)" + % (segnum, r["num_segments"]), + level=log.UNUSUAL, parent=self._lp, umid="tuYRQQ") + return (want_it, need_it, gotta_gotta_have_it) = desire tail = (segnum == r["num_segments"]-1) datastart = o["data"] @@ -803,34 +811,62 @@ class Share: class CommonShare: + # TODO: defer creation of the hashtree until somebody uses us. There will + # be a lot of unused shares, and we shouldn't spend the memory on a large + # hashtree unless necessary. """I hold data that is common across all instances of a single share, like sh2 on both servers A and B. This is just the block hash tree. """ - def __init__(self, guessed_numsegs, si_prefix, shnum, logparent): + def __init__(self, best_numsegs, si_prefix, shnum, logparent): self.si_prefix = si_prefix self.shnum = shnum + # in the beginning, before we have the real UEB, we can only guess at # the number of segments. But we want to ask for block hashes early. # So if we're asked for which block hashes are needed before we know # numsegs for sure, we return a guess. - self._block_hash_tree = IncompleteHashTree(guessed_numsegs) - self._know_numsegs = False + self._block_hash_tree = IncompleteHashTree(best_numsegs) + self._block_hash_tree_is_authoritative = False + self._block_hash_tree_leaves = best_numsegs self._logparent = logparent - def set_numsegs(self, numsegs): - if self._know_numsegs: - return - self._block_hash_tree = IncompleteHashTree(numsegs) - self._know_numsegs = True + def __repr__(self): + return "CommonShare(%s-sh%d)" % (self.si_prefix, self.shnum) + + def set_authoritative_num_segments(self, numsegs): + if self._block_hash_tree_leaves != numsegs: + self._block_hash_tree = IncompleteHashTree(numsegs) + self._block_hash_tree_leaves = numsegs + self._block_hash_tree_is_authoritative = True def need_block_hash_root(self): return bool(not self._block_hash_tree[0]) def set_block_hash_root(self, roothash): - assert self._know_numsegs + assert self._block_hash_tree_is_authoritative self._block_hash_tree.set_hashes({0: roothash}) + def get_desired_block_hashes(self, segnum): + if segnum < self._block_hash_tree_leaves: + return self._block_hash_tree.needed_hashes(segnum, + include_leaf=True) + + # the segnum might be out-of-bounds. Originally it was due to a race + # between the receipt of the UEB on one share (from which we learn + # the correct number of segments, update all hash trees to the right + # size, and queue a BADSEGNUM to the SegmentFetcher) and the delivery + # of a new Share to the SegmentFetcher while that BADSEGNUM was + # queued (which sends out requests to the stale segnum, now larger + # than the hash tree). I fixed that (by making SegmentFetcher.loop + # check for a bad segnum at the start of each pass, instead of using + # the queued BADSEGNUM or a flag it sets), but just in case this + # still happens, I'm leaving the < in place. If it gets hit, there's + # a potential lost-progress problem, but I'm pretty sure that it will + # get cleared up on the following turn. + return [] + def get_needed_block_hashes(self, segnum): + assert self._block_hash_tree_is_authoritative # XXX: include_leaf=True needs thought: how did the old downloader do # it? I think it grabbed *all* block hashes and set them all at once. # Since we want to fetch less data, we either need to fetch the leaf @@ -840,12 +876,25 @@ class CommonShare: return self._block_hash_tree.needed_hashes(segnum, include_leaf=True) def process_block_hashes(self, block_hashes): - assert self._know_numsegs + assert self._block_hash_tree_is_authoritative # this may raise BadHashError or NotEnoughHashesError self._block_hash_tree.set_hashes(block_hashes) def check_block(self, segnum, block): - assert self._know_numsegs + assert self._block_hash_tree_is_authoritative h = hashutil.block_hash(block) # this may raise BadHashError or NotEnoughHashesError self._block_hash_tree.set_hashes(leaves={segnum: h}) + +# TODO: maybe stop using EventStreamObserver: instead, use a Deferred and an +# auxilliary OVERDUE callback. Just make sure to get all the messages in the +# right order and on the right turns. + +# TODO: we're asking for too much data. We probably don't need +# include_leaf=True in the block hash tree or ciphertext hash tree. + +# TODO: we ask for ciphertext hash tree nodes from all shares (whenever +# _desire is called while we're missing those nodes), but we only consume it +# from the first response, leaving the rest of the data sitting in _received. +# This was ameliorated by clearing self._received after each block is +# complete. diff --git a/src/allmydata/test/test_cli.py b/src/allmydata/test/test_cli.py index db5bf5f8..24531260 100644 --- a/src/allmydata/test/test_cli.py +++ b/src/allmydata/test/test_cli.py @@ -2303,8 +2303,8 @@ class Errors(GridTestMixin, CLITestMixin, unittest.TestCase): # the download is abandoned as soon as it's clear that we won't get # enough shares. The one remaining share might be in either the # COMPLETE or the PENDING state. - in_complete_msg = "ran out of shares: 1 complete, 0 pending, 0 overdue, 0 unused, need 3" - in_pending_msg = "ran out of shares: 0 complete, 1 pending, 0 overdue, 0 unused, need 3" + in_complete_msg = "ran out of shares: complete=sh0 pending= overdue= unused= need 3" + in_pending_msg = "ran out of shares: complete= pending=Share(sh0-on-fob7v) overdue= unused= need 3" d.addCallback(lambda ign: self.do_cli("get", self.uri_1share)) def _check1((rc, out, err)): diff --git a/src/allmydata/test/test_download.py b/src/allmydata/test/test_download.py index 71a556bb..40f0d62c 100644 --- a/src/allmydata/test/test_download.py +++ b/src/allmydata/test/test_download.py @@ -15,8 +15,9 @@ from allmydata.test.no_network import GridTestMixin from allmydata.test.common import ShouldFailMixin from allmydata.interfaces import NotEnoughSharesError, NoSharesError from allmydata.immutable.downloader.common import BadSegmentNumberError, \ - BadCiphertextHashError, DownloadStopped + BadCiphertextHashError, DownloadStopped, COMPLETE, OVERDUE, DEAD from allmydata.immutable.downloader.status import DownloadStatus +from allmydata.immutable.downloader.fetcher import SegmentFetcher from allmydata.codec import CRSDecoder from foolscap.eventual import fireEventually, flushEventualQueue @@ -295,7 +296,7 @@ class DownloadTest(_Base, unittest.TestCase): # shares servers = [] shares = sorted([s._shnum for s in self.n._cnode._node._shares]) - self.failUnlessEqual(shares, [0,1,2]) + self.failUnlessEqual(shares, [0,1,2,3]) # break the RIBucketReader references for s in self.n._cnode._node._shares: s._rref.broken = True @@ -318,7 +319,7 @@ class DownloadTest(_Base, unittest.TestCase): self.failUnlessEqual("".join(c.chunks), plaintext) shares = sorted([s._shnum for s in self.n._cnode._node._shares]) # we should now be using more shares than we were before - self.failIfEqual(shares, [0,1,2]) + self.failIfEqual(shares, [0,1,2,3]) d.addCallback(_check_failover) return d @@ -539,7 +540,7 @@ class DownloadTest(_Base, unittest.TestCase): def _con1_should_not_succeed(res): self.fail("the first read should not have succeeded") def _con1_failed(f): - self.failUnless(f.check(NotEnoughSharesError)) + self.failUnless(f.check(NoSharesError)) con2.producer.stopProducing() return d2 d.addCallbacks(_con1_should_not_succeed, _con1_failed) @@ -583,7 +584,7 @@ class DownloadTest(_Base, unittest.TestCase): def _con1_should_not_succeed(res): self.fail("the first read should not have succeeded") def _con1_failed(f): - self.failUnless(f.check(NotEnoughSharesError)) + self.failUnless(f.check(NoSharesError)) # we *don't* cancel the second one here: this exercises a # lost-progress bug from #1154. We just wait for it to # succeed. @@ -1121,7 +1122,7 @@ class Corruption(_Base, unittest.TestCase): # All these tests result in a failed download. d.addCallback(self._corrupt_flip_all, imm_uri, i) d.addCallback(lambda ign: - self.shouldFail(NotEnoughSharesError, which, + self.shouldFail(NoSharesError, which, substring, _download, imm_uri)) d.addCallback(lambda ign: self.restore_all_shares(self.shares)) @@ -1257,3 +1258,332 @@ class Status(unittest.TestCase): e2.update(1000, 2.0, 2.0) e2.finished(now+5) self.failUnlessEqual(ds.get_progress(), 1.0) + +class MyShare: + def __init__(self, shnum, peerid, rtt): + self._shnum = shnum + self._peerid = peerid + self._peerid_s = peerid + self._dyhb_rtt = rtt + def __repr__(self): + return "sh%d-on-%s" % (self._shnum, self._peerid) + +class MySegmentFetcher(SegmentFetcher): + def __init__(self, *args, **kwargs): + SegmentFetcher.__init__(self, *args, **kwargs) + self._test_start_shares = [] + def _start_share(self, share, shnum): + self._test_start_shares.append(share) + +class FakeNode: + def __init__(self): + self.want_more = 0 + self.failed = None + self.processed = None + self._si_prefix = "si_prefix" + def want_more_shares(self): + self.want_more += 1 + def fetch_failed(self, fetcher, f): + self.failed = f + def process_blocks(self, segnum, blocks): + self.processed = (segnum, blocks) + def get_num_segments(self): + return 1, True + +class Selection(unittest.TestCase): + def test_no_shares(self): + node = FakeNode() + sf = SegmentFetcher(node, 0, 3, None) + sf.add_shares([]) + d = flushEventualQueue() + def _check1(ign): + self.failUnlessEqual(node.want_more, 1) + self.failUnlessEqual(node.failed, None) + sf.no_more_shares() + return flushEventualQueue() + d.addCallback(_check1) + def _check2(ign): + self.failUnless(node.failed) + self.failUnless(node.failed.check(NoSharesError)) + d.addCallback(_check2) + return d + + def test_only_one_share(self): + node = FakeNode() + sf = MySegmentFetcher(node, 0, 3, None) + shares = [MyShare(0, "peer-A", 0.0)] + sf.add_shares(shares) + d = flushEventualQueue() + def _check1(ign): + self.failUnlessEqual(node.want_more, 1) + self.failUnlessEqual(node.failed, None) + sf.no_more_shares() + return flushEventualQueue() + d.addCallback(_check1) + def _check2(ign): + self.failUnless(node.failed) + self.failUnless(node.failed.check(NotEnoughSharesError)) + self.failUnlessIn("complete= pending=sh0-on-peer-A overdue= unused=", + str(node.failed)) + d.addCallback(_check2) + return d + + def test_good_diversity_early(self): + node = FakeNode() + sf = MySegmentFetcher(node, 0, 3, None) + shares = [MyShare(i, "peer-%d" % i, i) for i in range(10)] + sf.add_shares(shares) + d = flushEventualQueue() + def _check1(ign): + self.failUnlessEqual(node.want_more, 0) + self.failUnlessEqual(sf._test_start_shares, shares[:3]) + for sh in sf._test_start_shares: + sf._block_request_activity(sh, sh._shnum, COMPLETE, + "block-%d" % sh._shnum) + return flushEventualQueue() + d.addCallback(_check1) + def _check2(ign): + self.failIfEqual(node.processed, None) + self.failUnlessEqual(node.processed, (0, {0: "block-0", + 1: "block-1", + 2: "block-2"}) ) + d.addCallback(_check2) + return d + + def test_good_diversity_late(self): + node = FakeNode() + sf = MySegmentFetcher(node, 0, 3, None) + shares = [MyShare(i, "peer-%d" % i, i) for i in range(10)] + sf.add_shares([]) + d = flushEventualQueue() + def _check1(ign): + self.failUnlessEqual(node.want_more, 1) + sf.add_shares(shares) + return flushEventualQueue() + d.addCallback(_check1) + def _check2(ign): + self.failUnlessEqual(sf._test_start_shares, shares[:3]) + for sh in sf._test_start_shares: + sf._block_request_activity(sh, sh._shnum, COMPLETE, + "block-%d" % sh._shnum) + return flushEventualQueue() + d.addCallback(_check2) + def _check3(ign): + self.failIfEqual(node.processed, None) + self.failUnlessEqual(node.processed, (0, {0: "block-0", + 1: "block-1", + 2: "block-2"}) ) + d.addCallback(_check3) + return d + + def test_avoid_bad_diversity_late(self): + node = FakeNode() + sf = MySegmentFetcher(node, 0, 3, None) + # we could satisfy the read entirely from the first server, but we'd + # prefer not to. Instead, we expect to only pull one share from the + # first server + shares = [MyShare(0, "peer-A", 0.0), + MyShare(1, "peer-A", 0.0), + MyShare(2, "peer-A", 0.0), + MyShare(3, "peer-B", 1.0), + MyShare(4, "peer-C", 2.0), + ] + sf.add_shares([]) + d = flushEventualQueue() + def _check1(ign): + self.failUnlessEqual(node.want_more, 1) + sf.add_shares(shares) + return flushEventualQueue() + d.addCallback(_check1) + def _check2(ign): + self.failUnlessEqual(sf._test_start_shares, + [shares[0], shares[3], shares[4]]) + for sh in sf._test_start_shares: + sf._block_request_activity(sh, sh._shnum, COMPLETE, + "block-%d" % sh._shnum) + return flushEventualQueue() + d.addCallback(_check2) + def _check3(ign): + self.failIfEqual(node.processed, None) + self.failUnlessEqual(node.processed, (0, {0: "block-0", + 3: "block-3", + 4: "block-4"}) ) + d.addCallback(_check3) + return d + + def test_suffer_bad_diversity_late(self): + node = FakeNode() + sf = MySegmentFetcher(node, 0, 3, None) + # we satisfy the read entirely from the first server because we don't + # have any other choice. + shares = [MyShare(0, "peer-A", 0.0), + MyShare(1, "peer-A", 0.0), + MyShare(2, "peer-A", 0.0), + MyShare(3, "peer-A", 0.0), + MyShare(4, "peer-A", 0.0), + ] + sf.add_shares([]) + d = flushEventualQueue() + def _check1(ign): + self.failUnlessEqual(node.want_more, 1) + sf.add_shares(shares) + return flushEventualQueue() + d.addCallback(_check1) + def _check2(ign): + self.failUnlessEqual(node.want_more, 3) + self.failUnlessEqual(sf._test_start_shares, + [shares[0], shares[1], shares[2]]) + for sh in sf._test_start_shares: + sf._block_request_activity(sh, sh._shnum, COMPLETE, + "block-%d" % sh._shnum) + return flushEventualQueue() + d.addCallback(_check2) + def _check3(ign): + self.failIfEqual(node.processed, None) + self.failUnlessEqual(node.processed, (0, {0: "block-0", + 1: "block-1", + 2: "block-2"}) ) + d.addCallback(_check3) + return d + + def test_suffer_bad_diversity_early(self): + node = FakeNode() + sf = MySegmentFetcher(node, 0, 3, None) + # we satisfy the read entirely from the first server because we don't + # have any other choice. + shares = [MyShare(0, "peer-A", 0.0), + MyShare(1, "peer-A", 0.0), + MyShare(2, "peer-A", 0.0), + MyShare(3, "peer-A", 0.0), + MyShare(4, "peer-A", 0.0), + ] + sf.add_shares(shares) + d = flushEventualQueue() + def _check1(ign): + self.failUnlessEqual(node.want_more, 2) + self.failUnlessEqual(sf._test_start_shares, + [shares[0], shares[1], shares[2]]) + for sh in sf._test_start_shares: + sf._block_request_activity(sh, sh._shnum, COMPLETE, + "block-%d" % sh._shnum) + return flushEventualQueue() + d.addCallback(_check1) + def _check2(ign): + self.failIfEqual(node.processed, None) + self.failUnlessEqual(node.processed, (0, {0: "block-0", + 1: "block-1", + 2: "block-2"}) ) + d.addCallback(_check2) + return d + + def test_overdue(self): + node = FakeNode() + sf = MySegmentFetcher(node, 0, 3, None) + shares = [MyShare(i, "peer-%d" % i, i) for i in range(10)] + sf.add_shares(shares) + d = flushEventualQueue() + def _check1(ign): + self.failUnlessEqual(node.want_more, 0) + self.failUnlessEqual(sf._test_start_shares, shares[:3]) + for sh in sf._test_start_shares: + sf._block_request_activity(sh, sh._shnum, OVERDUE) + return flushEventualQueue() + d.addCallback(_check1) + def _check2(ign): + self.failUnlessEqual(sf._test_start_shares, shares[:6]) + for sh in sf._test_start_shares[3:]: + sf._block_request_activity(sh, sh._shnum, COMPLETE, + "block-%d" % sh._shnum) + return flushEventualQueue() + d.addCallback(_check2) + def _check3(ign): + self.failIfEqual(node.processed, None) + self.failUnlessEqual(node.processed, (0, {3: "block-3", + 4: "block-4", + 5: "block-5"}) ) + d.addCallback(_check3) + return d + + def test_overdue_fails(self): + node = FakeNode() + sf = MySegmentFetcher(node, 0, 3, None) + shares = [MyShare(i, "peer-%d" % i, i) for i in range(6)] + sf.add_shares(shares) + sf.no_more_shares() + d = flushEventualQueue() + def _check1(ign): + self.failUnlessEqual(node.want_more, 0) + self.failUnlessEqual(sf._test_start_shares, shares[:3]) + for sh in sf._test_start_shares: + sf._block_request_activity(sh, sh._shnum, OVERDUE) + return flushEventualQueue() + d.addCallback(_check1) + def _check2(ign): + self.failUnlessEqual(sf._test_start_shares, shares[:6]) + for sh in sf._test_start_shares[3:]: + sf._block_request_activity(sh, sh._shnum, DEAD) + return flushEventualQueue() + d.addCallback(_check2) + def _check3(ign): + # we're still waiting + self.failUnlessEqual(node.processed, None) + self.failUnlessEqual(node.failed, None) + # now complete one of the overdue ones, and kill one of the other + # ones, leaving one hanging. This should trigger a failure, since + # we cannot succeed. + live = sf._test_start_shares[0] + die = sf._test_start_shares[1] + sf._block_request_activity(live, live._shnum, COMPLETE, "block") + sf._block_request_activity(die, die._shnum, DEAD) + return flushEventualQueue() + d.addCallback(_check3) + def _check4(ign): + self.failUnless(node.failed) + self.failUnless(node.failed.check(NotEnoughSharesError)) + self.failUnlessIn("complete=sh0 pending= overdue=sh2-on-peer-2 unused=", + str(node.failed)) + d.addCallback(_check4) + return d + + def test_avoid_redundancy(self): + node = FakeNode() + sf = MySegmentFetcher(node, 0, 3, None) + # we could satisfy the read entirely from the first server, but we'd + # prefer not to. Instead, we expect to only pull one share from the + # first server + shares = [MyShare(0, "peer-A", 0.0), + MyShare(1, "peer-B", 1.0), + MyShare(0, "peer-C", 2.0), # this will be skipped + MyShare(1, "peer-D", 3.0), + MyShare(2, "peer-E", 4.0), + ] + sf.add_shares(shares[:3]) + d = flushEventualQueue() + def _check1(ign): + self.failUnlessEqual(node.want_more, 1) + self.failUnlessEqual(sf._test_start_shares, + [shares[0], shares[1]]) + # allow sh1 to retire + sf._block_request_activity(shares[1], 1, COMPLETE, "block-1") + return flushEventualQueue() + d.addCallback(_check1) + def _check2(ign): + # and then feed in the remaining shares + sf.add_shares(shares[3:]) + sf.no_more_shares() + return flushEventualQueue() + d.addCallback(_check2) + def _check3(ign): + self.failUnlessEqual(sf._test_start_shares, + [shares[0], shares[1], shares[4]]) + sf._block_request_activity(shares[0], 0, COMPLETE, "block-0") + sf._block_request_activity(shares[4], 2, COMPLETE, "block-2") + return flushEventualQueue() + d.addCallback(_check3) + def _check4(ign): + self.failIfEqual(node.processed, None) + self.failUnlessEqual(node.processed, (0, {0: "block-0", + 1: "block-1", + 2: "block-2"}) ) + d.addCallback(_check4) + return d diff --git a/src/allmydata/test/test_immutable.py b/src/allmydata/test/test_immutable.py index 288332d0..511a865b 100644 --- a/src/allmydata/test/test_immutable.py +++ b/src/allmydata/test/test_immutable.py @@ -52,7 +52,7 @@ class Test(common.ShareManglingMixin, common.ShouldFailMixin, unittest.TestCase) def _after_download(unused=None): after_download_reads = self._count_reads() #print before_download_reads, after_download_reads - self.failIf(after_download_reads-before_download_reads > 36, + self.failIf(after_download_reads-before_download_reads > 41, (after_download_reads, before_download_reads)) d.addCallback(self._download_and_check_plaintext) d.addCallback(_after_download) diff --git a/src/allmydata/test/test_web.py b/src/allmydata/test/test_web.py index 30080464..f68e98d8 100644 --- a/src/allmydata/test/test_web.py +++ b/src/allmydata/test/test_web.py @@ -4259,15 +4259,20 @@ class Grid(GridTestMixin, WebErrorMixin, ShouldFailMixin, testutil.ReallyEqualMi def _check_one_share(body): self.failIf("" in body, body) body = " ".join(body.strip().split()) - msg = ("NotEnoughSharesError: This indicates that some " - "servers were unavailable, or that shares have been " - "lost to server departure, hard drive failure, or disk " - "corruption. You should perform a filecheck on " - "this object to learn more. The full error message is:" - " ran out of shares: %d complete, %d pending, 0 overdue," - " 0 unused, need 3. Last failure: None") - msg1 = msg % (1, 0) - msg2 = msg % (0, 1) + msgbase = ("NotEnoughSharesError: This indicates that some " + "servers were unavailable, or that shares have been " + "lost to server departure, hard drive failure, or disk " + "corruption. You should perform a filecheck on " + "this object to learn more. The full error message is:" + ) + msg1 = msgbase + (" ran out of shares:" + " complete=sh0" + " pending=" + " overdue= unused= need 3. Last failure: None") + msg2 = msgbase + (" ran out of shares:" + " complete=" + " pending=Share(sh0-on-xgru5)" + " overdue= unused= need 3. Last failure: None") self.failUnless(body == msg1 or body == msg2, body) d.addCallback(_check_one_share) -- 2.45.2