deliver all shares at once instead of feeding them out one-at-a-time.
Also fix distribution of real-number-of-segments information: now all
CommonShares (not just the ones used for the first segment) get a
correctly-sized hashtree. Previously, the late ones might not, which would
make them crash and get dropped (causing the download to fail if the initial
set were insufficient, perhaps because one of their servers went away).
Update tests, add some TODO notes, improve variable names and comments.
Improve logging: add logparents, set more appropriate levels.
from allmydata.interfaces import NotEnoughSharesError, NoSharesError
from allmydata.util import log
from allmydata.util.dictutil import DictOfSets
-from common import AVAILABLE, PENDING, OVERDUE, COMPLETE, CORRUPT, DEAD, \
- BADSEGNUM, BadSegmentNumberError
+from common import OVERDUE, COMPLETE, CORRUPT, DEAD, BADSEGNUM, \
+ BadSegmentNumberError
class SegmentFetcher:
"""I am responsible for acquiring blocks for a single segment. I will use
will shut down and do no further work. My parent can also call my stop()
method to have me shut down early."""
- def __init__(self, node, segnum, k):
+ def __init__(self, node, segnum, k, logparent):
self._node = node # _Node
self.segnum = segnum
self._k = k
- self._shares = {} # maps non-dead Share instance to a state, one of
- # (AVAILABLE, PENDING, OVERDUE, COMPLETE, CORRUPT).
- # State transition map is:
- # AVAILABLE -(send-read)-> PENDING
- # PENDING -(timer)-> OVERDUE
- # PENDING -(rx)-> COMPLETE, CORRUPT, DEAD, BADSEGNUM
- # OVERDUE -(rx)-> COMPLETE, CORRUPT, DEAD, BADSEGNUM
- # If a share becomes DEAD, it is removed from the
- # dict. If it becomes BADSEGNUM, the whole fetch is
- # terminated.
+ self._shares = [] # unused Share instances, sorted by "goodness"
+ # (RTT), then shnum. This is populated when DYHB
+ # responses arrive, or (for later segments) at
+ # startup. We remove shares from it when we call
+ # sh.get_block() on them.
+ self._shares_from_server = DictOfSets() # maps serverid to set of
+ # Shares on that server for
+ # which we have outstanding
+ # get_block() calls.
+ self._max_shares_per_server = 1 # how many Shares we're allowed to
+ # pull from each server. This starts
+ # at 1 and grows if we don't have
+ # sufficient diversity.
+ self._active_share_map = {} # maps shnum to outstanding (and not
+ # OVERDUE) Share that provides it.
+ self._overdue_share_map = DictOfSets() # shares in the OVERDUE state
+ self._lp = logparent
self._share_observers = {} # maps Share to EventStreamObserver for
# active ones
- self._shnums = DictOfSets() # maps shnum to the shares that provide it
self._blocks = {} # maps shnum to validated block data
self._no_more_shares = False
- self._bad_segnum = False
self._last_failure = None
self._running = True
def stop(self):
log.msg("SegmentFetcher(%s).stop" % self._node._si_prefix,
- level=log.NOISY, umid="LWyqpg")
+ level=log.NOISY, parent=self._lp, umid="LWyqpg")
self._cancel_all_requests()
self._running = False
- self._shares.clear() # let GC work # ??? XXX
+ # help GC ??? XXX
+ del self._shares, self._shares_from_server, self._active_share_map
+ del self._share_observers
# called by our parent _Node
# called when ShareFinder locates a new share, and when a non-initial
# segment fetch is started and we already know about shares from the
# previous segment
- for s in shares:
- self._shares[s] = AVAILABLE
- self._shnums.add(s._shnum, s)
+ self._shares.extend(shares)
+ self._shares.sort(key=lambda s: (s._dyhb_rtt, s._shnum) )
eventually(self.loop)
def no_more_shares(self):
# internal methods
- def _count_shnums(self, *states):
- """shnums for which at least one state is in the following list"""
- shnums = []
- for shnum,shares in self._shnums.iteritems():
- matches = [s for s in shares if self._shares.get(s) in states]
- if matches:
- shnums.append(shnum)
- return len(shnums)
-
def loop(self):
try:
# if any exception occurs here, kill the download
k = self._k
if not self._running:
return
- if self._bad_segnum:
+ numsegs, authoritative = self._node.get_num_segments()
+ if authoritative and self.segnum >= numsegs:
# oops, we were asking for a segment number beyond the end of the
# file. This is an error.
self.stop()
self._node.fetch_failed(self, f)
return
+ #print "LOOP", self._blocks.keys(), "active:", self._active_share_map, "overdue:", self._overdue_share_map, "unused:", self._shares
+ # Should we sent out more requests?
+ while len(set(self._blocks.keys())
+ | set(self._active_share_map.keys())
+ ) < k:
+ # we don't have data or active requests for enough shares. Are
+ # there any unused shares we can start using?
+ (sent_something, want_more_diversity) = self._find_and_use_share()
+ if sent_something:
+ # great. loop back around in case we need to send more.
+ continue
+ if want_more_diversity:
+ # we could have sent something if we'd been allowed to pull
+ # more shares per server. Increase the limit and try again.
+ self._max_shares_per_server += 1
+ log.msg("SegmentFetcher(%s) increasing diversity limit to %d"
+ % (self._node._si_prefix, self._max_shares_per_server),
+ level=log.NOISY, umid="xY2pBA")
+ # Also ask for more shares, in the hopes of achieving better
+ # diversity for the next segment.
+ self._ask_for_more_shares()
+ continue
+ # we need more shares than the ones in self._shares to make
+ # progress
+ self._ask_for_more_shares()
+ if self._no_more_shares:
+ # But there are no more shares to be had. If we're going to
+ # succeed, it will be with the shares we've already seen.
+ # Will they be enough?
+ if len(set(self._blocks.keys())
+ | set(self._active_share_map.keys())
+ | set(self._overdue_share_map.keys())
+ ) < k:
+ # nope. bail.
+ self._no_shares_error() # this calls self.stop()
+ return
+ # our outstanding or overdue requests may yet work.
+ # more shares may be coming. Wait until then.
+ return
+
# are we done?
- if self._count_shnums(COMPLETE) >= k:
+ if len(set(self._blocks.keys())) >= k:
# yay!
self.stop()
self._node.process_blocks(self.segnum, self._blocks)
return
- # we may have exhausted everything
- if (self._no_more_shares and
- self._count_shnums(AVAILABLE, PENDING, OVERDUE, COMPLETE) < k):
- # no more new shares are coming, and the remaining hopeful shares
- # aren't going to be enough. boo!
-
- log.msg("share states: %r" % (self._shares,),
- level=log.NOISY, umid="0ThykQ")
- if self._count_shnums(AVAILABLE, PENDING, OVERDUE, COMPLETE) == 0:
- format = ("no shares (need %(k)d)."
- " Last failure: %(last_failure)s")
- args = { "k": k,
- "last_failure": self._last_failure }
- error = NoSharesError
- else:
- format = ("ran out of shares: %(complete)d complete,"
- " %(pending)d pending, %(overdue)d overdue,"
- " %(unused)d unused, need %(k)d."
- " Last failure: %(last_failure)s")
- args = {"complete": self._count_shnums(COMPLETE),
- "pending": self._count_shnums(PENDING),
- "overdue": self._count_shnums(OVERDUE),
- # 'unused' should be zero
- "unused": self._count_shnums(AVAILABLE),
- "k": k,
- "last_failure": self._last_failure,
- }
- error = NotEnoughSharesError
- log.msg(format=format, level=log.UNUSUAL, umid="1DsnTg", **args)
- e = error(format % args)
- f = Failure(e)
- self.stop()
- self._node.fetch_failed(self, f)
- return
+ def _no_shares_error(self):
+ if not (self._shares or self._active_share_map or
+ self._overdue_share_map or self._blocks):
+ format = ("no shares (need %(k)d)."
+ " Last failure: %(last_failure)s")
+ args = { "k": self._k,
+ "last_failure": self._last_failure }
+ error = NoSharesError
+ else:
+ format = ("ran out of shares: complete=%(complete)s"
+ " pending=%(pending)s overdue=%(overdue)s"
+ " unused=%(unused)s need %(k)d."
+ " Last failure: %(last_failure)s")
+ def join(shnums): return ",".join(["sh%d" % shnum
+ for shnum in sorted(shnums)])
+ pending_s = ",".join([str(sh)
+ for sh in self._active_share_map.values()])
+ overdue = set()
+ for shares in self._overdue_share_map.values():
+ overdue |= shares
+ overdue_s = ",".join([str(sh) for sh in overdue])
+ args = {"complete": join(self._blocks.keys()),
+ "pending": pending_s,
+ "overdue": overdue_s,
+ # 'unused' should be zero
+ "unused": ",".join([str(sh) for sh in self._shares]),
+ "k": self._k,
+ "last_failure": self._last_failure,
+ }
+ error = NotEnoughSharesError
+ log.msg(format=format,
+ level=log.UNUSUAL, parent=self._lp, umid="1DsnTg",
+ **args)
+ e = error(format % args)
+ f = Failure(e)
+ self.stop()
+ self._node.fetch_failed(self, f)
- # nope, not done. Are we "block-hungry" (i.e. do we want to send out
- # more read requests, or do we think we have enough in flight
- # already?)
- while self._count_shnums(PENDING, COMPLETE) < k:
- # we're hungry.. are there any unused shares?
- sent = self._send_new_request()
- if not sent:
- break
-
- # ok, now are we "share-hungry" (i.e. do we have enough known shares
- # to make us happy, or should we ask the ShareFinder to get us more?)
- if self._count_shnums(AVAILABLE, PENDING, COMPLETE) < k:
- # we're hungry for more shares
- self._node.want_more_shares()
- # that will trigger the ShareFinder to keep looking
-
- def _find_one(self, shares, state):
- # TODO could choose fastest, or avoid servers already in use
- for s in shares:
- if self._shares[s] == state:
- return s
- # can never get here, caller has assert in case of code bug
-
- def _send_new_request(self):
- # TODO: this is probably O(k^2), and we're called from a range(k)
- # loop, so O(k^3)
-
- # this first loop prefers sh0, then sh1, sh2, etc
- for shnum,shares in sorted(self._shnums.iteritems()):
- states = [self._shares[s] for s in shares]
- if COMPLETE in states or PENDING in states:
- # don't send redundant requests
+ def _find_and_use_share(self):
+ sent_something = False
+ want_more_diversity = False
+ for sh in self._shares: # find one good share to fetch
+ shnum = sh._shnum ; serverid = sh._peerid
+ if shnum in self._blocks:
+ continue # don't request data we already have
+ if shnum in self._active_share_map:
+ # note: OVERDUE shares are removed from _active_share_map
+ # and added to _overdue_share_map instead.
+ continue # don't send redundant requests
+ sfs = self._shares_from_server
+ if len(sfs.get(serverid,set())) >= self._max_shares_per_server:
+ # don't pull too much from a single server
+ want_more_diversity = True
continue
- if AVAILABLE not in states:
- # no candidates for this shnum, move on
- continue
- # here's a candidate. Send a request.
- s = self._find_one(shares, AVAILABLE)
- assert s
- self._shares[s] = PENDING
- self._share_observers[s] = o = s.get_block(self.segnum)
- o.subscribe(self._block_request_activity, share=s, shnum=shnum)
- # TODO: build up a list of candidates, then walk through the
- # list, sending requests to the most desireable servers,
- # re-checking our block-hunger each time. For non-initial segment
- # fetches, this would let us stick with faster servers.
- return True
- # nothing was sent: don't call us again until you have more shares to
- # work with, or one of the existing shares has been declared OVERDUE
- return False
+ # ok, we can use this share
+ self._shares.remove(sh)
+ self._active_share_map[shnum] = sh
+ self._shares_from_server.add(serverid, sh)
+ self._start_share(sh, shnum)
+ sent_something = True
+ break
+ return (sent_something, want_more_diversity)
+
+ def _start_share(self, share, shnum):
+ self._share_observers[share] = o = share.get_block(self.segnum)
+ o.subscribe(self._block_request_activity, share=share, shnum=shnum)
+
+ def _ask_for_more_shares(self):
+ if not self._no_more_shares:
+ self._node.want_more_shares()
+ # that will trigger the ShareFinder to keep looking, and call our
+ # add_shares() or no_more_shares() later.
def _cancel_all_requests(self):
for o in self._share_observers.values():
log.msg("SegmentFetcher(%s)._block_request_activity:"
" Share(sh%d-on-%s) -> %s" %
(self._node._si_prefix, shnum, share._peerid_s, state),
- level=log.NOISY, umid="vilNWA")
- # COMPLETE, CORRUPT, DEAD, BADSEGNUM are terminal.
+ level=log.NOISY, parent=self._lp, umid="vilNWA")
+ # COMPLETE, CORRUPT, DEAD, BADSEGNUM are terminal. Remove the share
+ # from all our tracking lists.
if state in (COMPLETE, CORRUPT, DEAD, BADSEGNUM):
self._share_observers.pop(share, None)
+ self._shares_from_server.discard(shnum, share)
+ if self._active_share_map.get(shnum) is share:
+ del self._active_share_map[shnum]
+ self._overdue_share_map.discard(shnum, share)
+
if state is COMPLETE:
- # 'block' is fully validated
- self._shares[share] = COMPLETE
+ # 'block' is fully validated and complete
self._blocks[shnum] = block
- elif state is OVERDUE:
- self._shares[share] = OVERDUE
+
+ if state is OVERDUE:
+ # no longer active, but still might complete
+ del self._active_share_map[shnum]
+ self._overdue_share_map.add(shnum, share)
# OVERDUE is not terminal: it will eventually transition to
# COMPLETE, CORRUPT, or DEAD.
- elif state is CORRUPT:
- self._shares[share] = CORRUPT
- elif state is DEAD:
- del self._shares[share]
- self._shnums[shnum].remove(share)
- self._last_failure = f
- elif state is BADSEGNUM:
- self._shares[share] = BADSEGNUM # ???
- self._bad_segnum = True
- eventually(self.loop)
+ if state is DEAD:
+ self._last_failure = f
+ if state is BADSEGNUM:
+ # our main loop will ask the DownloadNode each time for the
+ # number of segments, so we'll deal with this in the top of
+ # _do_loop
+ pass
+ eventually(self.loop)
self._storage_broker = storage_broker
self.share_consumer = self.node = node
self.max_outstanding_requests = max_outstanding_requests
-
self._hungry = False
self._commonshares = {} # shnum to CommonShare instance
- self.undelivered_shares = []
self.pending_requests = set()
self.overdue_requests = set() # subset of pending_requests
self.overdue_timers = {}
si=self._si_prefix,
level=log.NOISY, parent=logparent, umid="2xjj2A")
+ def update_num_segments(self):
+ (numsegs, authoritative) = self.node.get_num_segments()
+ assert authoritative
+ for cs in self._commonshares.values():
+ cs.set_authoritative_num_segments(numsegs)
+
def start_finding_servers(self):
# don't get servers until somebody uses us: creating the
# ImmutableFileNode should not cause work to happen yet. Test case is
# internal methods
def loop(self):
- undelivered_s = ",".join(["sh%d@%s" %
- (s._shnum, idlib.shortnodeid_b2a(s._peerid))
- for s in self.undelivered_shares])
pending_s = ",".join([idlib.shortnodeid_b2a(rt.peerid)
for rt in self.pending_requests]) # sort?
self.log(format="ShareFinder loop: running=%(running)s"
- " hungry=%(hungry)s, undelivered=%(undelivered)s,"
- " pending=%(pending)s",
- running=self.running, hungry=self._hungry,
- undelivered=undelivered_s, pending=pending_s,
+ " hungry=%(hungry)s, pending=%(pending)s",
+ running=self.running, hungry=self._hungry, pending=pending_s,
level=log.NOISY, umid="kRtS4Q")
if not self.running:
return
if not self._hungry:
return
- if self.undelivered_shares:
- sh = self.undelivered_shares.pop(0)
- # they will call hungry() again if they want more
- self._hungry = False
- self.log(format="delivering Share(shnum=%(shnum)d, server=%(peerid)s)",
- shnum=sh._shnum, peerid=sh._peerid_s,
- level=log.NOISY, umid="2n1qQw")
- eventually(self.share_consumer.got_shares, [sh])
- return
non_overdue = self.pending_requests - self.overdue_requests
if len(non_overdue) >= self.max_outstanding_requests:
lp = self.log(format="sending DYHB to [%(peerid)s]",
peerid=idlib.shortnodeid_b2a(peerid),
level=log.NOISY, umid="Io7pyg")
- d_ev = self._download_status.add_dyhb_sent(peerid, now())
+ time_sent = now()
+ d_ev = self._download_status.add_dyhb_sent(peerid, time_sent)
# TODO: get the timer from a Server object, it knows best
self.overdue_timers[req] = reactor.callLater(self.OVERDUE_TIMEOUT,
self.overdue, req)
d = rref.callRemote("get_buckets", self._storage_index)
d.addBoth(incidentally, self._request_retired, req)
d.addCallbacks(self._got_response, self._got_error,
- callbackArgs=(rref.version, peerid, req, d_ev, lp),
+ callbackArgs=(rref.version, peerid, req, d_ev,
+ time_sent, lp),
errbackArgs=(peerid, req, d_ev, lp))
d.addErrback(log.err, format="error in send_request",
level=log.WEIRD, parent=lp, umid="rpdV0w")
self.overdue_requests.add(req)
eventually(self.loop)
- def _got_response(self, buckets, server_version, peerid, req, d_ev, lp):
+ def _got_response(self, buckets, server_version, peerid, req, d_ev,
+ time_sent, lp):
shnums = sorted([shnum for shnum in buckets])
- d_ev.finished(shnums, now())
- if buckets:
- shnums_s = ",".join([str(shnum) for shnum in shnums])
- self.log(format="got shnums [%(shnums)s] from [%(peerid)s]",
- shnums=shnums_s, peerid=idlib.shortnodeid_b2a(peerid),
- level=log.NOISY, parent=lp, umid="0fcEZw")
- else:
+ time_received = now()
+ d_ev.finished(shnums, time_received)
+ dyhb_rtt = time_received - time_sent
+ if not buckets:
self.log(format="no shares from [%(peerid)s]",
peerid=idlib.shortnodeid_b2a(peerid),
level=log.NOISY, parent=lp, umid="U7d4JA")
- if self.node.num_segments is None:
- best_numsegs = self.node.guessed_num_segments
- else:
- best_numsegs = self.node.num_segments
+ return
+ shnums_s = ",".join([str(shnum) for shnum in shnums])
+ self.log(format="got shnums [%(shnums)s] from [%(peerid)s]",
+ shnums=shnums_s, peerid=idlib.shortnodeid_b2a(peerid),
+ level=log.NOISY, parent=lp, umid="0fcEZw")
+ shares = []
for shnum, bucket in buckets.iteritems():
- self._create_share(best_numsegs, shnum, bucket, server_version,
- peerid)
+ s = self._create_share(shnum, bucket, server_version, peerid,
+ dyhb_rtt)
+ shares.append(s)
+ self._deliver_shares(shares)
- def _create_share(self, best_numsegs, shnum, bucket, server_version,
- peerid):
+ def _create_share(self, shnum, bucket, server_version, peerid, dyhb_rtt):
if shnum in self._commonshares:
cs = self._commonshares[shnum]
else:
- cs = CommonShare(best_numsegs, self._si_prefix, shnum,
+ numsegs, authoritative = self.node.get_num_segments()
+ cs = CommonShare(numsegs, self._si_prefix, shnum,
self._node_logparent)
+ if authoritative:
+ cs.set_authoritative_num_segments(numsegs)
# Share._get_satisfaction is responsible for updating
# CommonShare.set_numsegs after we know the UEB. Alternatives:
# 1: d = self.node.get_num_segments()
# Yuck.
self._commonshares[shnum] = cs
s = Share(bucket, server_version, self.verifycap, cs, self.node,
- self._download_status, peerid, shnum,
+ self._download_status, peerid, shnum, dyhb_rtt,
self._node_logparent)
- self.undelivered_shares.append(s)
+ return s
+
+ def _deliver_shares(self, shares):
+ # they will call hungry() again if they want more
+ self._hungry = False
+ shares_s = ",".join([str(sh) for sh in shares])
+ self.log(format="delivering shares: %s" % shares_s,
+ level=log.NOISY, umid="2n1qQw")
+ eventually(self.share_consumer.got_shares, shares)
def _got_error(self, f, peerid, req, d_ev, lp):
d_ev.finished("error", now())
# things to track callers that want data
# _segment_requests can have duplicates
- self._segment_requests = [] # (segnum, d, cancel_handle)
+ self._segment_requests = [] # (segnum, d, cancel_handle, logparent)
self._active_segment = None # a SegmentFetcher, with .segnum
self._segsize_observers = observer.OneShotObserverList()
# for each read() call. Segmentation and get_segment() messages are
# associated with the read() call, everything else is tied to the
# _Node's log entry.
- lp = log.msg(format="Immutable _Node(%(si)s) created: size=%(size)d,"
+ lp = log.msg(format="Immutable.DownloadNode(%(si)s) created:"
+ " size=%(size)d,"
" guessed_segsize=%(guessed_segsize)d,"
" guessed_numsegs=%(guessed_numsegs)d",
si=self._si_prefix, size=verifycap.size,
# as with CommonShare, our ciphertext_hash_tree is a stub until we
# get the real num_segments
self.ciphertext_hash_tree = IncompleteHashTree(self.guessed_num_segments)
+ self.ciphertext_hash_tree_leaves = self.guessed_num_segments
def __repr__(self):
- return "Imm_Node(%s)" % (self._si_prefix,)
+ return "ImmutableDownloadNode(%s)" % (self._si_prefix,)
def stop(self):
# called by the Terminator at shutdown, mostly for tests
The Deferred can also errback with other fatal problems, such as
NotEnoughSharesError, NoSharesError, or BadCiphertextHashError.
"""
- log.msg(format="imm Node(%(si)s).get_segment(%(segnum)d)",
- si=base32.b2a(self._verifycap.storage_index)[:8],
- segnum=segnum,
- level=log.OPERATIONAL, parent=logparent, umid="UKFjDQ")
+ lp = log.msg(format="imm Node(%(si)s).get_segment(%(segnum)d)",
+ si=base32.b2a(self._verifycap.storage_index)[:8],
+ segnum=segnum,
+ level=log.OPERATIONAL, parent=logparent, umid="UKFjDQ")
self._download_status.add_segment_request(segnum, now())
d = defer.Deferred()
c = Cancel(self._cancel_request)
- self._segment_requests.append( (segnum, d, c) )
+ self._segment_requests.append( (segnum, d, c, lp) )
self._start_new_segment()
return (d, c)
if self._active_segment is None and self._segment_requests:
segnum = self._segment_requests[0][0]
k = self._verifycap.needed_shares
+ lp = self._segment_requests[0][3]
log.msg(format="%(node)s._start_new_segment: segnum=%(segnum)d",
node=repr(self), segnum=segnum,
- level=log.NOISY, umid="wAlnHQ")
- self._active_segment = fetcher = SegmentFetcher(self, segnum, k)
+ level=log.NOISY, parent=lp, umid="wAlnHQ")
+ self._active_segment = fetcher = SegmentFetcher(self, segnum, k, lp)
active_shares = [s for s in self._shares if s.is_alive()]
fetcher.add_shares(active_shares) # this triggers the loop
h = hashutil.uri_extension_hash(UEB_s)
if h != self._verifycap.uri_extension_hash:
raise BadHashError
- UEB_dict = uri.unpack_extension(UEB_s)
- self._parse_and_store_UEB(UEB_dict) # sets self._stuff
+ self._parse_and_store_UEB(UEB_s) # sets self._stuff
# TODO: a malformed (but authentic) UEB could throw an assertion in
# _parse_and_store_UEB, and we should abandon the download.
self.have_UEB = True
- def _parse_and_store_UEB(self, d):
+ # inform the ShareFinder about our correct number of segments. This
+ # will update the block-hash-trees in all existing CommonShare
+ # instances, and will populate new ones with the correct value.
+ self._sharefinder.update_num_segments()
+
+ def _parse_and_store_UEB(self, UEB_s):
# Note: the UEB contains needed_shares and total_shares. These are
# redundant and inferior (the filecap contains the authoritative
# values). However, because it is possible to encode the same file in
# therefore, we ignore d['total_shares'] and d['needed_shares'].
+ d = uri.unpack_extension(UEB_s)
+
log.msg(format="UEB=%(ueb)s, vcap=%(vcap)s",
- ueb=repr(d), vcap=self._verifycap.to_string(),
+ ueb=repr(uri.unpack_extension_readable(UEB_s)),
+ vcap=self._verifycap.to_string(),
level=log.NOISY, parent=self._lp, umid="cVqZnA")
k, N = self._verifycap.needed_shares, self._verifycap.total_shares
# shares of file B. self.ciphertext_hash_tree was a guess before:
# this is where we create it for real.
self.ciphertext_hash_tree = IncompleteHashTree(self.num_segments)
+ self.ciphertext_hash_tree_leaves = self.num_segments
self.ciphertext_hash_tree.set_hashes({0: d['crypttext_root_hash']})
self.share_hash_tree.set_hashes({0: d['share_root_hash']})
% (hashnum, len(self.share_hash_tree)))
self.share_hash_tree.set_hashes(share_hashes)
+ def get_desired_ciphertext_hashes(self, segnum):
+ if segnum < self.ciphertext_hash_tree_leaves:
+ return self.ciphertext_hash_tree.needed_hashes(segnum,
+ include_leaf=True)
+ return []
def get_needed_ciphertext_hashes(self, segnum):
cht = self.ciphertext_hash_tree
return cht.needed_hashes(segnum, include_leaf=True)
+
def process_ciphertext_hashes(self, hashes):
assert self.num_segments is not None
# this may raise BadHashError or NotEnoughHashesError
def _extract_requests(self, segnum):
"""Remove matching requests and return their (d,c) tuples so that the
caller can retire them."""
- retire = [(d,c) for (segnum0, d, c) in self._segment_requests
+ retire = [(d,c) for (segnum0, d, c, lp) in self._segment_requests
if segnum0 == segnum]
self._segment_requests = [t for t in self._segment_requests
if t[0] != segnum]
def _cancel_request(self, c):
self._segment_requests = [t for t in self._segment_requests
if t[2] != c]
- segnums = [segnum for (segnum,d,c) in self._segment_requests]
+ segnums = [segnum for (segnum,d,c,lp) in self._segment_requests]
# self._active_segment might be None in rare circumstances, so make
# sure we tolerate it
if self._active_segment and self._active_segment.segnum not in segnums:
self._active_segment.stop()
self._active_segment = None
self._start_new_segment()
+
+ # called by ShareFinder to choose hashtree sizes in CommonShares, and by
+ # SegmentFetcher to tell if it is still fetching a valid segnum.
+ def get_num_segments(self):
+ # returns (best_num_segments, authoritative)
+ if self.num_segments is None:
+ return (self.guessed_num_segments, False)
+ return (self.num_segments, True)
# servers. A different backend would use a different class.
def __init__(self, rref, server_version, verifycap, commonshare, node,
- download_status, peerid, shnum, logparent):
+ download_status, peerid, shnum, dyhb_rtt, logparent):
self._rref = rref
self._server_version = server_version
self._node = node # holds share_hash_tree and UEB
self._storage_index = verifycap.storage_index
self._si_prefix = base32.b2a(verifycap.storage_index)[:8]
self._shnum = shnum
+ self._dyhb_rtt = dyhb_rtt
# self._alive becomes False upon fatal corruption or server error
self._alive = True
self._lp = log.msg(format="%(share)s created", share=repr(self),
if not self._satisfy_UEB():
# can't check any hashes without the UEB
return False
+ # the call to _satisfy_UEB() will immediately set the
+ # authoritative num_segments in all our CommonShares. If we
+ # guessed wrong, we might stil be working on a bogus segnum
+ # (beyond the real range). We catch this and signal BADSEGNUM
+ # before invoking any further code that touches hashtrees.
self.actual_segment_size = self._node.segment_size # might be updated
assert self.actual_segment_size is not None
- # knowing the UEB means knowing num_segments. Despite the redundancy,
- # this is the best place to set this. CommonShare.set_numsegs will
- # ignore duplicate calls.
+ # knowing the UEB means knowing num_segments
assert self._node.num_segments is not None
- cs = self._commonshare
- cs.set_numsegs(self._node.num_segments)
segnum, observers = self._active_segnum_and_observers()
# if segnum is None, we don't really need to do anything (we have no
# can't check block_hash_tree without a root
return False
- if cs.need_block_hash_root():
+ if self._commonshare.need_block_hash_root():
block_hash_root = self._node.share_hash_tree.get_leaf(self._shnum)
- cs.set_block_hash_root(block_hash_root)
+ self._commonshare.set_block_hash_root(block_hash_root)
if segnum is None:
return False # we don't want any particular segment right now
] ):
offsets[field] = fields[i]
self.actual_offsets = offsets
- log.msg("actual offsets: data=%d, plaintext_hash_tree=%d, crypttext_hash_tree=%d, block_hashes=%d, share_hashes=%d, uri_extension=%d" % tuple(fields))
+ log.msg("actual offsets: data=%d, plaintext_hash_tree=%d, crypttext_hash_tree=%d, block_hashes=%d, share_hashes=%d, uri_extension=%d" % tuple(fields),
+ level=log.NOISY, parent=self._lp, umid="jedQcw")
self._received.remove(0, 4) # don't need this anymore
# validate the offsets a bit
block = self._received.pop(blockstart, blocklen)
if not block:
log.msg("no data for block %s (want [%d:+%d])" % (repr(self),
- blockstart, blocklen))
+ blockstart, blocklen),
+ level=log.NOISY, parent=self._lp, umid="aK0RFw")
return False
log.msg(format="%(share)s._satisfy_data_block [%(start)d:+%(length)d]",
share=repr(self), start=blockstart, length=blocklen,
if self.actual_offsets or self._overrun_ok:
if not self._node.have_UEB:
self._desire_UEB(desire, o)
- # They might ask for a segment that doesn't look right.
- # _satisfy() will catch+reject bad segnums once we know the UEB
- # (and therefore segsize and numsegs), so we'll only fail this
- # test if we're still guessing. We want to avoid asking the
- # hashtrees for needed_hashes() for bad segnums. So don't enter
- # _desire_hashes or _desire_data unless the segnum looks
- # reasonable.
- if segnum < r["num_segments"]:
- # XXX somehow we're getting here for sh5. we don't yet know
- # the actual_segment_size, we're still working off the guess.
- # the ciphertext_hash_tree has been corrected, but the
- # commonshare._block_hash_tree is still in the guessed state.
- self._desire_share_hashes(desire, o)
- if segnum is not None:
- self._desire_block_hashes(desire, o, segnum)
- self._desire_data(desire, o, r, segnum, segsize)
- else:
- log.msg("_desire: segnum(%d) looks wrong (numsegs=%d)"
- % (segnum, r["num_segments"]),
- level=log.UNUSUAL, parent=self._lp, umid="tuYRQQ")
+ self._desire_share_hashes(desire, o)
+ if segnum is not None:
+ # They might be asking for a segment number that is beyond
+ # what we guess the file contains, but _desire_block_hashes
+ # and _desire_data will tolerate that.
+ self._desire_block_hashes(desire, o, segnum)
+ self._desire_data(desire, o, r, segnum, segsize)
log.msg("end _desire: want_it=%s need_it=%s gotta=%s"
- % (want_it.dump(), need_it.dump(), gotta_gotta_have_it.dump()))
+ % (want_it.dump(), need_it.dump(), gotta_gotta_have_it.dump()),
+ level=log.NOISY, parent=self._lp, umid="IG7CgA")
if self.actual_offsets:
return (want_it, need_it+gotta_gotta_have_it)
else:
(want_it, need_it, gotta_gotta_have_it) = desire
# block hash chain
- for hashnum in self._commonshare.get_needed_block_hashes(segnum):
+ for hashnum in self._commonshare.get_desired_block_hashes(segnum):
need_it.add(o["block_hashes"]+hashnum*HASH_SIZE, HASH_SIZE)
# ciphertext hash chain
- for hashnum in self._node.get_needed_ciphertext_hashes(segnum):
+ for hashnum in self._node.get_desired_ciphertext_hashes(segnum):
need_it.add(o["crypttext_hash_tree"]+hashnum*HASH_SIZE, HASH_SIZE)
def _desire_data(self, desire, o, r, segnum, segsize):
+ if segnum > r["num_segments"]:
+ # they're asking for a segment that's beyond what we think is the
+ # end of the file. We won't get here if we've already learned the
+ # real UEB: _get_satisfaction() will notice the out-of-bounds and
+ # terminate the loop. So we must still be guessing, which means
+ # that they might be correct in asking for such a large segnum.
+ # But if they're right, then our segsize/segnum guess is
+ # certainly wrong, which means we don't know what data blocks to
+ # ask for yet. So don't bother adding anything. When the UEB
+ # comes back and we learn the correct segsize/segnums, we'll
+ # either reject the request or have enough information to proceed
+ # normally. This costs one roundtrip.
+ log.msg("_desire_data: segnum(%d) looks wrong (numsegs=%d)"
+ % (segnum, r["num_segments"]),
+ level=log.UNUSUAL, parent=self._lp, umid="tuYRQQ")
+ return
(want_it, need_it, gotta_gotta_have_it) = desire
tail = (segnum == r["num_segments"]-1)
datastart = o["data"]
class CommonShare:
+ # TODO: defer creation of the hashtree until somebody uses us. There will
+ # be a lot of unused shares, and we shouldn't spend the memory on a large
+ # hashtree unless necessary.
"""I hold data that is common across all instances of a single share,
like sh2 on both servers A and B. This is just the block hash tree.
"""
- def __init__(self, guessed_numsegs, si_prefix, shnum, logparent):
+ def __init__(self, best_numsegs, si_prefix, shnum, logparent):
self.si_prefix = si_prefix
self.shnum = shnum
+
# in the beginning, before we have the real UEB, we can only guess at
# the number of segments. But we want to ask for block hashes early.
# So if we're asked for which block hashes are needed before we know
# numsegs for sure, we return a guess.
- self._block_hash_tree = IncompleteHashTree(guessed_numsegs)
- self._know_numsegs = False
+ self._block_hash_tree = IncompleteHashTree(best_numsegs)
+ self._block_hash_tree_is_authoritative = False
+ self._block_hash_tree_leaves = best_numsegs
self._logparent = logparent
- def set_numsegs(self, numsegs):
- if self._know_numsegs:
- return
- self._block_hash_tree = IncompleteHashTree(numsegs)
- self._know_numsegs = True
+ def __repr__(self):
+ return "CommonShare(%s-sh%d)" % (self.si_prefix, self.shnum)
+
+ def set_authoritative_num_segments(self, numsegs):
+ if self._block_hash_tree_leaves != numsegs:
+ self._block_hash_tree = IncompleteHashTree(numsegs)
+ self._block_hash_tree_leaves = numsegs
+ self._block_hash_tree_is_authoritative = True
def need_block_hash_root(self):
return bool(not self._block_hash_tree[0])
def set_block_hash_root(self, roothash):
- assert self._know_numsegs
+ assert self._block_hash_tree_is_authoritative
self._block_hash_tree.set_hashes({0: roothash})
+ def get_desired_block_hashes(self, segnum):
+ if segnum < self._block_hash_tree_leaves:
+ return self._block_hash_tree.needed_hashes(segnum,
+ include_leaf=True)
+
+ # the segnum might be out-of-bounds. Originally it was due to a race
+ # between the receipt of the UEB on one share (from which we learn
+ # the correct number of segments, update all hash trees to the right
+ # size, and queue a BADSEGNUM to the SegmentFetcher) and the delivery
+ # of a new Share to the SegmentFetcher while that BADSEGNUM was
+ # queued (which sends out requests to the stale segnum, now larger
+ # than the hash tree). I fixed that (by making SegmentFetcher.loop
+ # check for a bad segnum at the start of each pass, instead of using
+ # the queued BADSEGNUM or a flag it sets), but just in case this
+ # still happens, I'm leaving the < in place. If it gets hit, there's
+ # a potential lost-progress problem, but I'm pretty sure that it will
+ # get cleared up on the following turn.
+ return []
+
def get_needed_block_hashes(self, segnum):
+ assert self._block_hash_tree_is_authoritative
# XXX: include_leaf=True needs thought: how did the old downloader do
# it? I think it grabbed *all* block hashes and set them all at once.
# Since we want to fetch less data, we either need to fetch the leaf
return self._block_hash_tree.needed_hashes(segnum, include_leaf=True)
def process_block_hashes(self, block_hashes):
- assert self._know_numsegs
+ assert self._block_hash_tree_is_authoritative
# this may raise BadHashError or NotEnoughHashesError
self._block_hash_tree.set_hashes(block_hashes)
def check_block(self, segnum, block):
- assert self._know_numsegs
+ assert self._block_hash_tree_is_authoritative
h = hashutil.block_hash(block)
# this may raise BadHashError or NotEnoughHashesError
self._block_hash_tree.set_hashes(leaves={segnum: h})
+
+# TODO: maybe stop using EventStreamObserver: instead, use a Deferred and an
+# auxilliary OVERDUE callback. Just make sure to get all the messages in the
+# right order and on the right turns.
+
+# TODO: we're asking for too much data. We probably don't need
+# include_leaf=True in the block hash tree or ciphertext hash tree.
+
+# TODO: we ask for ciphertext hash tree nodes from all shares (whenever
+# _desire is called while we're missing those nodes), but we only consume it
+# from the first response, leaving the rest of the data sitting in _received.
+# This was ameliorated by clearing self._received after each block is
+# complete.
# the download is abandoned as soon as it's clear that we won't get
# enough shares. The one remaining share might be in either the
# COMPLETE or the PENDING state.
- in_complete_msg = "ran out of shares: 1 complete, 0 pending, 0 overdue, 0 unused, need 3"
- in_pending_msg = "ran out of shares: 0 complete, 1 pending, 0 overdue, 0 unused, need 3"
+ in_complete_msg = "ran out of shares: complete=sh0 pending= overdue= unused= need 3"
+ in_pending_msg = "ran out of shares: complete= pending=Share(sh0-on-fob7v) overdue= unused= need 3"
d.addCallback(lambda ign: self.do_cli("get", self.uri_1share))
def _check1((rc, out, err)):
from allmydata.test.common import ShouldFailMixin
from allmydata.interfaces import NotEnoughSharesError, NoSharesError
from allmydata.immutable.downloader.common import BadSegmentNumberError, \
- BadCiphertextHashError, DownloadStopped
+ BadCiphertextHashError, DownloadStopped, COMPLETE, OVERDUE, DEAD
from allmydata.immutable.downloader.status import DownloadStatus
+from allmydata.immutable.downloader.fetcher import SegmentFetcher
from allmydata.codec import CRSDecoder
from foolscap.eventual import fireEventually, flushEventualQueue
# shares
servers = []
shares = sorted([s._shnum for s in self.n._cnode._node._shares])
- self.failUnlessEqual(shares, [0,1,2])
+ self.failUnlessEqual(shares, [0,1,2,3])
# break the RIBucketReader references
for s in self.n._cnode._node._shares:
s._rref.broken = True
self.failUnlessEqual("".join(c.chunks), plaintext)
shares = sorted([s._shnum for s in self.n._cnode._node._shares])
# we should now be using more shares than we were before
- self.failIfEqual(shares, [0,1,2])
+ self.failIfEqual(shares, [0,1,2,3])
d.addCallback(_check_failover)
return d
def _con1_should_not_succeed(res):
self.fail("the first read should not have succeeded")
def _con1_failed(f):
- self.failUnless(f.check(NotEnoughSharesError))
+ self.failUnless(f.check(NoSharesError))
con2.producer.stopProducing()
return d2
d.addCallbacks(_con1_should_not_succeed, _con1_failed)
def _con1_should_not_succeed(res):
self.fail("the first read should not have succeeded")
def _con1_failed(f):
- self.failUnless(f.check(NotEnoughSharesError))
+ self.failUnless(f.check(NoSharesError))
# we *don't* cancel the second one here: this exercises a
# lost-progress bug from #1154. We just wait for it to
# succeed.
# All these tests result in a failed download.
d.addCallback(self._corrupt_flip_all, imm_uri, i)
d.addCallback(lambda ign:
- self.shouldFail(NotEnoughSharesError, which,
+ self.shouldFail(NoSharesError, which,
substring,
_download, imm_uri))
d.addCallback(lambda ign: self.restore_all_shares(self.shares))
e2.update(1000, 2.0, 2.0)
e2.finished(now+5)
self.failUnlessEqual(ds.get_progress(), 1.0)
+
+class MyShare:
+ def __init__(self, shnum, peerid, rtt):
+ self._shnum = shnum
+ self._peerid = peerid
+ self._peerid_s = peerid
+ self._dyhb_rtt = rtt
+ def __repr__(self):
+ return "sh%d-on-%s" % (self._shnum, self._peerid)
+
+class MySegmentFetcher(SegmentFetcher):
+ def __init__(self, *args, **kwargs):
+ SegmentFetcher.__init__(self, *args, **kwargs)
+ self._test_start_shares = []
+ def _start_share(self, share, shnum):
+ self._test_start_shares.append(share)
+
+class FakeNode:
+ def __init__(self):
+ self.want_more = 0
+ self.failed = None
+ self.processed = None
+ self._si_prefix = "si_prefix"
+ def want_more_shares(self):
+ self.want_more += 1
+ def fetch_failed(self, fetcher, f):
+ self.failed = f
+ def process_blocks(self, segnum, blocks):
+ self.processed = (segnum, blocks)
+ def get_num_segments(self):
+ return 1, True
+
+class Selection(unittest.TestCase):
+ def test_no_shares(self):
+ node = FakeNode()
+ sf = SegmentFetcher(node, 0, 3, None)
+ sf.add_shares([])
+ d = flushEventualQueue()
+ def _check1(ign):
+ self.failUnlessEqual(node.want_more, 1)
+ self.failUnlessEqual(node.failed, None)
+ sf.no_more_shares()
+ return flushEventualQueue()
+ d.addCallback(_check1)
+ def _check2(ign):
+ self.failUnless(node.failed)
+ self.failUnless(node.failed.check(NoSharesError))
+ d.addCallback(_check2)
+ return d
+
+ def test_only_one_share(self):
+ node = FakeNode()
+ sf = MySegmentFetcher(node, 0, 3, None)
+ shares = [MyShare(0, "peer-A", 0.0)]
+ sf.add_shares(shares)
+ d = flushEventualQueue()
+ def _check1(ign):
+ self.failUnlessEqual(node.want_more, 1)
+ self.failUnlessEqual(node.failed, None)
+ sf.no_more_shares()
+ return flushEventualQueue()
+ d.addCallback(_check1)
+ def _check2(ign):
+ self.failUnless(node.failed)
+ self.failUnless(node.failed.check(NotEnoughSharesError))
+ self.failUnlessIn("complete= pending=sh0-on-peer-A overdue= unused=",
+ str(node.failed))
+ d.addCallback(_check2)
+ return d
+
+ def test_good_diversity_early(self):
+ node = FakeNode()
+ sf = MySegmentFetcher(node, 0, 3, None)
+ shares = [MyShare(i, "peer-%d" % i, i) for i in range(10)]
+ sf.add_shares(shares)
+ d = flushEventualQueue()
+ def _check1(ign):
+ self.failUnlessEqual(node.want_more, 0)
+ self.failUnlessEqual(sf._test_start_shares, shares[:3])
+ for sh in sf._test_start_shares:
+ sf._block_request_activity(sh, sh._shnum, COMPLETE,
+ "block-%d" % sh._shnum)
+ return flushEventualQueue()
+ d.addCallback(_check1)
+ def _check2(ign):
+ self.failIfEqual(node.processed, None)
+ self.failUnlessEqual(node.processed, (0, {0: "block-0",
+ 1: "block-1",
+ 2: "block-2"}) )
+ d.addCallback(_check2)
+ return d
+
+ def test_good_diversity_late(self):
+ node = FakeNode()
+ sf = MySegmentFetcher(node, 0, 3, None)
+ shares = [MyShare(i, "peer-%d" % i, i) for i in range(10)]
+ sf.add_shares([])
+ d = flushEventualQueue()
+ def _check1(ign):
+ self.failUnlessEqual(node.want_more, 1)
+ sf.add_shares(shares)
+ return flushEventualQueue()
+ d.addCallback(_check1)
+ def _check2(ign):
+ self.failUnlessEqual(sf._test_start_shares, shares[:3])
+ for sh in sf._test_start_shares:
+ sf._block_request_activity(sh, sh._shnum, COMPLETE,
+ "block-%d" % sh._shnum)
+ return flushEventualQueue()
+ d.addCallback(_check2)
+ def _check3(ign):
+ self.failIfEqual(node.processed, None)
+ self.failUnlessEqual(node.processed, (0, {0: "block-0",
+ 1: "block-1",
+ 2: "block-2"}) )
+ d.addCallback(_check3)
+ return d
+
+ def test_avoid_bad_diversity_late(self):
+ node = FakeNode()
+ sf = MySegmentFetcher(node, 0, 3, None)
+ # we could satisfy the read entirely from the first server, but we'd
+ # prefer not to. Instead, we expect to only pull one share from the
+ # first server
+ shares = [MyShare(0, "peer-A", 0.0),
+ MyShare(1, "peer-A", 0.0),
+ MyShare(2, "peer-A", 0.0),
+ MyShare(3, "peer-B", 1.0),
+ MyShare(4, "peer-C", 2.0),
+ ]
+ sf.add_shares([])
+ d = flushEventualQueue()
+ def _check1(ign):
+ self.failUnlessEqual(node.want_more, 1)
+ sf.add_shares(shares)
+ return flushEventualQueue()
+ d.addCallback(_check1)
+ def _check2(ign):
+ self.failUnlessEqual(sf._test_start_shares,
+ [shares[0], shares[3], shares[4]])
+ for sh in sf._test_start_shares:
+ sf._block_request_activity(sh, sh._shnum, COMPLETE,
+ "block-%d" % sh._shnum)
+ return flushEventualQueue()
+ d.addCallback(_check2)
+ def _check3(ign):
+ self.failIfEqual(node.processed, None)
+ self.failUnlessEqual(node.processed, (0, {0: "block-0",
+ 3: "block-3",
+ 4: "block-4"}) )
+ d.addCallback(_check3)
+ return d
+
+ def test_suffer_bad_diversity_late(self):
+ node = FakeNode()
+ sf = MySegmentFetcher(node, 0, 3, None)
+ # we satisfy the read entirely from the first server because we don't
+ # have any other choice.
+ shares = [MyShare(0, "peer-A", 0.0),
+ MyShare(1, "peer-A", 0.0),
+ MyShare(2, "peer-A", 0.0),
+ MyShare(3, "peer-A", 0.0),
+ MyShare(4, "peer-A", 0.0),
+ ]
+ sf.add_shares([])
+ d = flushEventualQueue()
+ def _check1(ign):
+ self.failUnlessEqual(node.want_more, 1)
+ sf.add_shares(shares)
+ return flushEventualQueue()
+ d.addCallback(_check1)
+ def _check2(ign):
+ self.failUnlessEqual(node.want_more, 3)
+ self.failUnlessEqual(sf._test_start_shares,
+ [shares[0], shares[1], shares[2]])
+ for sh in sf._test_start_shares:
+ sf._block_request_activity(sh, sh._shnum, COMPLETE,
+ "block-%d" % sh._shnum)
+ return flushEventualQueue()
+ d.addCallback(_check2)
+ def _check3(ign):
+ self.failIfEqual(node.processed, None)
+ self.failUnlessEqual(node.processed, (0, {0: "block-0",
+ 1: "block-1",
+ 2: "block-2"}) )
+ d.addCallback(_check3)
+ return d
+
+ def test_suffer_bad_diversity_early(self):
+ node = FakeNode()
+ sf = MySegmentFetcher(node, 0, 3, None)
+ # we satisfy the read entirely from the first server because we don't
+ # have any other choice.
+ shares = [MyShare(0, "peer-A", 0.0),
+ MyShare(1, "peer-A", 0.0),
+ MyShare(2, "peer-A", 0.0),
+ MyShare(3, "peer-A", 0.0),
+ MyShare(4, "peer-A", 0.0),
+ ]
+ sf.add_shares(shares)
+ d = flushEventualQueue()
+ def _check1(ign):
+ self.failUnlessEqual(node.want_more, 2)
+ self.failUnlessEqual(sf._test_start_shares,
+ [shares[0], shares[1], shares[2]])
+ for sh in sf._test_start_shares:
+ sf._block_request_activity(sh, sh._shnum, COMPLETE,
+ "block-%d" % sh._shnum)
+ return flushEventualQueue()
+ d.addCallback(_check1)
+ def _check2(ign):
+ self.failIfEqual(node.processed, None)
+ self.failUnlessEqual(node.processed, (0, {0: "block-0",
+ 1: "block-1",
+ 2: "block-2"}) )
+ d.addCallback(_check2)
+ return d
+
+ def test_overdue(self):
+ node = FakeNode()
+ sf = MySegmentFetcher(node, 0, 3, None)
+ shares = [MyShare(i, "peer-%d" % i, i) for i in range(10)]
+ sf.add_shares(shares)
+ d = flushEventualQueue()
+ def _check1(ign):
+ self.failUnlessEqual(node.want_more, 0)
+ self.failUnlessEqual(sf._test_start_shares, shares[:3])
+ for sh in sf._test_start_shares:
+ sf._block_request_activity(sh, sh._shnum, OVERDUE)
+ return flushEventualQueue()
+ d.addCallback(_check1)
+ def _check2(ign):
+ self.failUnlessEqual(sf._test_start_shares, shares[:6])
+ for sh in sf._test_start_shares[3:]:
+ sf._block_request_activity(sh, sh._shnum, COMPLETE,
+ "block-%d" % sh._shnum)
+ return flushEventualQueue()
+ d.addCallback(_check2)
+ def _check3(ign):
+ self.failIfEqual(node.processed, None)
+ self.failUnlessEqual(node.processed, (0, {3: "block-3",
+ 4: "block-4",
+ 5: "block-5"}) )
+ d.addCallback(_check3)
+ return d
+
+ def test_overdue_fails(self):
+ node = FakeNode()
+ sf = MySegmentFetcher(node, 0, 3, None)
+ shares = [MyShare(i, "peer-%d" % i, i) for i in range(6)]
+ sf.add_shares(shares)
+ sf.no_more_shares()
+ d = flushEventualQueue()
+ def _check1(ign):
+ self.failUnlessEqual(node.want_more, 0)
+ self.failUnlessEqual(sf._test_start_shares, shares[:3])
+ for sh in sf._test_start_shares:
+ sf._block_request_activity(sh, sh._shnum, OVERDUE)
+ return flushEventualQueue()
+ d.addCallback(_check1)
+ def _check2(ign):
+ self.failUnlessEqual(sf._test_start_shares, shares[:6])
+ for sh in sf._test_start_shares[3:]:
+ sf._block_request_activity(sh, sh._shnum, DEAD)
+ return flushEventualQueue()
+ d.addCallback(_check2)
+ def _check3(ign):
+ # we're still waiting
+ self.failUnlessEqual(node.processed, None)
+ self.failUnlessEqual(node.failed, None)
+ # now complete one of the overdue ones, and kill one of the other
+ # ones, leaving one hanging. This should trigger a failure, since
+ # we cannot succeed.
+ live = sf._test_start_shares[0]
+ die = sf._test_start_shares[1]
+ sf._block_request_activity(live, live._shnum, COMPLETE, "block")
+ sf._block_request_activity(die, die._shnum, DEAD)
+ return flushEventualQueue()
+ d.addCallback(_check3)
+ def _check4(ign):
+ self.failUnless(node.failed)
+ self.failUnless(node.failed.check(NotEnoughSharesError))
+ self.failUnlessIn("complete=sh0 pending= overdue=sh2-on-peer-2 unused=",
+ str(node.failed))
+ d.addCallback(_check4)
+ return d
+
+ def test_avoid_redundancy(self):
+ node = FakeNode()
+ sf = MySegmentFetcher(node, 0, 3, None)
+ # we could satisfy the read entirely from the first server, but we'd
+ # prefer not to. Instead, we expect to only pull one share from the
+ # first server
+ shares = [MyShare(0, "peer-A", 0.0),
+ MyShare(1, "peer-B", 1.0),
+ MyShare(0, "peer-C", 2.0), # this will be skipped
+ MyShare(1, "peer-D", 3.0),
+ MyShare(2, "peer-E", 4.0),
+ ]
+ sf.add_shares(shares[:3])
+ d = flushEventualQueue()
+ def _check1(ign):
+ self.failUnlessEqual(node.want_more, 1)
+ self.failUnlessEqual(sf._test_start_shares,
+ [shares[0], shares[1]])
+ # allow sh1 to retire
+ sf._block_request_activity(shares[1], 1, COMPLETE, "block-1")
+ return flushEventualQueue()
+ d.addCallback(_check1)
+ def _check2(ign):
+ # and then feed in the remaining shares
+ sf.add_shares(shares[3:])
+ sf.no_more_shares()
+ return flushEventualQueue()
+ d.addCallback(_check2)
+ def _check3(ign):
+ self.failUnlessEqual(sf._test_start_shares,
+ [shares[0], shares[1], shares[4]])
+ sf._block_request_activity(shares[0], 0, COMPLETE, "block-0")
+ sf._block_request_activity(shares[4], 2, COMPLETE, "block-2")
+ return flushEventualQueue()
+ d.addCallback(_check3)
+ def _check4(ign):
+ self.failIfEqual(node.processed, None)
+ self.failUnlessEqual(node.processed, (0, {0: "block-0",
+ 1: "block-1",
+ 2: "block-2"}) )
+ d.addCallback(_check4)
+ return d
def _after_download(unused=None):
after_download_reads = self._count_reads()
#print before_download_reads, after_download_reads
- self.failIf(after_download_reads-before_download_reads > 36,
+ self.failIf(after_download_reads-before_download_reads > 41,
(after_download_reads, before_download_reads))
d.addCallback(self._download_and_check_plaintext)
d.addCallback(_after_download)
def _check_one_share(body):
self.failIf("<html>" in body, body)
body = " ".join(body.strip().split())
- msg = ("NotEnoughSharesError: This indicates that some "
- "servers were unavailable, or that shares have been "
- "lost to server departure, hard drive failure, or disk "
- "corruption. You should perform a filecheck on "
- "this object to learn more. The full error message is:"
- " ran out of shares: %d complete, %d pending, 0 overdue,"
- " 0 unused, need 3. Last failure: None")
- msg1 = msg % (1, 0)
- msg2 = msg % (0, 1)
+ msgbase = ("NotEnoughSharesError: This indicates that some "
+ "servers were unavailable, or that shares have been "
+ "lost to server departure, hard drive failure, or disk "
+ "corruption. You should perform a filecheck on "
+ "this object to learn more. The full error message is:"
+ )
+ msg1 = msgbase + (" ran out of shares:"
+ " complete=sh0"
+ " pending="
+ " overdue= unused= need 3. Last failure: None")
+ msg2 = msgbase + (" ran out of shares:"
+ " complete="
+ " pending=Share(sh0-on-xgru5)"
+ " overdue= unused= need 3. Last failure: None")
self.failUnless(body == msg1 or body == msg2, body)
d.addCallback(_check_one_share)