]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
SegmentFetcher: use new diversity-seeking share-selection algorithm, and
authorBrian Warner <warner@lothar.com>
Wed, 1 Sep 2010 01:37:02 +0000 (18:37 -0700)
committerBrian Warner <warner@lothar.com>
Wed, 1 Sep 2010 01:37:02 +0000 (18:37 -0700)
deliver all shares at once instead of feeding them out one-at-a-time.

Also fix distribution of real-number-of-segments information: now all
CommonShares (not just the ones used for the first segment) get a
correctly-sized hashtree. Previously, the late ones might not, which would
make them crash and get dropped (causing the download to fail if the initial
set were insufficient, perhaps because one of their servers went away).

Update tests, add some TODO notes, improve variable names and comments.
Improve logging: add logparents, set more appropriate levels.

src/allmydata/immutable/downloader/fetcher.py
src/allmydata/immutable/downloader/finder.py
src/allmydata/immutable/downloader/node.py
src/allmydata/immutable/downloader/share.py
src/allmydata/test/test_cli.py
src/allmydata/test/test_download.py
src/allmydata/test/test_immutable.py
src/allmydata/test/test_web.py

index e30ced851dd38f0d212e721677d5fa468865b1a2..e78d37e76b74721a8235493d71130809a75e3452 100644 (file)
@@ -4,8 +4,8 @@ from foolscap.api import eventually
 from allmydata.interfaces import NotEnoughSharesError, NoSharesError
 from allmydata.util import log
 from allmydata.util.dictutil import DictOfSets
-from common import AVAILABLE, PENDING, OVERDUE, COMPLETE, CORRUPT, DEAD, \
-     BADSEGNUM, BadSegmentNumberError
+from common import OVERDUE, COMPLETE, CORRUPT, DEAD, BADSEGNUM, \
+     BadSegmentNumberError
 
 class SegmentFetcher:
     """I am responsible for acquiring blocks for a single segment. I will use
@@ -22,35 +22,42 @@ class SegmentFetcher:
     will shut down and do no further work. My parent can also call my stop()
     method to have me shut down early."""
 
-    def __init__(self, node, segnum, k):
+    def __init__(self, node, segnum, k, logparent):
         self._node = node # _Node
         self.segnum = segnum
         self._k = k
-        self._shares = {} # maps non-dead Share instance to a state, one of
-                          # (AVAILABLE, PENDING, OVERDUE, COMPLETE, CORRUPT).
-                          # State transition map is:
-                          #  AVAILABLE -(send-read)-> PENDING
-                          #  PENDING -(timer)-> OVERDUE
-                          #  PENDING -(rx)-> COMPLETE, CORRUPT, DEAD, BADSEGNUM
-                          #  OVERDUE -(rx)-> COMPLETE, CORRUPT, DEAD, BADSEGNUM
-                          # If a share becomes DEAD, it is removed from the
-                          # dict. If it becomes BADSEGNUM, the whole fetch is
-                          # terminated.
+        self._shares = [] # unused Share instances, sorted by "goodness"
+                          # (RTT), then shnum. This is populated when DYHB
+                          # responses arrive, or (for later segments) at
+                          # startup. We remove shares from it when we call
+                          # sh.get_block() on them.
+        self._shares_from_server = DictOfSets() # maps serverid to set of
+                                                # Shares on that server for
+                                                # which we have outstanding
+                                                # get_block() calls.
+        self._max_shares_per_server = 1 # how many Shares we're allowed to
+                                        # pull from each server. This starts
+                                        # at 1 and grows if we don't have
+                                        # sufficient diversity.
+        self._active_share_map = {} # maps shnum to outstanding (and not
+                                    # OVERDUE) Share that provides it.
+        self._overdue_share_map = DictOfSets() # shares in the OVERDUE state
+        self._lp = logparent
         self._share_observers = {} # maps Share to EventStreamObserver for
                                    # active ones
-        self._shnums = DictOfSets() # maps shnum to the shares that provide it
         self._blocks = {} # maps shnum to validated block data
         self._no_more_shares = False
-        self._bad_segnum = False
         self._last_failure = None
         self._running = True
 
     def stop(self):
         log.msg("SegmentFetcher(%s).stop" % self._node._si_prefix,
-                level=log.NOISY, umid="LWyqpg")
+                level=log.NOISY, parent=self._lp, umid="LWyqpg")
         self._cancel_all_requests()
         self._running = False
-        self._shares.clear() # let GC work # ??? XXX
+        # help GC ??? XXX
+        del self._shares, self._shares_from_server, self._active_share_map
+        del self._share_observers
 
 
     # called by our parent _Node
@@ -59,9 +66,8 @@ class SegmentFetcher:
         # called when ShareFinder locates a new share, and when a non-initial
         # segment fetch is started and we already know about shares from the
         # previous segment
-        for s in shares:
-            self._shares[s] = AVAILABLE
-            self._shnums.add(s._shnum, s)
+        self._shares.extend(shares)
+        self._shares.sort(key=lambda s: (s._dyhb_rtt, s._shnum) )
         eventually(self.loop)
 
     def no_more_shares(self):
@@ -71,15 +77,6 @@ class SegmentFetcher:
 
     # internal methods
 
-    def _count_shnums(self, *states):
-        """shnums for which at least one state is in the following list"""
-        shnums = []
-        for shnum,shares in self._shnums.iteritems():
-            matches = [s for s in shares if self._shares.get(s) in states]
-            if matches:
-                shnums.append(shnum)
-        return len(shnums)
-
     def loop(self):
         try:
             # if any exception occurs here, kill the download
@@ -92,7 +89,8 @@ class SegmentFetcher:
         k = self._k
         if not self._running:
             return
-        if self._bad_segnum:
+        numsegs, authoritative = self._node.get_num_segments()
+        if authoritative and self.segnum >= numsegs:
             # oops, we were asking for a segment number beyond the end of the
             # file. This is an error.
             self.stop()
@@ -102,98 +100,125 @@ class SegmentFetcher:
             self._node.fetch_failed(self, f)
             return
 
+        #print "LOOP", self._blocks.keys(), "active:", self._active_share_map, "overdue:", self._overdue_share_map, "unused:", self._shares
+        # Should we sent out more requests?
+        while len(set(self._blocks.keys())
+                  | set(self._active_share_map.keys())
+                  ) < k:
+            # we don't have data or active requests for enough shares. Are
+            # there any unused shares we can start using?
+            (sent_something, want_more_diversity) = self._find_and_use_share()
+            if sent_something:
+                # great. loop back around in case we need to send more.
+                continue
+            if want_more_diversity:
+                # we could have sent something if we'd been allowed to pull
+                # more shares per server. Increase the limit and try again.
+                self._max_shares_per_server += 1
+                log.msg("SegmentFetcher(%s) increasing diversity limit to %d"
+                        % (self._node._si_prefix, self._max_shares_per_server),
+                        level=log.NOISY, umid="xY2pBA")
+                # Also ask for more shares, in the hopes of achieving better
+                # diversity for the next segment.
+                self._ask_for_more_shares()
+                continue
+            # we need more shares than the ones in self._shares to make
+            # progress
+            self._ask_for_more_shares()
+            if self._no_more_shares:
+                # But there are no more shares to be had. If we're going to
+                # succeed, it will be with the shares we've already seen.
+                # Will they be enough?
+                if len(set(self._blocks.keys())
+                       | set(self._active_share_map.keys())
+                       | set(self._overdue_share_map.keys())
+                       ) < k:
+                    # nope. bail.
+                    self._no_shares_error() # this calls self.stop()
+                    return
+                # our outstanding or overdue requests may yet work.
+            # more shares may be coming. Wait until then.
+            return
+
         # are we done?
-        if self._count_shnums(COMPLETE) >= k:
+        if len(set(self._blocks.keys())) >= k:
             # yay!
             self.stop()
             self._node.process_blocks(self.segnum, self._blocks)
             return
 
-        # we may have exhausted everything
-        if (self._no_more_shares and
-            self._count_shnums(AVAILABLE, PENDING, OVERDUE, COMPLETE) < k):
-            # no more new shares are coming, and the remaining hopeful shares
-            # aren't going to be enough. boo!
-
-            log.msg("share states: %r" % (self._shares,),
-                    level=log.NOISY, umid="0ThykQ")
-            if self._count_shnums(AVAILABLE, PENDING, OVERDUE, COMPLETE) == 0:
-                format = ("no shares (need %(k)d)."
-                          " Last failure: %(last_failure)s")
-                args = { "k": k,
-                         "last_failure": self._last_failure }
-                error = NoSharesError
-            else:
-                format = ("ran out of shares: %(complete)d complete,"
-                          " %(pending)d pending, %(overdue)d overdue,"
-                          " %(unused)d unused, need %(k)d."
-                          " Last failure: %(last_failure)s")
-                args = {"complete": self._count_shnums(COMPLETE),
-                        "pending": self._count_shnums(PENDING),
-                        "overdue": self._count_shnums(OVERDUE),
-                        # 'unused' should be zero
-                        "unused": self._count_shnums(AVAILABLE),
-                        "k": k,
-                        "last_failure": self._last_failure,
-                        }
-                error = NotEnoughSharesError
-            log.msg(format=format, level=log.UNUSUAL, umid="1DsnTg", **args)
-            e = error(format % args)
-            f = Failure(e)
-            self.stop()
-            self._node.fetch_failed(self, f)
-            return
+    def _no_shares_error(self):
+        if not (self._shares or self._active_share_map or
+                self._overdue_share_map or self._blocks):
+            format = ("no shares (need %(k)d)."
+                      " Last failure: %(last_failure)s")
+            args = { "k": self._k,
+                     "last_failure": self._last_failure }
+            error = NoSharesError
+        else:
+            format = ("ran out of shares: complete=%(complete)s"
+                      " pending=%(pending)s overdue=%(overdue)s"
+                      " unused=%(unused)s need %(k)d."
+                      " Last failure: %(last_failure)s")
+            def join(shnums): return ",".join(["sh%d" % shnum
+                                               for shnum in sorted(shnums)])
+            pending_s = ",".join([str(sh)
+                                  for sh in self._active_share_map.values()])
+            overdue = set()
+            for shares in self._overdue_share_map.values():
+                overdue |= shares
+            overdue_s = ",".join([str(sh) for sh in overdue])
+            args = {"complete": join(self._blocks.keys()),
+                    "pending": pending_s,
+                    "overdue": overdue_s,
+                    # 'unused' should be zero
+                    "unused": ",".join([str(sh) for sh in self._shares]),
+                    "k": self._k,
+                    "last_failure": self._last_failure,
+                    }
+            error = NotEnoughSharesError
+        log.msg(format=format,
+                level=log.UNUSUAL, parent=self._lp, umid="1DsnTg",
+                **args)
+        e = error(format % args)
+        f = Failure(e)
+        self.stop()
+        self._node.fetch_failed(self, f)
 
-        # nope, not done. Are we "block-hungry" (i.e. do we want to send out
-        # more read requests, or do we think we have enough in flight
-        # already?)
-        while self._count_shnums(PENDING, COMPLETE) < k:
-            # we're hungry.. are there any unused shares?
-            sent = self._send_new_request()
-            if not sent:
-                break
-
-        # ok, now are we "share-hungry" (i.e. do we have enough known shares
-        # to make us happy, or should we ask the ShareFinder to get us more?)
-        if self._count_shnums(AVAILABLE, PENDING, COMPLETE) < k:
-            # we're hungry for more shares
-            self._node.want_more_shares()
-            # that will trigger the ShareFinder to keep looking
-
-    def _find_one(self, shares, state):
-        # TODO could choose fastest, or avoid servers already in use
-        for s in shares:
-            if self._shares[s] == state:
-                return s
-        # can never get here, caller has assert in case of code bug
-
-    def _send_new_request(self):
-        # TODO: this is probably O(k^2), and we're called from a range(k)
-        # loop, so O(k^3)
-
-        # this first loop prefers sh0, then sh1, sh2, etc
-        for shnum,shares in sorted(self._shnums.iteritems()):
-            states = [self._shares[s] for s in shares]
-            if COMPLETE in states or PENDING in states:
-                # don't send redundant requests
+    def _find_and_use_share(self):
+        sent_something = False
+        want_more_diversity = False
+        for sh in self._shares: # find one good share to fetch
+            shnum = sh._shnum ; serverid = sh._peerid
+            if shnum in self._blocks:
+                continue # don't request data we already have
+            if shnum in self._active_share_map:
+                # note: OVERDUE shares are removed from _active_share_map
+                # and added to _overdue_share_map instead.
+                continue # don't send redundant requests
+            sfs = self._shares_from_server
+            if len(sfs.get(serverid,set())) >= self._max_shares_per_server:
+                # don't pull too much from a single server
+                want_more_diversity = True
                 continue
-            if AVAILABLE not in states:
-                # no candidates for this shnum, move on
-                continue
-            # here's a candidate. Send a request.
-            s = self._find_one(shares, AVAILABLE)
-            assert s
-            self._shares[s] = PENDING
-            self._share_observers[s] = o = s.get_block(self.segnum)
-            o.subscribe(self._block_request_activity, share=s, shnum=shnum)
-            # TODO: build up a list of candidates, then walk through the
-            # list, sending requests to the most desireable servers,
-            # re-checking our block-hunger each time. For non-initial segment
-            # fetches, this would let us stick with faster servers.
-            return True
-        # nothing was sent: don't call us again until you have more shares to
-        # work with, or one of the existing shares has been declared OVERDUE
-        return False
+            # ok, we can use this share
+            self._shares.remove(sh)
+            self._active_share_map[shnum] = sh
+            self._shares_from_server.add(serverid, sh)
+            self._start_share(sh, shnum)
+            sent_something = True
+            break
+        return (sent_something, want_more_diversity)
+
+    def _start_share(self, share, shnum):
+        self._share_observers[share] = o = share.get_block(self.segnum)
+        o.subscribe(self._block_request_activity, share=share, shnum=shnum)
+
+    def _ask_for_more_shares(self):
+        if not self._no_more_shares:
+            self._node.want_more_shares()
+            # that will trigger the ShareFinder to keep looking, and call our
+            # add_shares() or no_more_shares() later.
 
     def _cancel_all_requests(self):
         for o in self._share_observers.values():
@@ -207,27 +232,33 @@ class SegmentFetcher:
         log.msg("SegmentFetcher(%s)._block_request_activity:"
                 " Share(sh%d-on-%s) -> %s" %
                 (self._node._si_prefix, shnum, share._peerid_s, state),
-                level=log.NOISY, umid="vilNWA")
-        # COMPLETE, CORRUPT, DEAD, BADSEGNUM are terminal.
+                level=log.NOISY, parent=self._lp, umid="vilNWA")
+        # COMPLETE, CORRUPT, DEAD, BADSEGNUM are terminal. Remove the share
+        # from all our tracking lists.
         if state in (COMPLETE, CORRUPT, DEAD, BADSEGNUM):
             self._share_observers.pop(share, None)
+            self._shares_from_server.discard(shnum, share)
+            if self._active_share_map.get(shnum) is share:
+                del self._active_share_map[shnum]
+            self._overdue_share_map.discard(shnum, share)
+
         if state is COMPLETE:
-            # 'block' is fully validated
-            self._shares[share] = COMPLETE
+            # 'block' is fully validated and complete
             self._blocks[shnum] = block
-        elif state is OVERDUE:
-            self._shares[share] = OVERDUE
+
+        if state is OVERDUE:
+            # no longer active, but still might complete
+            del self._active_share_map[shnum]
+            self._overdue_share_map.add(shnum, share)
             # OVERDUE is not terminal: it will eventually transition to
             # COMPLETE, CORRUPT, or DEAD.
-        elif state is CORRUPT:
-            self._shares[share] = CORRUPT
-        elif state is DEAD:
-            del self._shares[share]
-            self._shnums[shnum].remove(share)
-            self._last_failure = f
-        elif state is BADSEGNUM:
-            self._shares[share] = BADSEGNUM # ???
-            self._bad_segnum = True
-        eventually(self.loop)
 
+        if state is DEAD:
+            self._last_failure = f
+        if state is BADSEGNUM:
+            # our main loop will ask the DownloadNode each time for the
+            # number of segments, so we'll deal with this in the top of
+            # _do_loop
+            pass
 
+        eventually(self.loop)
index 9adee99cf948d7c05cd2cf0a6db01d31219809a0..fa6204c7d71663bdcac109d17a40b3b0075a0848 100644 (file)
@@ -35,11 +35,9 @@ class ShareFinder:
         self._storage_broker = storage_broker
         self.share_consumer = self.node = node
         self.max_outstanding_requests = max_outstanding_requests
-
         self._hungry = False
 
         self._commonshares = {} # shnum to CommonShare instance
-        self.undelivered_shares = []
         self.pending_requests = set()
         self.overdue_requests = set() # subset of pending_requests
         self.overdue_timers = {}
@@ -52,6 +50,12 @@ class ShareFinder:
                            si=self._si_prefix,
                            level=log.NOISY, parent=logparent, umid="2xjj2A")
 
+    def update_num_segments(self):
+        (numsegs, authoritative) = self.node.get_num_segments()
+        assert authoritative
+        for cs in self._commonshares.values():
+            cs.set_authoritative_num_segments(numsegs)
+
     def start_finding_servers(self):
         # don't get servers until somebody uses us: creating the
         # ImmutableFileNode should not cause work to happen yet. Test case is
@@ -83,30 +87,16 @@ class ShareFinder:
 
     # internal methods
     def loop(self):
-        undelivered_s = ",".join(["sh%d@%s" %
-                                  (s._shnum, idlib.shortnodeid_b2a(s._peerid))
-                                  for s in self.undelivered_shares])
         pending_s = ",".join([idlib.shortnodeid_b2a(rt.peerid)
                               for rt in self.pending_requests]) # sort?
         self.log(format="ShareFinder loop: running=%(running)s"
-                 " hungry=%(hungry)s, undelivered=%(undelivered)s,"
-                 " pending=%(pending)s",
-                 running=self.running, hungry=self._hungry,
-                 undelivered=undelivered_s, pending=pending_s,
+                 " hungry=%(hungry)s, pending=%(pending)s",
+                 running=self.running, hungry=self._hungry, pending=pending_s,
                  level=log.NOISY, umid="kRtS4Q")
         if not self.running:
             return
         if not self._hungry:
             return
-        if self.undelivered_shares:
-            sh = self.undelivered_shares.pop(0)
-            # they will call hungry() again if they want more
-            self._hungry = False
-            self.log(format="delivering Share(shnum=%(shnum)d, server=%(peerid)s)",
-                     shnum=sh._shnum, peerid=sh._peerid_s,
-                     level=log.NOISY, umid="2n1qQw")
-            eventually(self.share_consumer.got_shares, [sh])
-            return
 
         non_overdue = self.pending_requests - self.overdue_requests
         if len(non_overdue) >= self.max_outstanding_requests:
@@ -146,14 +136,16 @@ class ShareFinder:
         lp = self.log(format="sending DYHB to [%(peerid)s]",
                       peerid=idlib.shortnodeid_b2a(peerid),
                       level=log.NOISY, umid="Io7pyg")
-        d_ev = self._download_status.add_dyhb_sent(peerid, now())
+        time_sent = now()
+        d_ev = self._download_status.add_dyhb_sent(peerid, time_sent)
         # TODO: get the timer from a Server object, it knows best
         self.overdue_timers[req] = reactor.callLater(self.OVERDUE_TIMEOUT,
                                                      self.overdue, req)
         d = rref.callRemote("get_buckets", self._storage_index)
         d.addBoth(incidentally, self._request_retired, req)
         d.addCallbacks(self._got_response, self._got_error,
-                       callbackArgs=(rref.version, peerid, req, d_ev, lp),
+                       callbackArgs=(rref.version, peerid, req, d_ev,
+                                     time_sent, lp),
                        errbackArgs=(peerid, req, d_ev, lp))
         d.addErrback(log.err, format="error in send_request",
                      level=log.WEIRD, parent=lp, umid="rpdV0w")
@@ -172,33 +164,37 @@ class ShareFinder:
         self.overdue_requests.add(req)
         eventually(self.loop)
 
-    def _got_response(self, buckets, server_version, peerid, req, d_ev, lp):
+    def _got_response(self, buckets, server_version, peerid, req, d_ev,
+                      time_sent, lp):
         shnums = sorted([shnum for shnum in buckets])
-        d_ev.finished(shnums, now())
-        if buckets:
-            shnums_s = ",".join([str(shnum) for shnum in shnums])
-            self.log(format="got shnums [%(shnums)s] from [%(peerid)s]",
-                     shnums=shnums_s, peerid=idlib.shortnodeid_b2a(peerid),
-                     level=log.NOISY, parent=lp, umid="0fcEZw")
-        else:
+        time_received = now()
+        d_ev.finished(shnums, time_received)
+        dyhb_rtt = time_received - time_sent
+        if not buckets:
             self.log(format="no shares from [%(peerid)s]",
                      peerid=idlib.shortnodeid_b2a(peerid),
                      level=log.NOISY, parent=lp, umid="U7d4JA")
-        if self.node.num_segments is None:
-            best_numsegs = self.node.guessed_num_segments
-        else:
-            best_numsegs = self.node.num_segments
+            return
+        shnums_s = ",".join([str(shnum) for shnum in shnums])
+        self.log(format="got shnums [%(shnums)s] from [%(peerid)s]",
+                 shnums=shnums_s, peerid=idlib.shortnodeid_b2a(peerid),
+                 level=log.NOISY, parent=lp, umid="0fcEZw")
+        shares = []
         for shnum, bucket in buckets.iteritems():
-            self._create_share(best_numsegs, shnum, bucket, server_version,
-                               peerid)
+            s = self._create_share(shnum, bucket, server_version, peerid,
+                                   dyhb_rtt)
+            shares.append(s)
+        self._deliver_shares(shares)
 
-    def _create_share(self, best_numsegs, shnum, bucket, server_version,
-                      peerid):
+    def _create_share(self, shnum, bucket, server_version, peerid, dyhb_rtt):
         if shnum in self._commonshares:
             cs = self._commonshares[shnum]
         else:
-            cs = CommonShare(best_numsegs, self._si_prefix, shnum,
+            numsegs, authoritative = self.node.get_num_segments()
+            cs = CommonShare(numsegs, self._si_prefix, shnum,
                              self._node_logparent)
+            if authoritative:
+                cs.set_authoritative_num_segments(numsegs)
             # Share._get_satisfaction is responsible for updating
             # CommonShare.set_numsegs after we know the UEB. Alternatives:
             #  1: d = self.node.get_num_segments()
@@ -214,9 +210,17 @@ class ShareFinder:
             #     Yuck.
             self._commonshares[shnum] = cs
         s = Share(bucket, server_version, self.verifycap, cs, self.node,
-                  self._download_status, peerid, shnum,
+                  self._download_status, peerid, shnum, dyhb_rtt,
                   self._node_logparent)
-        self.undelivered_shares.append(s)
+        return s
+
+    def _deliver_shares(self, shares):
+        # they will call hungry() again if they want more
+        self._hungry = False
+        shares_s = ",".join([str(sh) for sh in shares])
+        self.log(format="delivering shares: %s" % shares_s,
+                 level=log.NOISY, umid="2n1qQw")
+        eventually(self.share_consumer.got_shares, shares)
 
     def _got_error(self, f, peerid, req, d_ev, lp):
         d_ev.finished("error", now())
index 4c92dd84b53d6572aace4659f2b914e6d2af60bb..33c16cfaa5487fae6bd2d9c69588389a5a6c3774 100644 (file)
@@ -72,7 +72,7 @@ class DownloadNode:
         # things to track callers that want data
 
         # _segment_requests can have duplicates
-        self._segment_requests = [] # (segnum, d, cancel_handle)
+        self._segment_requests = [] # (segnum, d, cancel_handle, logparent)
         self._active_segment = None # a SegmentFetcher, with .segnum
 
         self._segsize_observers = observer.OneShotObserverList()
@@ -81,7 +81,8 @@ class DownloadNode:
         # for each read() call. Segmentation and get_segment() messages are
         # associated with the read() call, everything else is tied to the
         # _Node's log entry.
-        lp = log.msg(format="Immutable _Node(%(si)s) created: size=%(size)d,"
+        lp = log.msg(format="Immutable.DownloadNode(%(si)s) created:"
+                     " size=%(size)d,"
                      " guessed_segsize=%(guessed_segsize)d,"
                      " guessed_numsegs=%(guessed_numsegs)d",
                      si=self._si_prefix, size=verifycap.size,
@@ -103,9 +104,10 @@ class DownloadNode:
         # as with CommonShare, our ciphertext_hash_tree is a stub until we
         # get the real num_segments
         self.ciphertext_hash_tree = IncompleteHashTree(self.guessed_num_segments)
+        self.ciphertext_hash_tree_leaves = self.guessed_num_segments
 
     def __repr__(self):
-        return "Imm_Node(%s)" % (self._si_prefix,)
+        return "ImmutableDownloadNode(%s)" % (self._si_prefix,)
 
     def stop(self):
         # called by the Terminator at shutdown, mostly for tests
@@ -175,14 +177,14 @@ class DownloadNode:
         The Deferred can also errback with other fatal problems, such as
         NotEnoughSharesError, NoSharesError, or BadCiphertextHashError.
         """
-        log.msg(format="imm Node(%(si)s).get_segment(%(segnum)d)",
-                si=base32.b2a(self._verifycap.storage_index)[:8],
-                segnum=segnum,
-                level=log.OPERATIONAL, parent=logparent, umid="UKFjDQ")
+        lp = log.msg(format="imm Node(%(si)s).get_segment(%(segnum)d)",
+                     si=base32.b2a(self._verifycap.storage_index)[:8],
+                     segnum=segnum,
+                     level=log.OPERATIONAL, parent=logparent, umid="UKFjDQ")
         self._download_status.add_segment_request(segnum, now())
         d = defer.Deferred()
         c = Cancel(self._cancel_request)
-        self._segment_requests.append( (segnum, d, c) )
+        self._segment_requests.append( (segnum, d, c, lp) )
         self._start_new_segment()
         return (d, c)
 
@@ -208,10 +210,11 @@ class DownloadNode:
         if self._active_segment is None and self._segment_requests:
             segnum = self._segment_requests[0][0]
             k = self._verifycap.needed_shares
+            lp = self._segment_requests[0][3]
             log.msg(format="%(node)s._start_new_segment: segnum=%(segnum)d",
                     node=repr(self), segnum=segnum,
-                    level=log.NOISY, umid="wAlnHQ")
-            self._active_segment = fetcher = SegmentFetcher(self, segnum, k)
+                    level=log.NOISY, parent=lp, umid="wAlnHQ")
+            self._active_segment = fetcher = SegmentFetcher(self, segnum, k, lp)
             active_shares = [s for s in self._shares if s.is_alive()]
             fetcher.add_shares(active_shares) # this triggers the loop
 
@@ -234,13 +237,17 @@ class DownloadNode:
         h = hashutil.uri_extension_hash(UEB_s)
         if h != self._verifycap.uri_extension_hash:
             raise BadHashError
-        UEB_dict = uri.unpack_extension(UEB_s)
-        self._parse_and_store_UEB(UEB_dict) # sets self._stuff
+        self._parse_and_store_UEB(UEB_s) # sets self._stuff
         # TODO: a malformed (but authentic) UEB could throw an assertion in
         # _parse_and_store_UEB, and we should abandon the download.
         self.have_UEB = True
 
-    def _parse_and_store_UEB(self, d):
+        # inform the ShareFinder about our correct number of segments. This
+        # will update the block-hash-trees in all existing CommonShare
+        # instances, and will populate new ones with the correct value.
+        self._sharefinder.update_num_segments()
+
+    def _parse_and_store_UEB(self, UEB_s):
         # Note: the UEB contains needed_shares and total_shares. These are
         # redundant and inferior (the filecap contains the authoritative
         # values). However, because it is possible to encode the same file in
@@ -252,8 +259,11 @@ class DownloadNode:
 
         # therefore, we ignore d['total_shares'] and d['needed_shares'].
 
+        d = uri.unpack_extension(UEB_s)
+
         log.msg(format="UEB=%(ueb)s, vcap=%(vcap)s",
-                ueb=repr(d), vcap=self._verifycap.to_string(),
+                ueb=repr(uri.unpack_extension_readable(UEB_s)),
+                vcap=self._verifycap.to_string(),
                 level=log.NOISY, parent=self._lp, umid="cVqZnA")
 
         k, N = self._verifycap.needed_shares, self._verifycap.total_shares
@@ -292,6 +302,7 @@ class DownloadNode:
         # shares of file B. self.ciphertext_hash_tree was a guess before:
         # this is where we create it for real.
         self.ciphertext_hash_tree = IncompleteHashTree(self.num_segments)
+        self.ciphertext_hash_tree_leaves = self.num_segments
         self.ciphertext_hash_tree.set_hashes({0: d['crypttext_root_hash']})
 
         self.share_hash_tree.set_hashes({0: d['share_root_hash']})
@@ -344,9 +355,15 @@ class DownloadNode:
                                    % (hashnum, len(self.share_hash_tree)))
         self.share_hash_tree.set_hashes(share_hashes)
 
+    def get_desired_ciphertext_hashes(self, segnum):
+        if segnum < self.ciphertext_hash_tree_leaves:
+            return self.ciphertext_hash_tree.needed_hashes(segnum,
+                                                           include_leaf=True)
+        return []
     def get_needed_ciphertext_hashes(self, segnum):
         cht = self.ciphertext_hash_tree
         return cht.needed_hashes(segnum, include_leaf=True)
+
     def process_ciphertext_hashes(self, hashes):
         assert self.num_segments is not None
         # this may raise BadHashError or NotEnoughHashesError
@@ -457,7 +474,7 @@ class DownloadNode:
     def _extract_requests(self, segnum):
         """Remove matching requests and return their (d,c) tuples so that the
         caller can retire them."""
-        retire = [(d,c) for (segnum0, d, c) in self._segment_requests
+        retire = [(d,c) for (segnum0, d, c, lp) in self._segment_requests
                   if segnum0 == segnum]
         self._segment_requests = [t for t in self._segment_requests
                                   if t[0] != segnum]
@@ -466,10 +483,18 @@ class DownloadNode:
     def _cancel_request(self, c):
         self._segment_requests = [t for t in self._segment_requests
                                   if t[2] != c]
-        segnums = [segnum for (segnum,d,c) in self._segment_requests]
+        segnums = [segnum for (segnum,d,c,lp) in self._segment_requests]
         # self._active_segment might be None in rare circumstances, so make
         # sure we tolerate it
         if self._active_segment and self._active_segment.segnum not in segnums:
             self._active_segment.stop()
             self._active_segment = None
             self._start_new_segment()
+
+    # called by ShareFinder to choose hashtree sizes in CommonShares, and by
+    # SegmentFetcher to tell if it is still fetching a valid segnum.
+    def get_num_segments(self):
+        # returns (best_num_segments, authoritative)
+        if self.num_segments is None:
+            return (self.guessed_num_segments, False)
+        return (self.num_segments, True)
index 413f90772f5f14a441b68aceac4bb1fca446fed3..78cce8ed0906d0075a773c9a173284d69f05218a 100644 (file)
@@ -33,7 +33,7 @@ class Share:
     # servers. A different backend would use a different class.
 
     def __init__(self, rref, server_version, verifycap, commonshare, node,
-                 download_status, peerid, shnum, logparent):
+                 download_status, peerid, shnum, dyhb_rtt, logparent):
         self._rref = rref
         self._server_version = server_version
         self._node = node # holds share_hash_tree and UEB
@@ -51,6 +51,7 @@ class Share:
         self._storage_index = verifycap.storage_index
         self._si_prefix = base32.b2a(verifycap.storage_index)[:8]
         self._shnum = shnum
+        self._dyhb_rtt = dyhb_rtt
         # self._alive becomes False upon fatal corruption or server error
         self._alive = True
         self._lp = log.msg(format="%(share)s created", share=repr(self),
@@ -278,15 +279,16 @@ class Share:
             if not self._satisfy_UEB():
                 # can't check any hashes without the UEB
                 return False
+            # the call to _satisfy_UEB() will immediately set the
+            # authoritative num_segments in all our CommonShares. If we
+            # guessed wrong, we might stil be working on a bogus segnum
+            # (beyond the real range). We catch this and signal BADSEGNUM
+            # before invoking any further code that touches hashtrees.
         self.actual_segment_size = self._node.segment_size # might be updated
         assert self.actual_segment_size is not None
 
-        # knowing the UEB means knowing num_segments. Despite the redundancy,
-        # this is the best place to set this. CommonShare.set_numsegs will
-        # ignore duplicate calls.
+        # knowing the UEB means knowing num_segments
         assert self._node.num_segments is not None
-        cs = self._commonshare
-        cs.set_numsegs(self._node.num_segments)
 
         segnum, observers = self._active_segnum_and_observers()
         # if segnum is None, we don't really need to do anything (we have no
@@ -304,9 +306,9 @@ class Share:
                 # can't check block_hash_tree without a root
                 return False
 
-        if cs.need_block_hash_root():
+        if self._commonshare.need_block_hash_root():
             block_hash_root = self._node.share_hash_tree.get_leaf(self._shnum)
-            cs.set_block_hash_root(block_hash_root)
+            self._commonshare.set_block_hash_root(block_hash_root)
 
         if segnum is None:
             return False # we don't want any particular segment right now
@@ -360,7 +362,8 @@ class Share:
                                   ] ):
             offsets[field] = fields[i]
         self.actual_offsets = offsets
-        log.msg("actual offsets: data=%d, plaintext_hash_tree=%d, crypttext_hash_tree=%d, block_hashes=%d, share_hashes=%d, uri_extension=%d" % tuple(fields))
+        log.msg("actual offsets: data=%d, plaintext_hash_tree=%d, crypttext_hash_tree=%d, block_hashes=%d, share_hashes=%d, uri_extension=%d" % tuple(fields),
+                level=log.NOISY, parent=self._lp, umid="jedQcw")
         self._received.remove(0, 4) # don't need this anymore
 
         # validate the offsets a bit
@@ -517,7 +520,8 @@ class Share:
         block = self._received.pop(blockstart, blocklen)
         if not block:
             log.msg("no data for block %s (want [%d:+%d])" % (repr(self),
-                                                              blockstart, blocklen))
+                                                              blockstart, blocklen),
+                    level=log.NOISY, parent=self._lp, umid="aK0RFw")
             return False
         log.msg(format="%(share)s._satisfy_data_block [%(start)d:+%(length)d]",
                 share=repr(self), start=blockstart, length=blocklen,
@@ -589,29 +593,17 @@ class Share:
         if self.actual_offsets or self._overrun_ok:
             if not self._node.have_UEB:
                 self._desire_UEB(desire, o)
-            # They might ask for a segment that doesn't look right.
-            # _satisfy() will catch+reject bad segnums once we know the UEB
-            # (and therefore segsize and numsegs), so we'll only fail this
-            # test if we're still guessing. We want to avoid asking the
-            # hashtrees for needed_hashes() for bad segnums. So don't enter
-            # _desire_hashes or _desire_data unless the segnum looks
-            # reasonable.
-            if segnum < r["num_segments"]:
-                # XXX somehow we're getting here for sh5. we don't yet know
-                # the actual_segment_size, we're still working off the guess.
-                # the ciphertext_hash_tree has been corrected, but the
-                # commonshare._block_hash_tree is still in the guessed state.
-                self._desire_share_hashes(desire, o)
-                if segnum is not None:
-                    self._desire_block_hashes(desire, o, segnum)
-                    self._desire_data(desire, o, r, segnum, segsize)
-            else:
-                log.msg("_desire: segnum(%d) looks wrong (numsegs=%d)"
-                        % (segnum, r["num_segments"]),
-                        level=log.UNUSUAL, parent=self._lp, umid="tuYRQQ")
+            self._desire_share_hashes(desire, o)
+            if segnum is not None:
+                # They might be asking for a segment number that is beyond
+                # what we guess the file contains, but _desire_block_hashes
+                # and _desire_data will tolerate that.
+                self._desire_block_hashes(desire, o, segnum)
+                self._desire_data(desire, o, r, segnum, segsize)
 
         log.msg("end _desire: want_it=%s need_it=%s gotta=%s"
-                % (want_it.dump(), need_it.dump(), gotta_gotta_have_it.dump()))
+                % (want_it.dump(), need_it.dump(), gotta_gotta_have_it.dump()),
+                level=log.NOISY, parent=self._lp, umid="IG7CgA")
         if self.actual_offsets:
             return (want_it, need_it+gotta_gotta_have_it)
         else:
@@ -681,14 +673,30 @@ class Share:
         (want_it, need_it, gotta_gotta_have_it) = desire
 
         # block hash chain
-        for hashnum in self._commonshare.get_needed_block_hashes(segnum):
+        for hashnum in self._commonshare.get_desired_block_hashes(segnum):
             need_it.add(o["block_hashes"]+hashnum*HASH_SIZE, HASH_SIZE)
 
         # ciphertext hash chain
-        for hashnum in self._node.get_needed_ciphertext_hashes(segnum):
+        for hashnum in self._node.get_desired_ciphertext_hashes(segnum):
             need_it.add(o["crypttext_hash_tree"]+hashnum*HASH_SIZE, HASH_SIZE)
 
     def _desire_data(self, desire, o, r, segnum, segsize):
+        if segnum > r["num_segments"]:
+            # they're asking for a segment that's beyond what we think is the
+            # end of the file. We won't get here if we've already learned the
+            # real UEB: _get_satisfaction() will notice the out-of-bounds and
+            # terminate the loop. So we must still be guessing, which means
+            # that they might be correct in asking for such a large segnum.
+            # But if they're right, then our segsize/segnum guess is
+            # certainly wrong, which means we don't know what data blocks to
+            # ask for yet. So don't bother adding anything. When the UEB
+            # comes back and we learn the correct segsize/segnums, we'll
+            # either reject the request or have enough information to proceed
+            # normally. This costs one roundtrip.
+            log.msg("_desire_data: segnum(%d) looks wrong (numsegs=%d)"
+                    % (segnum, r["num_segments"]),
+                    level=log.UNUSUAL, parent=self._lp, umid="tuYRQQ")
+            return
         (want_it, need_it, gotta_gotta_have_it) = desire
         tail = (segnum == r["num_segments"]-1)
         datastart = o["data"]
@@ -803,34 +811,62 @@ class Share:
 
 
 class CommonShare:
+    # TODO: defer creation of the hashtree until somebody uses us. There will
+    # be a lot of unused shares, and we shouldn't spend the memory on a large
+    # hashtree unless necessary.
     """I hold data that is common across all instances of a single share,
     like sh2 on both servers A and B. This is just the block hash tree.
     """
-    def __init__(self, guessed_numsegs, si_prefix, shnum, logparent):
+    def __init__(self, best_numsegs, si_prefix, shnum, logparent):
         self.si_prefix = si_prefix
         self.shnum = shnum
+
         # in the beginning, before we have the real UEB, we can only guess at
         # the number of segments. But we want to ask for block hashes early.
         # So if we're asked for which block hashes are needed before we know
         # numsegs for sure, we return a guess.
-        self._block_hash_tree = IncompleteHashTree(guessed_numsegs)
-        self._know_numsegs = False
+        self._block_hash_tree = IncompleteHashTree(best_numsegs)
+        self._block_hash_tree_is_authoritative = False
+        self._block_hash_tree_leaves = best_numsegs
         self._logparent = logparent
 
-    def set_numsegs(self, numsegs):
-        if self._know_numsegs:
-            return
-        self._block_hash_tree = IncompleteHashTree(numsegs)
-        self._know_numsegs = True
+    def __repr__(self):
+        return "CommonShare(%s-sh%d)" % (self.si_prefix, self.shnum)
+
+    def set_authoritative_num_segments(self, numsegs):
+        if self._block_hash_tree_leaves != numsegs:
+            self._block_hash_tree = IncompleteHashTree(numsegs)
+            self._block_hash_tree_leaves = numsegs
+        self._block_hash_tree_is_authoritative = True
 
     def need_block_hash_root(self):
         return bool(not self._block_hash_tree[0])
 
     def set_block_hash_root(self, roothash):
-        assert self._know_numsegs
+        assert self._block_hash_tree_is_authoritative
         self._block_hash_tree.set_hashes({0: roothash})
 
+    def get_desired_block_hashes(self, segnum):
+        if segnum < self._block_hash_tree_leaves:
+            return self._block_hash_tree.needed_hashes(segnum,
+                                                       include_leaf=True)
+
+        # the segnum might be out-of-bounds. Originally it was due to a race
+        # between the receipt of the UEB on one share (from which we learn
+        # the correct number of segments, update all hash trees to the right
+        # size, and queue a BADSEGNUM to the SegmentFetcher) and the delivery
+        # of a new Share to the SegmentFetcher while that BADSEGNUM was
+        # queued (which sends out requests to the stale segnum, now larger
+        # than the hash tree). I fixed that (by making SegmentFetcher.loop
+        # check for a bad segnum at the start of each pass, instead of using
+        # the queued BADSEGNUM or a flag it sets), but just in case this
+        # still happens, I'm leaving the < in place. If it gets hit, there's
+        # a potential lost-progress problem, but I'm pretty sure that it will
+        # get cleared up on the following turn.
+        return []
+
     def get_needed_block_hashes(self, segnum):
+        assert self._block_hash_tree_is_authoritative
         # XXX: include_leaf=True needs thought: how did the old downloader do
         # it? I think it grabbed *all* block hashes and set them all at once.
         # Since we want to fetch less data, we either need to fetch the leaf
@@ -840,12 +876,25 @@ class CommonShare:
         return self._block_hash_tree.needed_hashes(segnum, include_leaf=True)
 
     def process_block_hashes(self, block_hashes):
-        assert self._know_numsegs
+        assert self._block_hash_tree_is_authoritative
         # this may raise BadHashError or NotEnoughHashesError
         self._block_hash_tree.set_hashes(block_hashes)
 
     def check_block(self, segnum, block):
-        assert self._know_numsegs
+        assert self._block_hash_tree_is_authoritative
         h = hashutil.block_hash(block)
         # this may raise BadHashError or NotEnoughHashesError
         self._block_hash_tree.set_hashes(leaves={segnum: h})
+
+# TODO: maybe stop using EventStreamObserver: instead, use a Deferred and an
+# auxilliary OVERDUE callback. Just make sure to get all the messages in the
+# right order and on the right turns.
+
+# TODO: we're asking for too much data. We probably don't need
+# include_leaf=True in the block hash tree or ciphertext hash tree.
+
+# TODO: we ask for ciphertext hash tree nodes from all shares (whenever
+# _desire is called while we're missing those nodes), but we only consume it
+# from the first response, leaving the rest of the data sitting in _received.
+# This was ameliorated by clearing self._received after each block is
+# complete.
index db5bf5f8e577462534652fbd49a5dde6b07068c8..245312607468fc1941f0a9ad7db43392c53cb535 100644 (file)
@@ -2303,8 +2303,8 @@ class Errors(GridTestMixin, CLITestMixin, unittest.TestCase):
         # the download is abandoned as soon as it's clear that we won't get
         # enough shares. The one remaining share might be in either the
         # COMPLETE or the PENDING state.
-        in_complete_msg = "ran out of shares: 1 complete, 0 pending, 0 overdue, 0 unused, need 3"
-        in_pending_msg = "ran out of shares: 0 complete, 1 pending, 0 overdue, 0 unused, need 3"
+        in_complete_msg = "ran out of shares: complete=sh0 pending= overdue= unused= need 3"
+        in_pending_msg = "ran out of shares: complete= pending=Share(sh0-on-fob7v) overdue= unused= need 3"
 
         d.addCallback(lambda ign: self.do_cli("get", self.uri_1share))
         def _check1((rc, out, err)):
index 71a556bb6b5540005fe3de71caee4b12e52f92a8..40f0d62c07f08597e8f13221f5985d10a0f923a4 100644 (file)
@@ -15,8 +15,9 @@ from allmydata.test.no_network import GridTestMixin
 from allmydata.test.common import ShouldFailMixin
 from allmydata.interfaces import NotEnoughSharesError, NoSharesError
 from allmydata.immutable.downloader.common import BadSegmentNumberError, \
-     BadCiphertextHashError, DownloadStopped
+     BadCiphertextHashError, DownloadStopped, COMPLETE, OVERDUE, DEAD
 from allmydata.immutable.downloader.status import DownloadStatus
+from allmydata.immutable.downloader.fetcher import SegmentFetcher
 from allmydata.codec import CRSDecoder
 from foolscap.eventual import fireEventually, flushEventualQueue
 
@@ -295,7 +296,7 @@ class DownloadTest(_Base, unittest.TestCase):
             # shares
             servers = []
             shares = sorted([s._shnum for s in self.n._cnode._node._shares])
-            self.failUnlessEqual(shares, [0,1,2])
+            self.failUnlessEqual(shares, [0,1,2,3])
             # break the RIBucketReader references
             for s in self.n._cnode._node._shares:
                 s._rref.broken = True
@@ -318,7 +319,7 @@ class DownloadTest(_Base, unittest.TestCase):
             self.failUnlessEqual("".join(c.chunks), plaintext)
             shares = sorted([s._shnum for s in self.n._cnode._node._shares])
             # we should now be using more shares than we were before
-            self.failIfEqual(shares, [0,1,2])
+            self.failIfEqual(shares, [0,1,2,3])
         d.addCallback(_check_failover)
         return d
 
@@ -539,7 +540,7 @@ class DownloadTest(_Base, unittest.TestCase):
             def _con1_should_not_succeed(res):
                 self.fail("the first read should not have succeeded")
             def _con1_failed(f):
-                self.failUnless(f.check(NotEnoughSharesError))
+                self.failUnless(f.check(NoSharesError))
                 con2.producer.stopProducing()
                 return d2
             d.addCallbacks(_con1_should_not_succeed, _con1_failed)
@@ -583,7 +584,7 @@ class DownloadTest(_Base, unittest.TestCase):
             def _con1_should_not_succeed(res):
                 self.fail("the first read should not have succeeded")
             def _con1_failed(f):
-                self.failUnless(f.check(NotEnoughSharesError))
+                self.failUnless(f.check(NoSharesError))
                 # we *don't* cancel the second one here: this exercises a
                 # lost-progress bug from #1154. We just wait for it to
                 # succeed.
@@ -1121,7 +1122,7 @@ class Corruption(_Base, unittest.TestCase):
                 # All these tests result in a failed download.
                 d.addCallback(self._corrupt_flip_all, imm_uri, i)
                 d.addCallback(lambda ign:
-                              self.shouldFail(NotEnoughSharesError, which,
+                              self.shouldFail(NoSharesError, which,
                                               substring,
                                               _download, imm_uri))
                 d.addCallback(lambda ign: self.restore_all_shares(self.shares))
@@ -1257,3 +1258,332 @@ class Status(unittest.TestCase):
         e2.update(1000, 2.0, 2.0)
         e2.finished(now+5)
         self.failUnlessEqual(ds.get_progress(), 1.0)
+
+class MyShare:
+    def __init__(self, shnum, peerid, rtt):
+        self._shnum = shnum
+        self._peerid = peerid
+        self._peerid_s = peerid
+        self._dyhb_rtt = rtt
+    def __repr__(self):
+        return "sh%d-on-%s" % (self._shnum, self._peerid)
+
+class MySegmentFetcher(SegmentFetcher):
+    def __init__(self, *args, **kwargs):
+        SegmentFetcher.__init__(self, *args, **kwargs)
+        self._test_start_shares = []
+    def _start_share(self, share, shnum):
+        self._test_start_shares.append(share)
+
+class FakeNode:
+    def __init__(self):
+        self.want_more = 0
+        self.failed = None
+        self.processed = None
+        self._si_prefix = "si_prefix"
+    def want_more_shares(self):
+        self.want_more += 1
+    def fetch_failed(self, fetcher, f):
+        self.failed = f
+    def process_blocks(self, segnum, blocks):
+        self.processed = (segnum, blocks)
+    def get_num_segments(self):
+        return 1, True
+
+class Selection(unittest.TestCase):
+    def test_no_shares(self):
+        node = FakeNode()
+        sf = SegmentFetcher(node, 0, 3, None)
+        sf.add_shares([])
+        d = flushEventualQueue()
+        def _check1(ign):
+            self.failUnlessEqual(node.want_more, 1)
+            self.failUnlessEqual(node.failed, None)
+            sf.no_more_shares()
+            return flushEventualQueue()
+        d.addCallback(_check1)
+        def _check2(ign):
+            self.failUnless(node.failed)
+            self.failUnless(node.failed.check(NoSharesError))
+        d.addCallback(_check2)
+        return d
+
+    def test_only_one_share(self):
+        node = FakeNode()
+        sf = MySegmentFetcher(node, 0, 3, None)
+        shares = [MyShare(0, "peer-A", 0.0)]
+        sf.add_shares(shares)
+        d = flushEventualQueue()
+        def _check1(ign):
+            self.failUnlessEqual(node.want_more, 1)
+            self.failUnlessEqual(node.failed, None)
+            sf.no_more_shares()
+            return flushEventualQueue()
+        d.addCallback(_check1)
+        def _check2(ign):
+            self.failUnless(node.failed)
+            self.failUnless(node.failed.check(NotEnoughSharesError))
+            self.failUnlessIn("complete= pending=sh0-on-peer-A overdue= unused=",
+                              str(node.failed))
+        d.addCallback(_check2)
+        return d
+
+    def test_good_diversity_early(self):
+        node = FakeNode()
+        sf = MySegmentFetcher(node, 0, 3, None)
+        shares = [MyShare(i, "peer-%d" % i, i) for i in range(10)]
+        sf.add_shares(shares)
+        d = flushEventualQueue()
+        def _check1(ign):
+            self.failUnlessEqual(node.want_more, 0)
+            self.failUnlessEqual(sf._test_start_shares, shares[:3])
+            for sh in sf._test_start_shares:
+                sf._block_request_activity(sh, sh._shnum, COMPLETE,
+                                           "block-%d" % sh._shnum)
+            return flushEventualQueue()
+        d.addCallback(_check1)
+        def _check2(ign):
+            self.failIfEqual(node.processed, None)
+            self.failUnlessEqual(node.processed, (0, {0: "block-0",
+                                                      1: "block-1",
+                                                      2: "block-2"}) )
+        d.addCallback(_check2)
+        return d
+
+    def test_good_diversity_late(self):
+        node = FakeNode()
+        sf = MySegmentFetcher(node, 0, 3, None)
+        shares = [MyShare(i, "peer-%d" % i, i) for i in range(10)]
+        sf.add_shares([])
+        d = flushEventualQueue()
+        def _check1(ign):
+            self.failUnlessEqual(node.want_more, 1)
+            sf.add_shares(shares)
+            return flushEventualQueue()
+        d.addCallback(_check1)
+        def _check2(ign):
+            self.failUnlessEqual(sf._test_start_shares, shares[:3])
+            for sh in sf._test_start_shares:
+                sf._block_request_activity(sh, sh._shnum, COMPLETE,
+                                           "block-%d" % sh._shnum)
+            return flushEventualQueue()
+        d.addCallback(_check2)
+        def _check3(ign):
+            self.failIfEqual(node.processed, None)
+            self.failUnlessEqual(node.processed, (0, {0: "block-0",
+                                                      1: "block-1",
+                                                      2: "block-2"}) )
+        d.addCallback(_check3)
+        return d
+
+    def test_avoid_bad_diversity_late(self):
+        node = FakeNode()
+        sf = MySegmentFetcher(node, 0, 3, None)
+        # we could satisfy the read entirely from the first server, but we'd
+        # prefer not to. Instead, we expect to only pull one share from the
+        # first server
+        shares = [MyShare(0, "peer-A", 0.0),
+                  MyShare(1, "peer-A", 0.0),
+                  MyShare(2, "peer-A", 0.0),
+                  MyShare(3, "peer-B", 1.0),
+                  MyShare(4, "peer-C", 2.0),
+                  ]
+        sf.add_shares([])
+        d = flushEventualQueue()
+        def _check1(ign):
+            self.failUnlessEqual(node.want_more, 1)
+            sf.add_shares(shares)
+            return flushEventualQueue()
+        d.addCallback(_check1)
+        def _check2(ign):
+            self.failUnlessEqual(sf._test_start_shares,
+                                 [shares[0], shares[3], shares[4]])
+            for sh in sf._test_start_shares:
+                sf._block_request_activity(sh, sh._shnum, COMPLETE,
+                                           "block-%d" % sh._shnum)
+            return flushEventualQueue()
+        d.addCallback(_check2)
+        def _check3(ign):
+            self.failIfEqual(node.processed, None)
+            self.failUnlessEqual(node.processed, (0, {0: "block-0",
+                                                      3: "block-3",
+                                                      4: "block-4"}) )
+        d.addCallback(_check3)
+        return d
+
+    def test_suffer_bad_diversity_late(self):
+        node = FakeNode()
+        sf = MySegmentFetcher(node, 0, 3, None)
+        # we satisfy the read entirely from the first server because we don't
+        # have any other choice.
+        shares = [MyShare(0, "peer-A", 0.0),
+                  MyShare(1, "peer-A", 0.0),
+                  MyShare(2, "peer-A", 0.0),
+                  MyShare(3, "peer-A", 0.0),
+                  MyShare(4, "peer-A", 0.0),
+                  ]
+        sf.add_shares([])
+        d = flushEventualQueue()
+        def _check1(ign):
+            self.failUnlessEqual(node.want_more, 1)
+            sf.add_shares(shares)
+            return flushEventualQueue()
+        d.addCallback(_check1)
+        def _check2(ign):
+            self.failUnlessEqual(node.want_more, 3)
+            self.failUnlessEqual(sf._test_start_shares,
+                                 [shares[0], shares[1], shares[2]])
+            for sh in sf._test_start_shares:
+                sf._block_request_activity(sh, sh._shnum, COMPLETE,
+                                           "block-%d" % sh._shnum)
+            return flushEventualQueue()
+        d.addCallback(_check2)
+        def _check3(ign):
+            self.failIfEqual(node.processed, None)
+            self.failUnlessEqual(node.processed, (0, {0: "block-0",
+                                                      1: "block-1",
+                                                      2: "block-2"}) )
+        d.addCallback(_check3)
+        return d
+
+    def test_suffer_bad_diversity_early(self):
+        node = FakeNode()
+        sf = MySegmentFetcher(node, 0, 3, None)
+        # we satisfy the read entirely from the first server because we don't
+        # have any other choice.
+        shares = [MyShare(0, "peer-A", 0.0),
+                  MyShare(1, "peer-A", 0.0),
+                  MyShare(2, "peer-A", 0.0),
+                  MyShare(3, "peer-A", 0.0),
+                  MyShare(4, "peer-A", 0.0),
+                  ]
+        sf.add_shares(shares)
+        d = flushEventualQueue()
+        def _check1(ign):
+            self.failUnlessEqual(node.want_more, 2)
+            self.failUnlessEqual(sf._test_start_shares,
+                                 [shares[0], shares[1], shares[2]])
+            for sh in sf._test_start_shares:
+                sf._block_request_activity(sh, sh._shnum, COMPLETE,
+                                           "block-%d" % sh._shnum)
+            return flushEventualQueue()
+        d.addCallback(_check1)
+        def _check2(ign):
+            self.failIfEqual(node.processed, None)
+            self.failUnlessEqual(node.processed, (0, {0: "block-0",
+                                                      1: "block-1",
+                                                      2: "block-2"}) )
+        d.addCallback(_check2)
+        return d
+
+    def test_overdue(self):
+        node = FakeNode()
+        sf = MySegmentFetcher(node, 0, 3, None)
+        shares = [MyShare(i, "peer-%d" % i, i) for i in range(10)]
+        sf.add_shares(shares)
+        d = flushEventualQueue()
+        def _check1(ign):
+            self.failUnlessEqual(node.want_more, 0)
+            self.failUnlessEqual(sf._test_start_shares, shares[:3])
+            for sh in sf._test_start_shares:
+                sf._block_request_activity(sh, sh._shnum, OVERDUE)
+            return flushEventualQueue()
+        d.addCallback(_check1)
+        def _check2(ign):
+            self.failUnlessEqual(sf._test_start_shares, shares[:6])
+            for sh in sf._test_start_shares[3:]:
+                sf._block_request_activity(sh, sh._shnum, COMPLETE,
+                                           "block-%d" % sh._shnum)
+            return flushEventualQueue()
+        d.addCallback(_check2)
+        def _check3(ign):
+            self.failIfEqual(node.processed, None)
+            self.failUnlessEqual(node.processed, (0, {3: "block-3",
+                                                      4: "block-4",
+                                                      5: "block-5"}) )
+        d.addCallback(_check3)
+        return d
+
+    def test_overdue_fails(self):
+        node = FakeNode()
+        sf = MySegmentFetcher(node, 0, 3, None)
+        shares = [MyShare(i, "peer-%d" % i, i) for i in range(6)]
+        sf.add_shares(shares)
+        sf.no_more_shares()
+        d = flushEventualQueue()
+        def _check1(ign):
+            self.failUnlessEqual(node.want_more, 0)
+            self.failUnlessEqual(sf._test_start_shares, shares[:3])
+            for sh in sf._test_start_shares:
+                sf._block_request_activity(sh, sh._shnum, OVERDUE)
+            return flushEventualQueue()
+        d.addCallback(_check1)
+        def _check2(ign):
+            self.failUnlessEqual(sf._test_start_shares, shares[:6])
+            for sh in sf._test_start_shares[3:]:
+                sf._block_request_activity(sh, sh._shnum, DEAD)
+            return flushEventualQueue()
+        d.addCallback(_check2)
+        def _check3(ign):
+            # we're still waiting
+            self.failUnlessEqual(node.processed, None)
+            self.failUnlessEqual(node.failed, None)
+            # now complete one of the overdue ones, and kill one of the other
+            # ones, leaving one hanging. This should trigger a failure, since
+            # we cannot succeed.
+            live = sf._test_start_shares[0]
+            die = sf._test_start_shares[1]
+            sf._block_request_activity(live, live._shnum, COMPLETE, "block")
+            sf._block_request_activity(die, die._shnum, DEAD)
+            return flushEventualQueue()
+        d.addCallback(_check3)
+        def _check4(ign):
+            self.failUnless(node.failed)
+            self.failUnless(node.failed.check(NotEnoughSharesError))
+            self.failUnlessIn("complete=sh0 pending= overdue=sh2-on-peer-2 unused=",
+                              str(node.failed))
+        d.addCallback(_check4)
+        return d
+
+    def test_avoid_redundancy(self):
+        node = FakeNode()
+        sf = MySegmentFetcher(node, 0, 3, None)
+        # we could satisfy the read entirely from the first server, but we'd
+        # prefer not to. Instead, we expect to only pull one share from the
+        # first server
+        shares = [MyShare(0, "peer-A", 0.0),
+                  MyShare(1, "peer-B", 1.0),
+                  MyShare(0, "peer-C", 2.0), # this will be skipped
+                  MyShare(1, "peer-D", 3.0),
+                  MyShare(2, "peer-E", 4.0),
+                  ]
+        sf.add_shares(shares[:3])
+        d = flushEventualQueue()
+        def _check1(ign):
+            self.failUnlessEqual(node.want_more, 1)
+            self.failUnlessEqual(sf._test_start_shares,
+                                 [shares[0], shares[1]])
+            # allow sh1 to retire
+            sf._block_request_activity(shares[1], 1, COMPLETE, "block-1")
+            return flushEventualQueue()
+        d.addCallback(_check1)
+        def _check2(ign):
+            # and then feed in the remaining shares
+            sf.add_shares(shares[3:])
+            sf.no_more_shares()
+            return flushEventualQueue()
+        d.addCallback(_check2)
+        def _check3(ign):
+            self.failUnlessEqual(sf._test_start_shares,
+                                 [shares[0], shares[1], shares[4]])
+            sf._block_request_activity(shares[0], 0, COMPLETE, "block-0")
+            sf._block_request_activity(shares[4], 2, COMPLETE, "block-2")
+            return flushEventualQueue()
+        d.addCallback(_check3)
+        def _check4(ign):
+            self.failIfEqual(node.processed, None)
+            self.failUnlessEqual(node.processed, (0, {0: "block-0",
+                                                      1: "block-1",
+                                                      2: "block-2"}) )
+        d.addCallback(_check4)
+        return d
index 288332d08e928ed6108a005571a3d7a63c799cd9..511a865b2903724e9d3fa501d14159ab03ee5dab 100644 (file)
@@ -52,7 +52,7 @@ class Test(common.ShareManglingMixin, common.ShouldFailMixin, unittest.TestCase)
         def _after_download(unused=None):
             after_download_reads = self._count_reads()
             #print before_download_reads, after_download_reads
-            self.failIf(after_download_reads-before_download_reads > 36,
+            self.failIf(after_download_reads-before_download_reads > 41,
                         (after_download_reads, before_download_reads))
         d.addCallback(self._download_and_check_plaintext)
         d.addCallback(_after_download)
index 3008046474ef1eb23dae24e6023757d0aa78ab5e..f68e98d8ef4495e3d429594af91f9df3fb8fa3a4 100644 (file)
@@ -4259,15 +4259,20 @@ class Grid(GridTestMixin, WebErrorMixin, ShouldFailMixin, testutil.ReallyEqualMi
         def _check_one_share(body):
             self.failIf("<html>" in body, body)
             body = " ".join(body.strip().split())
-            msg = ("NotEnoughSharesError: This indicates that some "
-                   "servers were unavailable, or that shares have been "
-                   "lost to server departure, hard drive failure, or disk "
-                   "corruption. You should perform a filecheck on "
-                   "this object to learn more. The full error message is:"
-                   " ran out of shares: %d complete, %d pending, 0 overdue,"
-                   " 0 unused, need 3. Last failure: None")
-            msg1 = msg % (1, 0)
-            msg2 = msg % (0, 1)
+            msgbase = ("NotEnoughSharesError: This indicates that some "
+                       "servers were unavailable, or that shares have been "
+                       "lost to server departure, hard drive failure, or disk "
+                       "corruption. You should perform a filecheck on "
+                       "this object to learn more. The full error message is:"
+                       )
+            msg1 = msgbase + (" ran out of shares:"
+                              " complete=sh0"
+                              " pending="
+                              " overdue= unused= need 3. Last failure: None")
+            msg2 = msgbase + (" ran out of shares:"
+                              " complete="
+                              " pending=Share(sh0-on-xgru5)"
+                              " overdue= unused= need 3. Last failure: None")
             self.failUnless(body == msg1 or body == msg2, body)
         d.addCallback(_check_one_share)