move DownloadStopped from download.common to interfaces
diff --git a/src/allmydata/test/test_download.py b/src/allmydata/test/test_download.py
index fa42b24778a3f1e4ad0d01a124769a0126d66213..30485b4a020da0ad8b0a23eeba5c2a9b296174fa 100644
--- a/src/allmydata/test/test_download.py
+++ b/src/allmydata/test/test_download.py
@@ -8,14 +8,17 @@ from twisted.trial import unittest
 from twisted.internet import defer, reactor
 from allmydata import uri
 from allmydata.storage.server import storage_index_to_dir
-from allmydata.util import base32, fileutil, spans, log
+from allmydata.util import base32, fileutil, spans, log, hashutil
 from allmydata.util.consumer import download_to_data, MemoryConsumer
 from allmydata.immutable import upload, layout
-from allmydata.test.no_network import GridTestMixin
+from allmydata.test.no_network import GridTestMixin, NoNetworkServer
 from allmydata.test.common import ShouldFailMixin
-from allmydata.interfaces import NotEnoughSharesError, NoSharesError
+from allmydata.interfaces import NotEnoughSharesError, NoSharesError, \
+     DownloadStopped
 from allmydata.immutable.downloader.common import BadSegmentNumberError, \
-     BadCiphertextHashError, DownloadStopped
+     BadCiphertextHashError, COMPLETE, OVERDUE, DEAD
+from allmydata.immutable.downloader.status import DownloadStatus
+from allmydata.immutable.downloader.fetcher import SegmentFetcher
 from allmydata.codec import CRSDecoder
 from foolscap.eventual import fireEventually, flushEventualQueue
 
@@ -288,36 +291,28 @@ class DownloadTest(_Base, unittest.TestCase):
         def _got_data(data):
             self.failUnlessEqual(data, plaintext)
         d.addCallback(_got_data)
-        def _kill_some_servers():
-            # find the three shares that were used, and delete them. Then
-            # download again, forcing the downloader to fail over to other
-            # shares
-            servers = []
-            shares = sorted([s._shnum for s in self.n._cnode._node._shares])
-            self.failUnlessEqual(shares, [0,1,2])
+        def _kill_some_shares():
+            # find the shares that were used and break them
+            shares = self.n._cnode._node._shares
+            shnums = sorted([s._shnum for s in shares])
+            self.failUnlessEqual(shnums, [0,1,2,3])
+
             # break the RIBucketReader references
-            for s in self.n._cnode._node._shares:
+            # (we don't break the RIStorageServer references, because that
+            # isn't needed to test the current downloader implementation)
+            for s in shares:
                 s._rref.broken = True
-                for servernum in immutable_shares:
-                    for shnum in immutable_shares[servernum]:
-                        if s._shnum == shnum:
-                            ss = self.g.servers_by_number[servernum]
-                            servers.append(ss)
-            # and, for good measure, break the RIStorageServer references
-            # too, just in case the downloader gets more aggressive in the
-            # future and tries to re-fetch the same share.
-            for ss in servers:
-                wrapper = self.g.servers_by_id[ss.my_nodeid]
-                wrapper.broken = True
         def _download_again(ign):
-            c = StallingConsumer(_kill_some_servers)
+            # download again, breaking the in-use shares after the first
+            # write to the consumer
+            c = StallingConsumer(_kill_some_shares)
             return self.n.read(c)
         d.addCallback(_download_again)
         def _check_failover(c):
             self.failUnlessEqual("".join(c.chunks), plaintext)
-            shares = sorted([s._shnum for s in self.n._cnode._node._shares])
-            # we should now be using more shares than we were before
-            self.failIfEqual(shares, [0,1,2])
+            shares = self.n._cnode._node._shares
+            shnums = sorted([s._shnum for s in shares])
+            self.failIfEqual(shnums, [0,1,2,3])
         d.addCallback(_check_failover)
         return d
 
@@ -406,14 +401,13 @@ class DownloadTest(_Base, unittest.TestCase):
             n._cnode._maybe_create_download_node()
             n._cnode._node._build_guessed_tables(u.max_segment_size)
             d1 = n.read(con1, 70, 20)
-            #d2 = n.read(con2, 140, 20) # XXX
-            d2 = defer.succeed(None)
+            d2 = n.read(con2, 140, 20)
             return defer.gatherResults([d1,d2])
         d.addCallback(_uploaded)
         def _done(res):
             self.failUnlessEqual("".join(con1.chunks), plaintext[70:90])
             self.failUnlessEqual("".join(con2.chunks), plaintext[140:160])
-        #d.addCallback(_done)
+        d.addCallback(_done)
         return d
 
     def test_sequential_goodguess(self):
@@ -486,6 +480,113 @@ class DownloadTest(_Base, unittest.TestCase):
         d.addCallback(_done)
         return d
 
+    def test_simultaneous_onefails_onecancelled(self):
+        # This exercises the mplayer behavior described in ticket #1154. I
+        # believe that mplayer made two simultaneous webapi GET requests: the
+        # first for an index region at the end of the (mp3/video) file, then
+        # one for the first block of the file (the order doesn't really
+        # matter). All GETs failed (NoSharesError) because of the
+        # type(__len__)==long bug. Each
+        # GET submitted a DownloadNode.get_segment() request, which was
+        # queued by the DN (DN._segment_requests), so the second one was
+        # blocked waiting on the first one. When the first one failed,
+        # DN.fetch_failed() was invoked, which errbacked the first GET but
+        # left the other one hanging (the lost-progress bug mentioned in
+        # #1154 comment 10).
+        #
+        # Then mplayer saw that the index-region GET had failed, so it
+        # cancelled the first-block GET (by closing the HTTP request),
+        # triggering stopProducer. The second GET was still waiting in the
+        # Deferred (between n.get_segment() and self._request_retired), so
+        # its _cancel_segment_request was still active and was invoked.
+        # However, DN._active_segment was None, since the DN was not working
+        # on any segment at that time, hence the error in #1154.
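+        # This test drives that same sequence: the first read must fail with
+        # NoSharesError, and cancelling the second read must errback it with
+        # DownloadStopped instead of leaving it hanging.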
+
+        self.basedir = self.mktemp()
+        self.set_up_grid()
+        self.c0 = self.g.clients[0]
+
+        # upload a file with multiple segments, so we can catch the download
+        # in the middle. Tell the downloader the segment size, so it can
+        # guess correctly.
+        u = upload.Data(plaintext, None)
+        u.max_segment_size = 70 # 5 segs
+        d = self.c0.upload(u)
+        def _uploaded(ur):
+            # corrupt all the shares so the download will fail
+            def _corruptor(s, debug=False):
+                which = 48 # first byte of block0
+                return s[:which] + chr(ord(s[which])^0x01) + s[which+1:]
+            self.corrupt_all_shares(ur.uri, _corruptor)
+            n = self.c0.create_node_from_uri(ur.uri)
+            n._cnode._maybe_create_download_node()
+            n._cnode._node._build_guessed_tables(u.max_segment_size)
+            con1 = MemoryConsumer()
+            con2 = MemoryConsumer()
+            d = n.read(con1, 0L, 20)
+            d2 = n.read(con2, 140L, 20)
+            # con2 will be cancelled, so d2 should fail with DownloadStopped
+            def _con2_should_not_succeed(res):
+                self.fail("the second read should not have succeeded")
+            def _con2_failed(f):
+                self.failUnless(f.check(DownloadStopped))
+            d2.addCallbacks(_con2_should_not_succeed, _con2_failed)
+
+            def _con1_should_not_succeed(res):
+                self.fail("the first read should not have succeeded")
+            def _con1_failed(f):
+                self.failUnless(f.check(NoSharesError))
+                con2.producer.stopProducing()
+                return d2
+            d.addCallbacks(_con1_should_not_succeed, _con1_failed)
+            return d
+        d.addCallback(_uploaded)
+        return d
+
+    def test_simultaneous_onefails(self):
+        self.basedir = self.mktemp()
+        self.set_up_grid()
+        self.c0 = self.g.clients[0]
+
+        # upload a file with multiple segments, so we can catch the download
+        # in the middle. Tell the downloader the segment size, so it can
+        # guess correctly.
+        u = upload.Data(plaintext, None)
+        u.max_segment_size = 70 # 5 segs
+        d = self.c0.upload(u)
+        def _uploaded(ur):
+            # corrupt all the shares so the download will fail
+            def _corruptor(s, debug=False):
+                which = 48 # first byte of block0
+                return s[:which] + chr(ord(s[which])^0x01) + s[which+1:]
+            self.corrupt_all_shares(ur.uri, _corruptor)
+            n = self.c0.create_node_from_uri(ur.uri)
+            n._cnode._maybe_create_download_node()
+            n._cnode._node._build_guessed_tables(u.max_segment_size)
+            con1 = MemoryConsumer()
+            con2 = MemoryConsumer()
+            d = n.read(con1, 0L, 20)
+            d2 = n.read(con2, 140L, 20)
+            # con2 should wait for con1 to fail, and should then succeed. In
+            # particular, we should not lose progress. If that behavior
+            # regresses, this test will fail with a timeout error.
+            def _con2_should_succeed(res):
+                # this should succeed because we only corrupted the first
+                # segment of each share. The segment that holds [140:160] is
+                # fine, as are the hash chains and UEB.
+                self.failUnlessEqual("".join(con2.chunks), plaintext[140:160])
+            d2.addCallback(_con2_should_succeed)
+
+            def _con1_should_not_succeed(res):
+                self.fail("the first read should not have succeeded")
+            def _con1_failed(f):
+                self.failUnless(f.check(NoSharesError))
+                # we *don't* cancel the second one here: this exercises a
+                # lost-progress bug from #1154. We just wait for it to
+                # succeed.
+                return d2
+            d.addCallbacks(_con1_should_not_succeed, _con1_failed)
+            return d
+        d.addCallback(_uploaded)
+        return d
+
     def test_download_no_overrun(self):
         self.basedir = self.mktemp()
         self.set_up_grid()
@@ -496,7 +597,8 @@ class DownloadTest(_Base, unittest.TestCase):
         # tweak the client's copies of server-version data, so it believes
         # that they're old and can't handle reads that overrun the length of
         # the share. This exercises a different code path.
-        for (peerid, rref) in self.c0.storage_broker.get_all_servers():
+        for s in self.c0.storage_broker.get_connected_servers():
+            rref = s.get_rref()
             v1 = rref.version["http://allmydata.org/tahoe/protocols/storage/v1"]
             v1["tolerates-immutable-read-overrun"] = False
 
@@ -599,7 +701,7 @@ class DownloadTest(_Base, unittest.TestCase):
         return d
 
     def test_stop(self):
-        # use a download targetthat does an immediate stop (ticket #473)
+        # use a download target that stops after the first segment (#473)
         self.basedir = self.mktemp()
         self.set_up_grid()
         self.c0 = self.g.clients[0]
@@ -611,6 +713,36 @@ class DownloadTest(_Base, unittest.TestCase):
                             n.read, c)
         return d
 
+    def test_stop_immediately(self):
+        # and a target that stops right after registerProducer (maybe #1154)
+        self.basedir = self.mktemp()
+        self.set_up_grid()
+        self.c0 = self.g.clients[0]
+        self.load_shares()
+        n = self.c0.create_node_from_uri(immutable_uri)
+
+        c = ImmediatelyStoppingConsumer() # stops after registerProducer
+        d = self.shouldFail(DownloadStopped, "test_stop_immediately",
+                            "our Consumer called stopProducing()",
+                            n.read, c)
+        return d
+
+    def test_stop_immediately2(self):
+        # similar, but call stopProducing() from outside the consumer, right
+        # after read() has registered the producer (maybe #1154)
+        self.basedir = self.mktemp()
+        self.set_up_grid()
+        self.c0 = self.g.clients[0]
+        self.load_shares()
+        n = self.c0.create_node_from_uri(immutable_uri)
+
+        c = MemoryConsumer()
+        d0 = n.read(c)
+        c.producer.stopProducing()
+        d = self.shouldFail(DownloadStopped, "test_stop_immediately2",
+                            "our Consumer called stopProducing()",
+                            lambda: d0)
+        return d
+
     def test_download_segment_bad_ciphertext_hash(self):
         # The crypttext_hash_tree asserts the integrity of the decoded
         # ciphertext, and exists to detect two sorts of problems. The first
@@ -776,6 +908,11 @@ class StoppingConsumer(PausingConsumer):
     def write(self, data):
         self.producer.stopProducing()
 
+class ImmediatelyStoppingConsumer(MemoryConsumer):
+    def registerProducer(self, p, streaming):
+        MemoryConsumer.registerProducer(self, p, streaming)
+        self.producer.stopProducing()
+
 class StallingConsumer(MemoryConsumer):
     def __init__(self, halfway_cb):
         MemoryConsumer.__init__(self)
@@ -979,7 +1116,7 @@ class Corruption(_Base, unittest.TestCase):
                 # All these tests result in a failed download.
                 d.addCallback(self._corrupt_flip_all, imm_uri, i)
                 d.addCallback(lambda ign:
-                              self.shouldFail(NotEnoughSharesError, which,
+                              self.shouldFail(NoSharesError, which,
                                               substring,
                                               _download, imm_uri))
                 d.addCallback(lambda ign: self.restore_all_shares(self.shares))
@@ -1032,7 +1169,8 @@ class DownloadV2(_Base, unittest.TestCase):
         # tweak the client's copies of server-version data, so it believes
         # that they're old and can't handle reads that overrun the length of
         # the share. This exercises a different code path.
-        for (peerid, rref) in self.c0.storage_broker.get_all_servers():
+        for s in self.c0.storage_broker.get_connected_servers():
+            rref = s.get_rref()
             v1 = rref.version["http://allmydata.org/tahoe/protocols/storage/v1"]
             v1["tolerates-immutable-read-overrun"] = False
 
@@ -1051,7 +1189,8 @@ class DownloadV2(_Base, unittest.TestCase):
         self.set_up_grid()
         self.c0 = self.g.clients[0]
 
-        for (peerid, rref) in self.c0.storage_broker.get_all_servers():
+        for s in self.c0.storage_broker.get_connected_servers():
+            rref = s.get_rref()
             v1 = rref.version["http://allmydata.org/tahoe/protocols/storage/v1"]
             v1["tolerates-immutable-read-overrun"] = False
 
@@ -1073,3 +1212,406 @@ class DownloadV2(_Base, unittest.TestCase):
             return d
         d.addCallback(_uploaded)
         return d
+
+class Status(unittest.TestCase):
+    def test_status(self):
+        now = 12345.1
+        ds = DownloadStatus("si-1", 123)
+        self.failUnlessEqual(ds.get_status(), "idle")
+        ev0 = ds.add_segment_request(0, now)
+        self.failUnlessEqual(ds.get_status(), "fetching segment 0")
+        ev0.activate(now+0.5)
+        ev0.deliver(now+1, 0, 1000, 2.0)
+        self.failUnlessEqual(ds.get_status(), "idle")
+        ev2 = ds.add_segment_request(2, now+2)
+        del ev2 # hush pyflakes
+        ev1 = ds.add_segment_request(1, now+2)
+        self.failUnlessEqual(ds.get_status(), "fetching segments 1,2")
+        ev1.error(now+3)
+        self.failUnlessEqual(ds.get_status(),
+                             "fetching segment 2; errors on segment 1")
+
+    def test_progress(self):
+        now = 12345.1
+        ds = DownloadStatus("si-1", 123)
+        self.failUnlessEqual(ds.get_progress(), 0.0)
+        e = ds.add_read_event(0, 1000, now)
+        self.failUnlessEqual(ds.get_progress(), 0.0)
+        e.update(500, 2.0, 2.0)
+        self.failUnlessEqual(ds.get_progress(), 0.5)
+        e.finished(now+2)
+        self.failUnlessEqual(ds.get_progress(), 1.0)
+
+        e1 = ds.add_read_event(1000, 2000, now+3)
+        e2 = ds.add_read_event(4000, 2000, now+3)
+        self.failUnlessEqual(ds.get_progress(), 0.0)
+        e1.update(1000, 2.0, 2.0)
+        self.failUnlessEqual(ds.get_progress(), 0.25)
+        e2.update(1000, 2.0, 2.0)
+        self.failUnlessEqual(ds.get_progress(), 0.5)
+        e1.update(1000, 2.0, 2.0)
+        e1.finished(now+4)
+        # now there is only one outstanding read, and it is 50% done
+        self.failUnlessEqual(ds.get_progress(), 0.5)
+        e2.update(1000, 2.0, 2.0)
+        e2.finished(now+5)
+        self.failUnlessEqual(ds.get_progress(), 1.0)
+
+    def test_active(self):
+        now = 12345.1
+        ds = DownloadStatus("si-1", 123)
+        self.failUnlessEqual(ds.get_active(), False)
+        e1 = ds.add_read_event(0, 1000, now)
+        self.failUnlessEqual(ds.get_active(), True)
+        e2 = ds.add_read_event(1, 1000, now+1)
+        self.failUnlessEqual(ds.get_active(), True)
+        e1.finished(now+2)
+        self.failUnlessEqual(ds.get_active(), True)
+        e2.finished(now+3)
+        self.failUnlessEqual(ds.get_active(), False)
+
+def make_server(clientid):
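+    # construct a NoNetworkServer with a deterministic tubid derived from
+    # the clientid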
+    tubid = hashutil.tagged_hash("clientid", clientid)[:20]
+    return NoNetworkServer(tubid, None)
+def make_servers(clientids):
+    servers = {}
+    for clientid in clientids:
+        servers[clientid] = make_server(clientid)
+    return servers
+
+class MyShare:
+    def __init__(self, shnum, server, rtt):
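+        # rtt: simulated round-trip time of this share's DYHB
+        # ("Do You Have Block?") query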
+        self._shnum = shnum
+        self._server = server
+        self._dyhb_rtt = rtt
+    def __repr__(self):
+        return "sh%d-on-%s" % (self._shnum, self._server.get_name())
+
+class MySegmentFetcher(SegmentFetcher):
+    def __init__(self, *args, **kwargs):
+        SegmentFetcher.__init__(self, *args, **kwargs)
+        self._test_start_shares = []
+    def _start_share(self, share, shnum):
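+        # record the share instead of asking a server for its blocks, so the
+        # tests can inspect which shares the fetcher chose to start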
+        self._test_start_shares.append(share)
+
+class FakeNode:
+    def __init__(self):
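+        # slots that record what SegmentFetcher reports back to its node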
+        self.want_more = 0
+        self.failed = None
+        self.processed = None
+        self._si_prefix = "si_prefix"
+    def want_more_shares(self):
+        self.want_more += 1
+    def fetch_failed(self, fetcher, f):
+        self.failed = f
+    def process_blocks(self, segnum, blocks):
+        self.processed = (segnum, blocks)
+    def get_num_segments(self):
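+        # (num_segments, authoritative): pretend we know, authoritatively,
+        # that the file has exactly one segment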
+        return 1, True
+
+class Selection(unittest.TestCase):
+    def test_no_shares(self):
+        node = FakeNode()
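+        # a fetcher for segment 0 that needs any k=3 distinct shares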
+        sf = SegmentFetcher(node, 0, 3, None)
+        sf.add_shares([])
+        d = flushEventualQueue()
+        def _check1(ign):
+            self.failUnlessEqual(node.want_more, 1)
+            self.failUnlessEqual(node.failed, None)
+            sf.no_more_shares()
+            return flushEventualQueue()
+        d.addCallback(_check1)
+        def _check2(ign):
+            self.failUnless(node.failed)
+            self.failUnless(node.failed.check(NoSharesError))
+        d.addCallback(_check2)
+        return d
+
+    def test_only_one_share(self):
+        node = FakeNode()
+        sf = MySegmentFetcher(node, 0, 3, None)
+        serverA = make_server("peer-A")
+        shares = [MyShare(0, serverA, 0.0)]
+        sf.add_shares(shares)
+        d = flushEventualQueue()
+        def _check1(ign):
+            self.failUnlessEqual(node.want_more, 1)
+            self.failUnlessEqual(node.failed, None)
+            sf.no_more_shares()
+            return flushEventualQueue()
+        d.addCallback(_check1)
+        def _check2(ign):
+            self.failUnless(node.failed)
+            self.failUnless(node.failed.check(NotEnoughSharesError))
+            sname = serverA.get_name()
+            self.failUnlessIn("complete= pending=sh0-on-%s overdue= unused="  % sname,
+                              str(node.failed))
+        d.addCallback(_check2)
+        return d
+
+    def test_good_diversity_early(self):
+        node = FakeNode()
+        sf = MySegmentFetcher(node, 0, 3, None)
+        shares = [MyShare(i, make_server("peer-%d" % i), i) for i in range(10)]
+        sf.add_shares(shares)
+        d = flushEventualQueue()
+        def _check1(ign):
+            self.failUnlessEqual(node.want_more, 0)
+            self.failUnlessEqual(sf._test_start_shares, shares[:3])
+            for sh in sf._test_start_shares:
+                sf._block_request_activity(sh, sh._shnum, COMPLETE,
+                                           "block-%d" % sh._shnum)
+            return flushEventualQueue()
+        d.addCallback(_check1)
+        def _check2(ign):
+            self.failIfEqual(node.processed, None)
+            self.failUnlessEqual(node.processed, (0, {0: "block-0",
+                                                      1: "block-1",
+                                                      2: "block-2"}) )
+        d.addCallback(_check2)
+        return d
+
+    def test_good_diversity_late(self):
+        node = FakeNode()
+        sf = MySegmentFetcher(node, 0, 3, None)
+        shares = [MyShare(i, make_server("peer-%d" % i), i) for i in range(10)]
+        sf.add_shares([])
+        d = flushEventualQueue()
+        def _check1(ign):
+            self.failUnlessEqual(node.want_more, 1)
+            sf.add_shares(shares)
+            return flushEventualQueue()
+        d.addCallback(_check1)
+        def _check2(ign):
+            self.failUnlessEqual(sf._test_start_shares, shares[:3])
+            for sh in sf._test_start_shares:
+                sf._block_request_activity(sh, sh._shnum, COMPLETE,
+                                           "block-%d" % sh._shnum)
+            return flushEventualQueue()
+        d.addCallback(_check2)
+        def _check3(ign):
+            self.failIfEqual(node.processed, None)
+            self.failUnlessEqual(node.processed, (0, {0: "block-0",
+                                                      1: "block-1",
+                                                      2: "block-2"}) )
+        d.addCallback(_check3)
+        return d
+
+    def test_avoid_bad_diversity_late(self):
+        node = FakeNode()
+        sf = MySegmentFetcher(node, 0, 3, None)
+        # we could satisfy the read entirely from the first server, but we'd
+        # prefer not to. Instead, we expect to only pull one share from the
+        # first server
+        servers = make_servers(["peer-A", "peer-B", "peer-C"])
+        shares = [MyShare(0, servers["peer-A"], 0.0),
+                  MyShare(1, servers["peer-A"], 0.0),
+                  MyShare(2, servers["peer-A"], 0.0),
+                  MyShare(3, servers["peer-B"], 1.0),
+                  MyShare(4, servers["peer-C"], 2.0),
+                  ]
+        sf.add_shares([])
+        d = flushEventualQueue()
+        def _check1(ign):
+            self.failUnlessEqual(node.want_more, 1)
+            sf.add_shares(shares)
+            return flushEventualQueue()
+        d.addCallback(_check1)
+        def _check2(ign):
+            self.failUnlessEqual(sf._test_start_shares,
+                                 [shares[0], shares[3], shares[4]])
+            for sh in sf._test_start_shares:
+                sf._block_request_activity(sh, sh._shnum, COMPLETE,
+                                           "block-%d" % sh._shnum)
+            return flushEventualQueue()
+        d.addCallback(_check2)
+        def _check3(ign):
+            self.failIfEqual(node.processed, None)
+            self.failUnlessEqual(node.processed, (0, {0: "block-0",
+                                                      3: "block-3",
+                                                      4: "block-4"}) )
+        d.addCallback(_check3)
+        return d
+
+    def test_suffer_bad_diversity_late(self):
+        node = FakeNode()
+        sf = MySegmentFetcher(node, 0, 3, None)
+        # we satisfy the read entirely from the first server because we don't
+        # have any other choice.
+        serverA = make_server("peer-A")
+        shares = [MyShare(0, serverA, 0.0),
+                  MyShare(1, serverA, 0.0),
+                  MyShare(2, serverA, 0.0),
+                  MyShare(3, serverA, 0.0),
+                  MyShare(4, serverA, 0.0),
+                  ]
+        sf.add_shares([])
+        d = flushEventualQueue()
+        def _check1(ign):
+            self.failUnlessEqual(node.want_more, 1)
+            sf.add_shares(shares)
+            return flushEventualQueue()
+        d.addCallback(_check1)
+        def _check2(ign):
+            self.failUnlessEqual(node.want_more, 3)
+            self.failUnlessEqual(sf._test_start_shares,
+                                 [shares[0], shares[1], shares[2]])
+            for sh in sf._test_start_shares:
+                sf._block_request_activity(sh, sh._shnum, COMPLETE,
+                                           "block-%d" % sh._shnum)
+            return flushEventualQueue()
+        d.addCallback(_check2)
+        def _check3(ign):
+            self.failIfEqual(node.processed, None)
+            self.failUnlessEqual(node.processed, (0, {0: "block-0",
+                                                      1: "block-1",
+                                                      2: "block-2"}) )
+        d.addCallback(_check3)
+        return d
+
+    def test_suffer_bad_diversity_early(self):
+        node = FakeNode()
+        sf = MySegmentFetcher(node, 0, 3, None)
+        # we satisfy the read entirely from the first server because we don't
+        # have any other choice.
+        serverA = make_server("peer-A")
+        shares = [MyShare(0, serverA, 0.0),
+                  MyShare(1, serverA, 0.0),
+                  MyShare(2, serverA, 0.0),
+                  MyShare(3, serverA, 0.0),
+                  MyShare(4, serverA, 0.0),
+                  ]
+        sf.add_shares(shares)
+        d = flushEventualQueue()
+        def _check1(ign):
+            self.failUnlessEqual(node.want_more, 2)
+            self.failUnlessEqual(sf._test_start_shares,
+                                 [shares[0], shares[1], shares[2]])
+            for sh in sf._test_start_shares:
+                sf._block_request_activity(sh, sh._shnum, COMPLETE,
+                                           "block-%d" % sh._shnum)
+            return flushEventualQueue()
+        d.addCallback(_check1)
+        def _check2(ign):
+            self.failIfEqual(node.processed, None)
+            self.failUnlessEqual(node.processed, (0, {0: "block-0",
+                                                      1: "block-1",
+                                                      2: "block-2"}) )
+        d.addCallback(_check2)
+        return d
+
+    def test_overdue(self):
+        node = FakeNode()
+        sf = MySegmentFetcher(node, 0, 3, None)
+        shares = [MyShare(i, make_server("peer-%d" % i), i) for i in range(10)]
+        sf.add_shares(shares)
+        d = flushEventualQueue()
+        def _check1(ign):
+            self.failUnlessEqual(node.want_more, 0)
+            self.failUnlessEqual(sf._test_start_shares, shares[:3])
+            for sh in sf._test_start_shares:
+                sf._block_request_activity(sh, sh._shnum, OVERDUE)
+            return flushEventualQueue()
+        d.addCallback(_check1)
+        def _check2(ign):
+            self.failUnlessEqual(sf._test_start_shares, shares[:6])
+            for sh in sf._test_start_shares[3:]:
+                sf._block_request_activity(sh, sh._shnum, COMPLETE,
+                                           "block-%d" % sh._shnum)
+            return flushEventualQueue()
+        d.addCallback(_check2)
+        def _check3(ign):
+            self.failIfEqual(node.processed, None)
+            self.failUnlessEqual(node.processed, (0, {3: "block-3",
+                                                      4: "block-4",
+                                                      5: "block-5"}) )
+        d.addCallback(_check3)
+        return d
+
+    def test_overdue_fails(self):
+        node = FakeNode()
+        sf = MySegmentFetcher(node, 0, 3, None)
+        servers = make_servers(["peer-%d" % i for i in range(6)])
+        shares = [MyShare(i, servers["peer-%d" % i], i) for i in range(6)]
+        sf.add_shares(shares)
+        sf.no_more_shares()
+        d = flushEventualQueue()
+        def _check1(ign):
+            self.failUnlessEqual(node.want_more, 0)
+            self.failUnlessEqual(sf._test_start_shares, shares[:3])
+            for sh in sf._test_start_shares:
+                sf._block_request_activity(sh, sh._shnum, OVERDUE)
+            return flushEventualQueue()
+        d.addCallback(_check1)
+        def _check2(ign):
+            self.failUnlessEqual(sf._test_start_shares, shares[:6])
+            for sh in sf._test_start_shares[3:]:
+                sf._block_request_activity(sh, sh._shnum, DEAD)
+            return flushEventualQueue()
+        d.addCallback(_check2)
+        def _check3(ign):
+            # we're still waiting
+            self.failUnlessEqual(node.processed, None)
+            self.failUnlessEqual(node.failed, None)
+            # now complete one of the overdue ones, and kill one of the other
+            # ones, leaving one hanging. This should trigger a failure, since
+            # we cannot succeed.
+            live = sf._test_start_shares[0]
+            die = sf._test_start_shares[1]
+            sf._block_request_activity(live, live._shnum, COMPLETE, "block")
+            sf._block_request_activity(die, die._shnum, DEAD)
+            return flushEventualQueue()
+        d.addCallback(_check3)
+        def _check4(ign):
+            self.failUnless(node.failed)
+            self.failUnless(node.failed.check(NotEnoughSharesError))
+            sname = servers["peer-2"].get_name()
+            self.failUnlessIn("complete=sh0 pending= overdue=sh2-on-%s unused=" % sname,
+                              str(node.failed))
+        d.addCallback(_check4)
+        return d
+
+    def test_avoid_redundancy(self):
+        node = FakeNode()
+        sf = MySegmentFetcher(node, 0, 3, None)
+        # sh0 and sh1 are each available from two servers. We expect the
+        # fetcher to skip the duplicate copy of sh0 (on peer-C), and, once
+        # sh1 has been retrieved, to skip the redundant copy of sh1 (on
+        # peer-D) as well, fetching sh2 from peer-E instead.
+        servers = make_servers(["peer-A", "peer-B", "peer-C", "peer-D",
+                                "peer-E"])
+        shares = [MyShare(0, servers["peer-A"], 0.0),
+                  MyShare(1, servers["peer-B"], 1.0),
+                  MyShare(0, servers["peer-C"], 2.0), # this will be skipped
+                  MyShare(1, servers["peer-D"], 3.0),
+                  MyShare(2, servers["peer-E"], 4.0),
+                  ]
+        sf.add_shares(shares[:3])
+        d = flushEventualQueue()
+        def _check1(ign):
+            self.failUnlessEqual(node.want_more, 1)
+            self.failUnlessEqual(sf._test_start_shares,
+                                 [shares[0], shares[1]])
+            # allow sh1 to retire
+            sf._block_request_activity(shares[1], 1, COMPLETE, "block-1")
+            return flushEventualQueue()
+        d.addCallback(_check1)
+        def _check2(ign):
+            # and then feed in the remaining shares
+            sf.add_shares(shares[3:])
+            sf.no_more_shares()
+            return flushEventualQueue()
+        d.addCallback(_check2)
+        def _check3(ign):
+            self.failUnlessEqual(sf._test_start_shares,
+                                 [shares[0], shares[1], shares[4]])
+            sf._block_request_activity(shares[0], 0, COMPLETE, "block-0")
+            sf._block_request_activity(shares[4], 2, COMPLETE, "block-2")
+            return flushEventualQueue()
+        d.addCallback(_check3)
+        def _check4(ign):
+            self.failIfEqual(node.processed, None)
+            self.failUnlessEqual(node.processed, (0, {0: "block-0",
+                                                      1: "block-1",
+                                                      2: "block-2"}) )
+        d.addCallback(_check4)
+        return d