git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blobdiff - src/allmydata/test/test_download.py
move DownloadStopped from download.common to interfaces
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_download.py
index 373aff58dbe3f46ad3c1e940b0d8ec6c43ce9192..30485b4a020da0ad8b0a23eeba5c2a9b296174fa 100644 (file)
@@ -8,14 +8,15 @@ from twisted.trial import unittest
 from twisted.internet import defer, reactor
 from allmydata import uri
 from allmydata.storage.server import storage_index_to_dir
-from allmydata.util import base32, fileutil, spans, log
+from allmydata.util import base32, fileutil, spans, log, hashutil
 from allmydata.util.consumer import download_to_data, MemoryConsumer
 from allmydata.immutable import upload, layout
-from allmydata.test.no_network import GridTestMixin
+from allmydata.test.no_network import GridTestMixin, NoNetworkServer
 from allmydata.test.common import ShouldFailMixin
-from allmydata.interfaces import NotEnoughSharesError, NoSharesError
+from allmydata.interfaces import NotEnoughSharesError, NoSharesError, \
+     DownloadStopped
 from allmydata.immutable.downloader.common import BadSegmentNumberError, \
-     BadCiphertextHashError, DownloadStopped, COMPLETE, OVERDUE, DEAD
+     BadCiphertextHashError, COMPLETE, OVERDUE, DEAD
 from allmydata.immutable.downloader.status import DownloadStatus
 from allmydata.immutable.downloader.fetcher import SegmentFetcher
 from allmydata.codec import CRSDecoder
@@ -290,36 +291,28 @@ class DownloadTest(_Base, unittest.TestCase):
         def _got_data(data):
             self.failUnlessEqual(data, plaintext)
         d.addCallback(_got_data)
-        def _kill_some_servers():
-            # find the three shares that were used, and delete them. Then
-            # download again, forcing the downloader to fail over to other
-            # shares
-            servers = []
-            shares = sorted([s._shnum for s in self.n._cnode._node._shares])
-            self.failUnlessEqual(shares, [0,1,2,3])
+        def _kill_some_shares():
+            # find the shares that were used and delete them
+            shares = self.n._cnode._node._shares
+            shnums = sorted([s._shnum for s in shares])
+            self.failUnlessEqual(shnums, [0,1,2,3])
+
             # break the RIBucketReader references
-            for s in self.n._cnode._node._shares:
+            # (we don't break the RIStorageServer references, because that
+            # isn't needed to test the current downloader implementation)
+            for s in shares:
                 s._rref.broken = True
-                for servernum in immutable_shares:
-                    for shnum in immutable_shares[servernum]:
-                        if s._shnum == shnum:
-                            ss = self.g.servers_by_number[servernum]
-                            servers.append(ss)
-            # and, for good measure, break the RIStorageServer references
-            # too, just in case the downloader gets more aggressive in the
-            # future and tries to re-fetch the same share.
-            for ss in servers:
-                wrapper = self.g.servers_by_id[ss.my_nodeid]
-                wrapper.broken = True
         def _download_again(ign):
-            c = StallingConsumer(_kill_some_servers)
+            # download again, deleting some shares after the first write
+            # to the consumer
+            c = StallingConsumer(_kill_some_shares)
             return self.n.read(c)
         d.addCallback(_download_again)
         def _check_failover(c):
             self.failUnlessEqual("".join(c.chunks), plaintext)
-            shares = sorted([s._shnum for s in self.n._cnode._node._shares])
-            # we should now be using more shares than we were before
-            self.failIfEqual(shares, [0,1,2,3])
+            shares = self.n._cnode._node._shares
+            shnums = sorted([s._shnum for s in shares])
+            self.failIfEqual(shnums, [0,1,2,3])
         d.addCallback(_check_failover)
         return d
 
@@ -604,7 +597,8 @@ class DownloadTest(_Base, unittest.TestCase):
         # tweak the client's copies of server-version data, so it believes
         # that they're old and can't handle reads that overrun the length of
         # the share. This exercises a different code path.
-        for (peerid, rref) in self.c0.storage_broker.get_all_servers():
+        for s in self.c0.storage_broker.get_connected_servers():
+            rref = s.get_rref()
             v1 = rref.version["http://allmydata.org/tahoe/protocols/storage/v1"]
             v1["tolerates-immutable-read-overrun"] = False
 
@@ -1175,7 +1169,8 @@ class DownloadV2(_Base, unittest.TestCase):
         # tweak the client's copies of server-version data, so it believes
         # that they're old and can't handle reads that overrun the length of
         # the share. This exercises a different code path.
-        for (peerid, rref) in self.c0.storage_broker.get_all_servers():
+        for s in self.c0.storage_broker.get_connected_servers():
+            rref = s.get_rref()
             v1 = rref.version["http://allmydata.org/tahoe/protocols/storage/v1"]
             v1["tolerates-immutable-read-overrun"] = False
 
@@ -1194,7 +1189,8 @@ class DownloadV2(_Base, unittest.TestCase):
         self.set_up_grid()
         self.c0 = self.g.clients[0]
 
-        for (peerid, rref) in self.c0.storage_broker.get_all_servers():
+        for s in self.c0.storage_broker.get_connected_servers():
+            rref = s.get_rref()
             v1 = rref.version["http://allmydata.org/tahoe/protocols/storage/v1"]
             v1["tolerates-immutable-read-overrun"] = False
 
@@ -1222,14 +1218,16 @@ class Status(unittest.TestCase):
         now = 12345.1
         ds = DownloadStatus("si-1", 123)
         self.failUnlessEqual(ds.get_status(), "idle")
-        ds.add_segment_request(0, now)
+        ev0 = ds.add_segment_request(0, now)
         self.failUnlessEqual(ds.get_status(), "fetching segment 0")
-        ds.add_segment_delivery(0, now+1, 0, 1000, 2.0)
+        ev0.activate(now+0.5)
+        ev0.deliver(now+1, 0, 1000, 2.0)
         self.failUnlessEqual(ds.get_status(), "idle")
-        ds.add_segment_request(2, now+2)
-        ds.add_segment_request(1, now+2)
+        ev2 = ds.add_segment_request(2, now+2)
+        del ev2 # hush pyflakes
+        ev1 = ds.add_segment_request(1, now+2)
         self.failUnlessEqual(ds.get_status(), "fetching segments 1,2")
-        ds.add_segment_error(1, now+3)
+        ev1.error(now+3)
         self.failUnlessEqual(ds.get_status(),
                              "fetching segment 2; errors on segment 1")
 
@@ -1272,14 +1270,22 @@ class Status(unittest.TestCase):
         e2.finished(now+3)
         self.failUnlessEqual(ds.get_active(), False)
 
+def make_server(clientid):
+    tubid = hashutil.tagged_hash("clientid", clientid)[:20]
+    return NoNetworkServer(tubid, None)
+def make_servers(clientids):
+    servers = {}
+    for clientid in clientids:
+        servers[clientid] = make_server(clientid)
+    return servers
+
 class MyShare:
-    def __init__(self, shnum, peerid, rtt):
+    def __init__(self, shnum, server, rtt):
         self._shnum = shnum
-        self._peerid = peerid
-        self._peerid_s = peerid
+        self._server = server
         self._dyhb_rtt = rtt
     def __repr__(self):
-        return "sh%d-on-%s" % (self._shnum, self._peerid)
+        return "sh%d-on-%s" % (self._shnum, self._server.get_name())
 
 class MySegmentFetcher(SegmentFetcher):
     def __init__(self, *args, **kwargs):
@@ -1324,7 +1330,8 @@ class Selection(unittest.TestCase):
     def test_only_one_share(self):
         node = FakeNode()
         sf = MySegmentFetcher(node, 0, 3, None)
-        shares = [MyShare(0, "peer-A", 0.0)]
+        serverA = make_server("peer-A")
+        shares = [MyShare(0, serverA, 0.0)]
         sf.add_shares(shares)
         d = flushEventualQueue()
         def _check1(ign):
@@ -1336,7 +1343,8 @@ class Selection(unittest.TestCase):
         def _check2(ign):
             self.failUnless(node.failed)
             self.failUnless(node.failed.check(NotEnoughSharesError))
-            self.failUnlessIn("complete= pending=sh0-on-peer-A overdue= unused=",
+            sname = serverA.get_name()
+            self.failUnlessIn("complete= pending=sh0-on-%s overdue= unused="  % sname,
                               str(node.failed))
         d.addCallback(_check2)
         return d
@@ -1344,7 +1352,7 @@ class Selection(unittest.TestCase):
     def test_good_diversity_early(self):
         node = FakeNode()
         sf = MySegmentFetcher(node, 0, 3, None)
-        shares = [MyShare(i, "peer-%d" % i, i) for i in range(10)]
+        shares = [MyShare(i, make_server("peer-%d" % i), i) for i in range(10)]
         sf.add_shares(shares)
         d = flushEventualQueue()
         def _check1(ign):
@@ -1366,7 +1374,7 @@ class Selection(unittest.TestCase):
     def test_good_diversity_late(self):
         node = FakeNode()
         sf = MySegmentFetcher(node, 0, 3, None)
-        shares = [MyShare(i, "peer-%d" % i, i) for i in range(10)]
+        shares = [MyShare(i, make_server("peer-%d" % i), i) for i in range(10)]
         sf.add_shares([])
         d = flushEventualQueue()
         def _check1(ign):
@@ -1395,11 +1403,12 @@ class Selection(unittest.TestCase):
         # we could satisfy the read entirely from the first server, but we'd
         # prefer not to. Instead, we expect to only pull one share from the
         # first server
-        shares = [MyShare(0, "peer-A", 0.0),
-                  MyShare(1, "peer-A", 0.0),
-                  MyShare(2, "peer-A", 0.0),
-                  MyShare(3, "peer-B", 1.0),
-                  MyShare(4, "peer-C", 2.0),
+        servers = make_servers(["peer-A", "peer-B", "peer-C"])
+        shares = [MyShare(0, servers["peer-A"], 0.0),
+                  MyShare(1, servers["peer-A"], 0.0),
+                  MyShare(2, servers["peer-A"], 0.0),
+                  MyShare(3, servers["peer-B"], 1.0),
+                  MyShare(4, servers["peer-C"], 2.0),
                   ]
         sf.add_shares([])
         d = flushEventualQueue()
@@ -1429,11 +1438,12 @@ class Selection(unittest.TestCase):
         sf = MySegmentFetcher(node, 0, 3, None)
         # we satisfy the read entirely from the first server because we don't
         # have any other choice.
-        shares = [MyShare(0, "peer-A", 0.0),
-                  MyShare(1, "peer-A", 0.0),
-                  MyShare(2, "peer-A", 0.0),
-                  MyShare(3, "peer-A", 0.0),
-                  MyShare(4, "peer-A", 0.0),
+        serverA = make_server("peer-A")
+        shares = [MyShare(0, serverA, 0.0),
+                  MyShare(1, serverA, 0.0),
+                  MyShare(2, serverA, 0.0),
+                  MyShare(3, serverA, 0.0),
+                  MyShare(4, serverA, 0.0),
                   ]
         sf.add_shares([])
         d = flushEventualQueue()
@@ -1464,11 +1474,12 @@ class Selection(unittest.TestCase):
         sf = MySegmentFetcher(node, 0, 3, None)
         # we satisfy the read entirely from the first server because we don't
         # have any other choice.
-        shares = [MyShare(0, "peer-A", 0.0),
-                  MyShare(1, "peer-A", 0.0),
-                  MyShare(2, "peer-A", 0.0),
-                  MyShare(3, "peer-A", 0.0),
-                  MyShare(4, "peer-A", 0.0),
+        serverA = make_server("peer-A")
+        shares = [MyShare(0, serverA, 0.0),
+                  MyShare(1, serverA, 0.0),
+                  MyShare(2, serverA, 0.0),
+                  MyShare(3, serverA, 0.0),
+                  MyShare(4, serverA, 0.0),
                   ]
         sf.add_shares(shares)
         d = flushEventualQueue()
@@ -1492,7 +1503,7 @@ class Selection(unittest.TestCase):
     def test_overdue(self):
         node = FakeNode()
         sf = MySegmentFetcher(node, 0, 3, None)
-        shares = [MyShare(i, "peer-%d" % i, i) for i in range(10)]
+        shares = [MyShare(i, make_server("peer-%d" % i), i) for i in range(10)]
         sf.add_shares(shares)
         d = flushEventualQueue()
         def _check1(ign):
@@ -1520,7 +1531,8 @@ class Selection(unittest.TestCase):
     def test_overdue_fails(self):
         node = FakeNode()
         sf = MySegmentFetcher(node, 0, 3, None)
-        shares = [MyShare(i, "peer-%d" % i, i) for i in range(6)]
+        servers = make_servers(["peer-%d" % i for i in range(6)])
+        shares = [MyShare(i, servers["peer-%d" % i], i) for i in range(6)]
         sf.add_shares(shares)
         sf.no_more_shares()
         d = flushEventualQueue()
@@ -1553,7 +1565,8 @@ class Selection(unittest.TestCase):
         def _check4(ign):
             self.failUnless(node.failed)
             self.failUnless(node.failed.check(NotEnoughSharesError))
-            self.failUnlessIn("complete=sh0 pending= overdue=sh2-on-peer-2 unused=",
+            sname = servers["peer-2"].get_name()
+            self.failUnlessIn("complete=sh0 pending= overdue=sh2-on-%s unused=" % sname,
                               str(node.failed))
         d.addCallback(_check4)
         return d
@@ -1564,11 +1577,13 @@ class Selection(unittest.TestCase):
         # we could satisfy the read entirely from the first server, but we'd
         # prefer not to. Instead, we expect to only pull one share from the
         # first server
-        shares = [MyShare(0, "peer-A", 0.0),
-                  MyShare(1, "peer-B", 1.0),
-                  MyShare(0, "peer-C", 2.0), # this will be skipped
-                  MyShare(1, "peer-D", 3.0),
-                  MyShare(2, "peer-E", 4.0),
+        servers = make_servers(["peer-A", "peer-B", "peer-C", "peer-D",
+                                "peer-E"])
+        shares = [MyShare(0, servers["peer-A"],0.0),
+                  MyShare(1, servers["peer-B"],1.0),
+                  MyShare(0, servers["peer-C"],2.0), # this will be skipped
+                  MyShare(1, servers["peer-D"],3.0),
+                  MyShare(2, servers["peer-E"],4.0),
                   ]
         sf.add_shares(shares[:3])
         d = flushEventualQueue()