]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blobdiff - src/allmydata/test/test_download.py
move DownloadStopped from download.common to interfaces
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_download.py
index 71a556bb6b5540005fe3de71caee4b12e52f92a8..30485b4a020da0ad8b0a23eeba5c2a9b296174fa 100644 (file)
@@ -8,15 +8,17 @@ from twisted.trial import unittest
 from twisted.internet import defer, reactor
 from allmydata import uri
 from allmydata.storage.server import storage_index_to_dir
-from allmydata.util import base32, fileutil, spans, log
+from allmydata.util import base32, fileutil, spans, log, hashutil
 from allmydata.util.consumer import download_to_data, MemoryConsumer
 from allmydata.immutable import upload, layout
-from allmydata.test.no_network import GridTestMixin
+from allmydata.test.no_network import GridTestMixin, NoNetworkServer
 from allmydata.test.common import ShouldFailMixin
-from allmydata.interfaces import NotEnoughSharesError, NoSharesError
+from allmydata.interfaces import NotEnoughSharesError, NoSharesError, \
+     DownloadStopped
 from allmydata.immutable.downloader.common import BadSegmentNumberError, \
-     BadCiphertextHashError, DownloadStopped
+     BadCiphertextHashError, COMPLETE, OVERDUE, DEAD
 from allmydata.immutable.downloader.status import DownloadStatus
+from allmydata.immutable.downloader.fetcher import SegmentFetcher
 from allmydata.codec import CRSDecoder
 from foolscap.eventual import fireEventually, flushEventualQueue
 
@@ -289,36 +291,28 @@ class DownloadTest(_Base, unittest.TestCase):
         def _got_data(data):
             self.failUnlessEqual(data, plaintext)
         d.addCallback(_got_data)
-        def _kill_some_servers():
-            # find the three shares that were used, and delete them. Then
-            # download again, forcing the downloader to fail over to other
-            # shares
-            servers = []
-            shares = sorted([s._shnum for s in self.n._cnode._node._shares])
-            self.failUnlessEqual(shares, [0,1,2])
+        def _kill_some_shares():
+            # find the shares that were used and delete them
+            shares = self.n._cnode._node._shares
+            shnums = sorted([s._shnum for s in shares])
+            self.failUnlessEqual(shnums, [0,1,2,3])
+
             # break the RIBucketReader references
-            for s in self.n._cnode._node._shares:
+            # (we don't break the RIStorageServer references, because that
+            # isn't needed to test the current downloader implementation)
+            for s in shares:
                 s._rref.broken = True
-                for servernum in immutable_shares:
-                    for shnum in immutable_shares[servernum]:
-                        if s._shnum == shnum:
-                            ss = self.g.servers_by_number[servernum]
-                            servers.append(ss)
-            # and, for good measure, break the RIStorageServer references
-            # too, just in case the downloader gets more aggressive in the
-            # future and tries to re-fetch the same share.
-            for ss in servers:
-                wrapper = self.g.servers_by_id[ss.my_nodeid]
-                wrapper.broken = True
         def _download_again(ign):
-            c = StallingConsumer(_kill_some_servers)
+            # download again, deleting some shares after the first write
+            # to the consumer
+            c = StallingConsumer(_kill_some_shares)
             return self.n.read(c)
         d.addCallback(_download_again)
         def _check_failover(c):
             self.failUnlessEqual("".join(c.chunks), plaintext)
-            shares = sorted([s._shnum for s in self.n._cnode._node._shares])
-            # we should now be using more shares than we were before
-            self.failIfEqual(shares, [0,1,2])
+            shares = self.n._cnode._node._shares
+            shnums = sorted([s._shnum for s in shares])
+            self.failIfEqual(shnums, [0,1,2,3])
         d.addCallback(_check_failover)
         return d
 
@@ -539,7 +533,7 @@ class DownloadTest(_Base, unittest.TestCase):
             def _con1_should_not_succeed(res):
                 self.fail("the first read should not have succeeded")
             def _con1_failed(f):
-                self.failUnless(f.check(NotEnoughSharesError))
+                self.failUnless(f.check(NoSharesError))
                 con2.producer.stopProducing()
                 return d2
             d.addCallbacks(_con1_should_not_succeed, _con1_failed)
@@ -583,7 +577,7 @@ class DownloadTest(_Base, unittest.TestCase):
             def _con1_should_not_succeed(res):
                 self.fail("the first read should not have succeeded")
             def _con1_failed(f):
-                self.failUnless(f.check(NotEnoughSharesError))
+                self.failUnless(f.check(NoSharesError))
                 # we *don't* cancel the second one here: this exercises a
                 # lost-progress bug from #1154. We just wait for it to
                 # succeed.
@@ -603,7 +597,8 @@ class DownloadTest(_Base, unittest.TestCase):
         # tweak the client's copies of server-version data, so it believes
         # that they're old and can't handle reads that overrun the length of
         # the share. This exercises a different code path.
-        for (peerid, rref) in self.c0.storage_broker.get_all_servers():
+        for s in self.c0.storage_broker.get_connected_servers():
+            rref = s.get_rref()
             v1 = rref.version["http://allmydata.org/tahoe/protocols/storage/v1"]
             v1["tolerates-immutable-read-overrun"] = False
 
@@ -1121,7 +1116,7 @@ class Corruption(_Base, unittest.TestCase):
                 # All these tests result in a failed download.
                 d.addCallback(self._corrupt_flip_all, imm_uri, i)
                 d.addCallback(lambda ign:
-                              self.shouldFail(NotEnoughSharesError, which,
+                              self.shouldFail(NoSharesError, which,
                                               substring,
                                               _download, imm_uri))
                 d.addCallback(lambda ign: self.restore_all_shares(self.shares))
@@ -1174,7 +1169,8 @@ class DownloadV2(_Base, unittest.TestCase):
         # tweak the client's copies of server-version data, so it believes
         # that they're old and can't handle reads that overrun the length of
         # the share. This exercises a different code path.
-        for (peerid, rref) in self.c0.storage_broker.get_all_servers():
+        for s in self.c0.storage_broker.get_connected_servers():
+            rref = s.get_rref()
             v1 = rref.version["http://allmydata.org/tahoe/protocols/storage/v1"]
             v1["tolerates-immutable-read-overrun"] = False
 
@@ -1193,7 +1189,8 @@ class DownloadV2(_Base, unittest.TestCase):
         self.set_up_grid()
         self.c0 = self.g.clients[0]
 
-        for (peerid, rref) in self.c0.storage_broker.get_all_servers():
+        for s in self.c0.storage_broker.get_connected_servers():
+            rref = s.get_rref()
             v1 = rref.version["http://allmydata.org/tahoe/protocols/storage/v1"]
             v1["tolerates-immutable-read-overrun"] = False
 
@@ -1221,14 +1218,16 @@ class Status(unittest.TestCase):
         now = 12345.1
         ds = DownloadStatus("si-1", 123)
         self.failUnlessEqual(ds.get_status(), "idle")
-        ds.add_segment_request(0, now)
+        ev0 = ds.add_segment_request(0, now)
         self.failUnlessEqual(ds.get_status(), "fetching segment 0")
-        ds.add_segment_delivery(0, now+1, 0, 1000, 2.0)
+        ev0.activate(now+0.5)
+        ev0.deliver(now+1, 0, 1000, 2.0)
         self.failUnlessEqual(ds.get_status(), "idle")
-        ds.add_segment_request(2, now+2)
-        ds.add_segment_request(1, now+2)
+        ev2 = ds.add_segment_request(2, now+2)
+        del ev2 # hush pyflakes
+        ev1 = ds.add_segment_request(1, now+2)
         self.failUnlessEqual(ds.get_status(), "fetching segments 1,2")
-        ds.add_segment_error(1, now+3)
+        ev1.error(now+3)
         self.failUnlessEqual(ds.get_status(),
                              "fetching segment 2; errors on segment 1")
 
@@ -1257,3 +1256,362 @@ class Status(unittest.TestCase):
         e2.update(1000, 2.0, 2.0)
         e2.finished(now+5)
         self.failUnlessEqual(ds.get_progress(), 1.0)
+
+    def test_active(self):
+        now = 12345.1
+        ds = DownloadStatus("si-1", 123)
+        self.failUnlessEqual(ds.get_active(), False)
+        e1 = ds.add_read_event(0, 1000, now)
+        self.failUnlessEqual(ds.get_active(), True)
+        e2 = ds.add_read_event(1, 1000, now+1)
+        self.failUnlessEqual(ds.get_active(), True)
+        e1.finished(now+2)
+        self.failUnlessEqual(ds.get_active(), True)
+        e2.finished(now+3)
+        self.failUnlessEqual(ds.get_active(), False)
+
+def make_server(clientid):
+    tubid = hashutil.tagged_hash("clientid", clientid)[:20]
+    return NoNetworkServer(tubid, None)
+def make_servers(clientids):
+    servers = {}
+    for clientid in clientids:
+        servers[clientid] = make_server(clientid)
+    return servers
+
+class MyShare:
+    def __init__(self, shnum, server, rtt):
+        self._shnum = shnum
+        self._server = server
+        self._dyhb_rtt = rtt
+    def __repr__(self):
+        return "sh%d-on-%s" % (self._shnum, self._server.get_name())
+
+class MySegmentFetcher(SegmentFetcher):
+    def __init__(self, *args, **kwargs):
+        SegmentFetcher.__init__(self, *args, **kwargs)
+        self._test_start_shares = []
+    def _start_share(self, share, shnum):
+        self._test_start_shares.append(share)
+
+class FakeNode:
+    def __init__(self):
+        self.want_more = 0
+        self.failed = None
+        self.processed = None
+        self._si_prefix = "si_prefix"
+    def want_more_shares(self):
+        self.want_more += 1
+    def fetch_failed(self, fetcher, f):
+        self.failed = f
+    def process_blocks(self, segnum, blocks):
+        self.processed = (segnum, blocks)
+    def get_num_segments(self):
+        return 1, True
+
+class Selection(unittest.TestCase):
+    def test_no_shares(self):
+        node = FakeNode()
+        sf = SegmentFetcher(node, 0, 3, None)
+        sf.add_shares([])
+        d = flushEventualQueue()
+        def _check1(ign):
+            self.failUnlessEqual(node.want_more, 1)
+            self.failUnlessEqual(node.failed, None)
+            sf.no_more_shares()
+            return flushEventualQueue()
+        d.addCallback(_check1)
+        def _check2(ign):
+            self.failUnless(node.failed)
+            self.failUnless(node.failed.check(NoSharesError))
+        d.addCallback(_check2)
+        return d
+
+    def test_only_one_share(self):
+        node = FakeNode()
+        sf = MySegmentFetcher(node, 0, 3, None)
+        serverA = make_server("peer-A")
+        shares = [MyShare(0, serverA, 0.0)]
+        sf.add_shares(shares)
+        d = flushEventualQueue()
+        def _check1(ign):
+            self.failUnlessEqual(node.want_more, 1)
+            self.failUnlessEqual(node.failed, None)
+            sf.no_more_shares()
+            return flushEventualQueue()
+        d.addCallback(_check1)
+        def _check2(ign):
+            self.failUnless(node.failed)
+            self.failUnless(node.failed.check(NotEnoughSharesError))
+            sname = serverA.get_name()
+            self.failUnlessIn("complete= pending=sh0-on-%s overdue= unused="  % sname,
+                              str(node.failed))
+        d.addCallback(_check2)
+        return d
+
+    def test_good_diversity_early(self):
+        node = FakeNode()
+        sf = MySegmentFetcher(node, 0, 3, None)
+        shares = [MyShare(i, make_server("peer-%d" % i), i) for i in range(10)]
+        sf.add_shares(shares)
+        d = flushEventualQueue()
+        def _check1(ign):
+            self.failUnlessEqual(node.want_more, 0)
+            self.failUnlessEqual(sf._test_start_shares, shares[:3])
+            for sh in sf._test_start_shares:
+                sf._block_request_activity(sh, sh._shnum, COMPLETE,
+                                           "block-%d" % sh._shnum)
+            return flushEventualQueue()
+        d.addCallback(_check1)
+        def _check2(ign):
+            self.failIfEqual(node.processed, None)
+            self.failUnlessEqual(node.processed, (0, {0: "block-0",
+                                                      1: "block-1",
+                                                      2: "block-2"}) )
+        d.addCallback(_check2)
+        return d
+
+    def test_good_diversity_late(self):
+        node = FakeNode()
+        sf = MySegmentFetcher(node, 0, 3, None)
+        shares = [MyShare(i, make_server("peer-%d" % i), i) for i in range(10)]
+        sf.add_shares([])
+        d = flushEventualQueue()
+        def _check1(ign):
+            self.failUnlessEqual(node.want_more, 1)
+            sf.add_shares(shares)
+            return flushEventualQueue()
+        d.addCallback(_check1)
+        def _check2(ign):
+            self.failUnlessEqual(sf._test_start_shares, shares[:3])
+            for sh in sf._test_start_shares:
+                sf._block_request_activity(sh, sh._shnum, COMPLETE,
+                                           "block-%d" % sh._shnum)
+            return flushEventualQueue()
+        d.addCallback(_check2)
+        def _check3(ign):
+            self.failIfEqual(node.processed, None)
+            self.failUnlessEqual(node.processed, (0, {0: "block-0",
+                                                      1: "block-1",
+                                                      2: "block-2"}) )
+        d.addCallback(_check3)
+        return d
+
+    def test_avoid_bad_diversity_late(self):
+        node = FakeNode()
+        sf = MySegmentFetcher(node, 0, 3, None)
+        # we could satisfy the read entirely from the first server, but we'd
+        # prefer not to. Instead, we expect to only pull one share from the
+        # first server
+        servers = make_servers(["peer-A", "peer-B", "peer-C"])
+        shares = [MyShare(0, servers["peer-A"], 0.0),
+                  MyShare(1, servers["peer-A"], 0.0),
+                  MyShare(2, servers["peer-A"], 0.0),
+                  MyShare(3, servers["peer-B"], 1.0),
+                  MyShare(4, servers["peer-C"], 2.0),
+                  ]
+        sf.add_shares([])
+        d = flushEventualQueue()
+        def _check1(ign):
+            self.failUnlessEqual(node.want_more, 1)
+            sf.add_shares(shares)
+            return flushEventualQueue()
+        d.addCallback(_check1)
+        def _check2(ign):
+            self.failUnlessEqual(sf._test_start_shares,
+                                 [shares[0], shares[3], shares[4]])
+            for sh in sf._test_start_shares:
+                sf._block_request_activity(sh, sh._shnum, COMPLETE,
+                                           "block-%d" % sh._shnum)
+            return flushEventualQueue()
+        d.addCallback(_check2)
+        def _check3(ign):
+            self.failIfEqual(node.processed, None)
+            self.failUnlessEqual(node.processed, (0, {0: "block-0",
+                                                      3: "block-3",
+                                                      4: "block-4"}) )
+        d.addCallback(_check3)
+        return d
+
+    def test_suffer_bad_diversity_late(self):
+        node = FakeNode()
+        sf = MySegmentFetcher(node, 0, 3, None)
+        # we satisfy the read entirely from the first server because we don't
+        # have any other choice.
+        serverA = make_server("peer-A")
+        shares = [MyShare(0, serverA, 0.0),
+                  MyShare(1, serverA, 0.0),
+                  MyShare(2, serverA, 0.0),
+                  MyShare(3, serverA, 0.0),
+                  MyShare(4, serverA, 0.0),
+                  ]
+        sf.add_shares([])
+        d = flushEventualQueue()
+        def _check1(ign):
+            self.failUnlessEqual(node.want_more, 1)
+            sf.add_shares(shares)
+            return flushEventualQueue()
+        d.addCallback(_check1)
+        def _check2(ign):
+            self.failUnlessEqual(node.want_more, 3)
+            self.failUnlessEqual(sf._test_start_shares,
+                                 [shares[0], shares[1], shares[2]])
+            for sh in sf._test_start_shares:
+                sf._block_request_activity(sh, sh._shnum, COMPLETE,
+                                           "block-%d" % sh._shnum)
+            return flushEventualQueue()
+        d.addCallback(_check2)
+        def _check3(ign):
+            self.failIfEqual(node.processed, None)
+            self.failUnlessEqual(node.processed, (0, {0: "block-0",
+                                                      1: "block-1",
+                                                      2: "block-2"}) )
+        d.addCallback(_check3)
+        return d
+
+    def test_suffer_bad_diversity_early(self):
+        node = FakeNode()
+        sf = MySegmentFetcher(node, 0, 3, None)
+        # we satisfy the read entirely from the first server because we don't
+        # have any other choice.
+        serverA = make_server("peer-A")
+        shares = [MyShare(0, serverA, 0.0),
+                  MyShare(1, serverA, 0.0),
+                  MyShare(2, serverA, 0.0),
+                  MyShare(3, serverA, 0.0),
+                  MyShare(4, serverA, 0.0),
+                  ]
+        sf.add_shares(shares)
+        d = flushEventualQueue()
+        def _check1(ign):
+            self.failUnlessEqual(node.want_more, 2)
+            self.failUnlessEqual(sf._test_start_shares,
+                                 [shares[0], shares[1], shares[2]])
+            for sh in sf._test_start_shares:
+                sf._block_request_activity(sh, sh._shnum, COMPLETE,
+                                           "block-%d" % sh._shnum)
+            return flushEventualQueue()
+        d.addCallback(_check1)
+        def _check2(ign):
+            self.failIfEqual(node.processed, None)
+            self.failUnlessEqual(node.processed, (0, {0: "block-0",
+                                                      1: "block-1",
+                                                      2: "block-2"}) )
+        d.addCallback(_check2)
+        return d
+
+    def test_overdue(self):
+        node = FakeNode()
+        sf = MySegmentFetcher(node, 0, 3, None)
+        shares = [MyShare(i, make_server("peer-%d" % i), i) for i in range(10)]
+        sf.add_shares(shares)
+        d = flushEventualQueue()
+        def _check1(ign):
+            self.failUnlessEqual(node.want_more, 0)
+            self.failUnlessEqual(sf._test_start_shares, shares[:3])
+            for sh in sf._test_start_shares:
+                sf._block_request_activity(sh, sh._shnum, OVERDUE)
+            return flushEventualQueue()
+        d.addCallback(_check1)
+        def _check2(ign):
+            self.failUnlessEqual(sf._test_start_shares, shares[:6])
+            for sh in sf._test_start_shares[3:]:
+                sf._block_request_activity(sh, sh._shnum, COMPLETE,
+                                           "block-%d" % sh._shnum)
+            return flushEventualQueue()
+        d.addCallback(_check2)
+        def _check3(ign):
+            self.failIfEqual(node.processed, None)
+            self.failUnlessEqual(node.processed, (0, {3: "block-3",
+                                                      4: "block-4",
+                                                      5: "block-5"}) )
+        d.addCallback(_check3)
+        return d
+
+    def test_overdue_fails(self):
+        node = FakeNode()
+        sf = MySegmentFetcher(node, 0, 3, None)
+        servers = make_servers(["peer-%d" % i for i in range(6)])
+        shares = [MyShare(i, servers["peer-%d" % i], i) for i in range(6)]
+        sf.add_shares(shares)
+        sf.no_more_shares()
+        d = flushEventualQueue()
+        def _check1(ign):
+            self.failUnlessEqual(node.want_more, 0)
+            self.failUnlessEqual(sf._test_start_shares, shares[:3])
+            for sh in sf._test_start_shares:
+                sf._block_request_activity(sh, sh._shnum, OVERDUE)
+            return flushEventualQueue()
+        d.addCallback(_check1)
+        def _check2(ign):
+            self.failUnlessEqual(sf._test_start_shares, shares[:6])
+            for sh in sf._test_start_shares[3:]:
+                sf._block_request_activity(sh, sh._shnum, DEAD)
+            return flushEventualQueue()
+        d.addCallback(_check2)
+        def _check3(ign):
+            # we're still waiting
+            self.failUnlessEqual(node.processed, None)
+            self.failUnlessEqual(node.failed, None)
+            # now complete one of the overdue ones, and kill one of the other
+            # ones, leaving one hanging. This should trigger a failure, since
+            # we cannot succeed.
+            live = sf._test_start_shares[0]
+            die = sf._test_start_shares[1]
+            sf._block_request_activity(live, live._shnum, COMPLETE, "block")
+            sf._block_request_activity(die, die._shnum, DEAD)
+            return flushEventualQueue()
+        d.addCallback(_check3)
+        def _check4(ign):
+            self.failUnless(node.failed)
+            self.failUnless(node.failed.check(NotEnoughSharesError))
+            sname = servers["peer-2"].get_name()
+            self.failUnlessIn("complete=sh0 pending= overdue=sh2-on-%s unused=" % sname,
+                              str(node.failed))
+        d.addCallback(_check4)
+        return d
+
+    def test_avoid_redundancy(self):
+        node = FakeNode()
+        sf = MySegmentFetcher(node, 0, 3, None)
+        # we could satisfy the read entirely from the first server, but we'd
+        # prefer not to. Instead, we expect to only pull one share from the
+        # first server
+        servers = make_servers(["peer-A", "peer-B", "peer-C", "peer-D",
+                                "peer-E"])
+        shares = [MyShare(0, servers["peer-A"],0.0),
+                  MyShare(1, servers["peer-B"],1.0),
+                  MyShare(0, servers["peer-C"],2.0), # this will be skipped
+                  MyShare(1, servers["peer-D"],3.0),
+                  MyShare(2, servers["peer-E"],4.0),
+                  ]
+        sf.add_shares(shares[:3])
+        d = flushEventualQueue()
+        def _check1(ign):
+            self.failUnlessEqual(node.want_more, 1)
+            self.failUnlessEqual(sf._test_start_shares,
+                                 [shares[0], shares[1]])
+            # allow sh1 to retire
+            sf._block_request_activity(shares[1], 1, COMPLETE, "block-1")
+            return flushEventualQueue()
+        d.addCallback(_check1)
+        def _check2(ign):
+            # and then feed in the remaining shares
+            sf.add_shares(shares[3:])
+            sf.no_more_shares()
+            return flushEventualQueue()
+        d.addCallback(_check2)
+        def _check3(ign):
+            self.failUnlessEqual(sf._test_start_shares,
+                                 [shares[0], shares[1], shares[4]])
+            sf._block_request_activity(shares[0], 0, COMPLETE, "block-0")
+            sf._block_request_activity(shares[4], 2, COMPLETE, "block-2")
+            return flushEventualQueue()
+        d.addCallback(_check3)
+        def _check4(ign):
+            self.failIfEqual(node.processed, None)
+            self.failUnlessEqual(node.processed, (0, {0: "block-0",
+                                                      1: "block-1",
+                                                      2: "block-2"}) )
+        d.addCallback(_check4)
+        return d