]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blobdiff - src/allmydata/test/test_download.py
SegmentFetcher: use new diversity-seeking share-selection algorithm, and
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_download.py
index 71a556bb6b5540005fe3de71caee4b12e52f92a8..40f0d62c07f08597e8f13221f5985d10a0f923a4 100644 (file)
@@ -15,8 +15,9 @@ from allmydata.test.no_network import GridTestMixin
 from allmydata.test.common import ShouldFailMixin
 from allmydata.interfaces import NotEnoughSharesError, NoSharesError
 from allmydata.immutable.downloader.common import BadSegmentNumberError, \
-     BadCiphertextHashError, DownloadStopped
+     BadCiphertextHashError, DownloadStopped, COMPLETE, OVERDUE, DEAD
 from allmydata.immutable.downloader.status import DownloadStatus
+from allmydata.immutable.downloader.fetcher import SegmentFetcher
 from allmydata.codec import CRSDecoder
 from foolscap.eventual import fireEventually, flushEventualQueue
 
@@ -295,7 +296,7 @@ class DownloadTest(_Base, unittest.TestCase):
             # shares
             servers = []
             shares = sorted([s._shnum for s in self.n._cnode._node._shares])
-            self.failUnlessEqual(shares, [0,1,2])
+            self.failUnlessEqual(shares, [0,1,2,3])
             # break the RIBucketReader references
             for s in self.n._cnode._node._shares:
                 s._rref.broken = True
@@ -318,7 +319,7 @@ class DownloadTest(_Base, unittest.TestCase):
             self.failUnlessEqual("".join(c.chunks), plaintext)
             shares = sorted([s._shnum for s in self.n._cnode._node._shares])
             # we should now be using more shares than we were before
-            self.failIfEqual(shares, [0,1,2])
+            self.failIfEqual(shares, [0,1,2,3])
         d.addCallback(_check_failover)
         return d
 
@@ -539,7 +540,7 @@ class DownloadTest(_Base, unittest.TestCase):
             def _con1_should_not_succeed(res):
                 self.fail("the first read should not have succeeded")
             def _con1_failed(f):
-                self.failUnless(f.check(NotEnoughSharesError))
+                self.failUnless(f.check(NoSharesError))
                 con2.producer.stopProducing()
                 return d2
             d.addCallbacks(_con1_should_not_succeed, _con1_failed)
@@ -583,7 +584,7 @@ class DownloadTest(_Base, unittest.TestCase):
             def _con1_should_not_succeed(res):
                 self.fail("the first read should not have succeeded")
             def _con1_failed(f):
-                self.failUnless(f.check(NotEnoughSharesError))
+                self.failUnless(f.check(NoSharesError))
                 # we *don't* cancel the second one here: this exercises a
                 # lost-progress bug from #1154. We just wait for it to
                 # succeed.
@@ -1121,7 +1122,7 @@ class Corruption(_Base, unittest.TestCase):
                 # All these tests result in a failed download.
                 d.addCallback(self._corrupt_flip_all, imm_uri, i)
                 d.addCallback(lambda ign:
-                              self.shouldFail(NotEnoughSharesError, which,
+                              self.shouldFail(NoSharesError, which,
                                               substring,
                                               _download, imm_uri))
                 d.addCallback(lambda ign: self.restore_all_shares(self.shares))
@@ -1257,3 +1258,332 @@ class Status(unittest.TestCase):
         e2.update(1000, 2.0, 2.0)
         e2.finished(now+5)
         self.failUnlessEqual(ds.get_progress(), 1.0)
+
+class MyShare:
+    def __init__(self, shnum, peerid, rtt):
+        self._shnum = shnum
+        self._peerid = peerid
+        self._peerid_s = peerid
+        self._dyhb_rtt = rtt
+    def __repr__(self):
+        return "sh%d-on-%s" % (self._shnum, self._peerid)
+
+class MySegmentFetcher(SegmentFetcher):
+    def __init__(self, *args, **kwargs):
+        SegmentFetcher.__init__(self, *args, **kwargs)
+        self._test_start_shares = []
+    def _start_share(self, share, shnum):
+        self._test_start_shares.append(share)
+
+class FakeNode:
+    def __init__(self):
+        self.want_more = 0
+        self.failed = None
+        self.processed = None
+        self._si_prefix = "si_prefix"
+    def want_more_shares(self):
+        self.want_more += 1
+    def fetch_failed(self, fetcher, f):
+        self.failed = f
+    def process_blocks(self, segnum, blocks):
+        self.processed = (segnum, blocks)
+    def get_num_segments(self):
+        return 1, True
+
+class Selection(unittest.TestCase):
+    def test_no_shares(self):
+        node = FakeNode()
+        sf = SegmentFetcher(node, 0, 3, None)
+        sf.add_shares([])
+        d = flushEventualQueue()
+        def _check1(ign):
+            self.failUnlessEqual(node.want_more, 1)
+            self.failUnlessEqual(node.failed, None)
+            sf.no_more_shares()
+            return flushEventualQueue()
+        d.addCallback(_check1)
+        def _check2(ign):
+            self.failUnless(node.failed)
+            self.failUnless(node.failed.check(NoSharesError))
+        d.addCallback(_check2)
+        return d
+
+    def test_only_one_share(self):
+        node = FakeNode()
+        sf = MySegmentFetcher(node, 0, 3, None)
+        shares = [MyShare(0, "peer-A", 0.0)]
+        sf.add_shares(shares)
+        d = flushEventualQueue()
+        def _check1(ign):
+            self.failUnlessEqual(node.want_more, 1)
+            self.failUnlessEqual(node.failed, None)
+            sf.no_more_shares()
+            return flushEventualQueue()
+        d.addCallback(_check1)
+        def _check2(ign):
+            self.failUnless(node.failed)
+            self.failUnless(node.failed.check(NotEnoughSharesError))
+            self.failUnlessIn("complete= pending=sh0-on-peer-A overdue= unused=",
+                              str(node.failed))
+        d.addCallback(_check2)
+        return d
+
+    def test_good_diversity_early(self):
+        node = FakeNode()
+        sf = MySegmentFetcher(node, 0, 3, None)
+        shares = [MyShare(i, "peer-%d" % i, i) for i in range(10)]
+        sf.add_shares(shares)
+        d = flushEventualQueue()
+        def _check1(ign):
+            self.failUnlessEqual(node.want_more, 0)
+            self.failUnlessEqual(sf._test_start_shares, shares[:3])
+            for sh in sf._test_start_shares:
+                sf._block_request_activity(sh, sh._shnum, COMPLETE,
+                                           "block-%d" % sh._shnum)
+            return flushEventualQueue()
+        d.addCallback(_check1)
+        def _check2(ign):
+            self.failIfEqual(node.processed, None)
+            self.failUnlessEqual(node.processed, (0, {0: "block-0",
+                                                      1: "block-1",
+                                                      2: "block-2"}) )
+        d.addCallback(_check2)
+        return d
+
+    def test_good_diversity_late(self):
+        node = FakeNode()
+        sf = MySegmentFetcher(node, 0, 3, None)
+        shares = [MyShare(i, "peer-%d" % i, i) for i in range(10)]
+        sf.add_shares([])
+        d = flushEventualQueue()
+        def _check1(ign):
+            self.failUnlessEqual(node.want_more, 1)
+            sf.add_shares(shares)
+            return flushEventualQueue()
+        d.addCallback(_check1)
+        def _check2(ign):
+            self.failUnlessEqual(sf._test_start_shares, shares[:3])
+            for sh in sf._test_start_shares:
+                sf._block_request_activity(sh, sh._shnum, COMPLETE,
+                                           "block-%d" % sh._shnum)
+            return flushEventualQueue()
+        d.addCallback(_check2)
+        def _check3(ign):
+            self.failIfEqual(node.processed, None)
+            self.failUnlessEqual(node.processed, (0, {0: "block-0",
+                                                      1: "block-1",
+                                                      2: "block-2"}) )
+        d.addCallback(_check3)
+        return d
+
+    def test_avoid_bad_diversity_late(self):
+        node = FakeNode()
+        sf = MySegmentFetcher(node, 0, 3, None)
+        # we could satisfy the read entirely from the first server, but we'd
+        # prefer not to. Instead, we expect to only pull one share from the
+        # first server
+        shares = [MyShare(0, "peer-A", 0.0),
+                  MyShare(1, "peer-A", 0.0),
+                  MyShare(2, "peer-A", 0.0),
+                  MyShare(3, "peer-B", 1.0),
+                  MyShare(4, "peer-C", 2.0),
+                  ]
+        sf.add_shares([])
+        d = flushEventualQueue()
+        def _check1(ign):
+            self.failUnlessEqual(node.want_more, 1)
+            sf.add_shares(shares)
+            return flushEventualQueue()
+        d.addCallback(_check1)
+        def _check2(ign):
+            self.failUnlessEqual(sf._test_start_shares,
+                                 [shares[0], shares[3], shares[4]])
+            for sh in sf._test_start_shares:
+                sf._block_request_activity(sh, sh._shnum, COMPLETE,
+                                           "block-%d" % sh._shnum)
+            return flushEventualQueue()
+        d.addCallback(_check2)
+        def _check3(ign):
+            self.failIfEqual(node.processed, None)
+            self.failUnlessEqual(node.processed, (0, {0: "block-0",
+                                                      3: "block-3",
+                                                      4: "block-4"}) )
+        d.addCallback(_check3)
+        return d
+
+    def test_suffer_bad_diversity_late(self):
+        node = FakeNode()
+        sf = MySegmentFetcher(node, 0, 3, None)
+        # we satisfy the read entirely from the first server because we don't
+        # have any other choice.
+        shares = [MyShare(0, "peer-A", 0.0),
+                  MyShare(1, "peer-A", 0.0),
+                  MyShare(2, "peer-A", 0.0),
+                  MyShare(3, "peer-A", 0.0),
+                  MyShare(4, "peer-A", 0.0),
+                  ]
+        sf.add_shares([])
+        d = flushEventualQueue()
+        def _check1(ign):
+            self.failUnlessEqual(node.want_more, 1)
+            sf.add_shares(shares)
+            return flushEventualQueue()
+        d.addCallback(_check1)
+        def _check2(ign):
+            self.failUnlessEqual(node.want_more, 3)
+            self.failUnlessEqual(sf._test_start_shares,
+                                 [shares[0], shares[1], shares[2]])
+            for sh in sf._test_start_shares:
+                sf._block_request_activity(sh, sh._shnum, COMPLETE,
+                                           "block-%d" % sh._shnum)
+            return flushEventualQueue()
+        d.addCallback(_check2)
+        def _check3(ign):
+            self.failIfEqual(node.processed, None)
+            self.failUnlessEqual(node.processed, (0, {0: "block-0",
+                                                      1: "block-1",
+                                                      2: "block-2"}) )
+        d.addCallback(_check3)
+        return d
+
+    def test_suffer_bad_diversity_early(self):
+        node = FakeNode()
+        sf = MySegmentFetcher(node, 0, 3, None)
+        # we satisfy the read entirely from the first server because we don't
+        # have any other choice.
+        shares = [MyShare(0, "peer-A", 0.0),
+                  MyShare(1, "peer-A", 0.0),
+                  MyShare(2, "peer-A", 0.0),
+                  MyShare(3, "peer-A", 0.0),
+                  MyShare(4, "peer-A", 0.0),
+                  ]
+        sf.add_shares(shares)
+        d = flushEventualQueue()
+        def _check1(ign):
+            self.failUnlessEqual(node.want_more, 2)
+            self.failUnlessEqual(sf._test_start_shares,
+                                 [shares[0], shares[1], shares[2]])
+            for sh in sf._test_start_shares:
+                sf._block_request_activity(sh, sh._shnum, COMPLETE,
+                                           "block-%d" % sh._shnum)
+            return flushEventualQueue()
+        d.addCallback(_check1)
+        def _check2(ign):
+            self.failIfEqual(node.processed, None)
+            self.failUnlessEqual(node.processed, (0, {0: "block-0",
+                                                      1: "block-1",
+                                                      2: "block-2"}) )
+        d.addCallback(_check2)
+        return d
+
+    def test_overdue(self):
+        node = FakeNode()
+        sf = MySegmentFetcher(node, 0, 3, None)
+        shares = [MyShare(i, "peer-%d" % i, i) for i in range(10)]
+        sf.add_shares(shares)
+        d = flushEventualQueue()
+        def _check1(ign):
+            self.failUnlessEqual(node.want_more, 0)
+            self.failUnlessEqual(sf._test_start_shares, shares[:3])
+            for sh in sf._test_start_shares:
+                sf._block_request_activity(sh, sh._shnum, OVERDUE)
+            return flushEventualQueue()
+        d.addCallback(_check1)
+        def _check2(ign):
+            self.failUnlessEqual(sf._test_start_shares, shares[:6])
+            for sh in sf._test_start_shares[3:]:
+                sf._block_request_activity(sh, sh._shnum, COMPLETE,
+                                           "block-%d" % sh._shnum)
+            return flushEventualQueue()
+        d.addCallback(_check2)
+        def _check3(ign):
+            self.failIfEqual(node.processed, None)
+            self.failUnlessEqual(node.processed, (0, {3: "block-3",
+                                                      4: "block-4",
+                                                      5: "block-5"}) )
+        d.addCallback(_check3)
+        return d
+
+    def test_overdue_fails(self):
+        node = FakeNode()
+        sf = MySegmentFetcher(node, 0, 3, None)
+        shares = [MyShare(i, "peer-%d" % i, i) for i in range(6)]
+        sf.add_shares(shares)
+        sf.no_more_shares()
+        d = flushEventualQueue()
+        def _check1(ign):
+            self.failUnlessEqual(node.want_more, 0)
+            self.failUnlessEqual(sf._test_start_shares, shares[:3])
+            for sh in sf._test_start_shares:
+                sf._block_request_activity(sh, sh._shnum, OVERDUE)
+            return flushEventualQueue()
+        d.addCallback(_check1)
+        def _check2(ign):
+            self.failUnlessEqual(sf._test_start_shares, shares[:6])
+            for sh in sf._test_start_shares[3:]:
+                sf._block_request_activity(sh, sh._shnum, DEAD)
+            return flushEventualQueue()
+        d.addCallback(_check2)
+        def _check3(ign):
+            # we're still waiting
+            self.failUnlessEqual(node.processed, None)
+            self.failUnlessEqual(node.failed, None)
+            # now complete one of the overdue ones, and kill one of the other
+            # ones, leaving one hanging. This should trigger a failure, since
+            # we cannot succeed.
+            live = sf._test_start_shares[0]
+            die = sf._test_start_shares[1]
+            sf._block_request_activity(live, live._shnum, COMPLETE, "block")
+            sf._block_request_activity(die, die._shnum, DEAD)
+            return flushEventualQueue()
+        d.addCallback(_check3)
+        def _check4(ign):
+            self.failUnless(node.failed)
+            self.failUnless(node.failed.check(NotEnoughSharesError))
+            self.failUnlessIn("complete=sh0 pending= overdue=sh2-on-peer-2 unused=",
+                              str(node.failed))
+        d.addCallback(_check4)
+        return d
+
+    def test_avoid_redundancy(self):
+        node = FakeNode()
+        sf = MySegmentFetcher(node, 0, 3, None)
+        # we could satisfy the read entirely from the first server, but we'd
+        # prefer not to. Instead, we expect to only pull one share from the
+        # first server
+        shares = [MyShare(0, "peer-A", 0.0),
+                  MyShare(1, "peer-B", 1.0),
+                  MyShare(0, "peer-C", 2.0), # this will be skipped
+                  MyShare(1, "peer-D", 3.0),
+                  MyShare(2, "peer-E", 4.0),
+                  ]
+        sf.add_shares(shares[:3])
+        d = flushEventualQueue()
+        def _check1(ign):
+            self.failUnlessEqual(node.want_more, 1)
+            self.failUnlessEqual(sf._test_start_shares,
+                                 [shares[0], shares[1]])
+            # allow sh1 to retire
+            sf._block_request_activity(shares[1], 1, COMPLETE, "block-1")
+            return flushEventualQueue()
+        d.addCallback(_check1)
+        def _check2(ign):
+            # and then feed in the remaining shares
+            sf.add_shares(shares[3:])
+            sf.no_more_shares()
+            return flushEventualQueue()
+        d.addCallback(_check2)
+        def _check3(ign):
+            self.failUnlessEqual(sf._test_start_shares,
+                                 [shares[0], shares[1], shares[4]])
+            sf._block_request_activity(shares[0], 0, COMPLETE, "block-0")
+            sf._block_request_activity(shares[4], 2, COMPLETE, "block-2")
+            return flushEventualQueue()
+        d.addCallback(_check3)
+        def _check4(ign):
+            self.failIfEqual(node.processed, None)
+            self.failUnlessEqual(node.processed, (0, {0: "block-0",
+                                                      1: "block-1",
+                                                      2: "block-2"}) )
+        d.addCallback(_check4)
+        return d