immutable/downloader/share.py: reduce get_serverid(), one left, update ext deps
author    Brian Warner <warner@lothar.com>
          Sun, 27 Feb 2011 02:11:50 +0000 (19:11 -0700)
committer Brian Warner <warner@lothar.com>
          Sun, 27 Feb 2011 02:11:50 +0000 (19:11 -0700)
test_download.py: create+check MyShare instances better, make sure they share
Server objects, now that finder.py cares

src/allmydata/immutable/downloader/fetcher.py
src/allmydata/immutable/downloader/finder.py
src/allmydata/immutable/downloader/share.py
src/allmydata/test/test_cli.py
src/allmydata/test/test_download.py

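This commit continues threading "server" objects through the downloader in place of the old (peerid, server_version) pairs. For orientation, here is a minimal sketch of the surface the patched code expects, inferred from the calls in the hunks below (the real object is allmydata's IServer/NoNetworkServer; the class name and details here are illustrative only):

    from allmydata.util import base32

    class SketchServer:
        # illustrative stand-in for the server objects used below
        def __init__(self, serverid, version):
            self._serverid = serverid  # 20-byte id (tubid)
            self._version = version    # dict of protocol-version info
        def get_serverid(self):
            return self._serverid
        def get_version(self):
            return self._version
        def name(self):
            # abbreviated printable form; the test_cli change below
            # implies an 8-character base32 prefix, e.g. "fob7vqgd"
            return base32.b2a(self._serverid)[:8]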
src/allmydata/immutable/downloader/fetcher.py
index e78d37e76b74721a8235493d71130809a75e3452..84552279e6afba682d4b10e19398c41872ab40f3 100644
@@ -189,7 +189,7 @@ class SegmentFetcher:
         sent_something = False
         want_more_diversity = False
         for sh in self._shares: # find one good share to fetch
-            shnum = sh._shnum ; serverid = sh._peerid
+            shnum = sh._shnum ; serverid = sh._server.get_serverid()
             if shnum in self._blocks:
                 continue # don't request data we already have
             if shnum in self._active_share_map:
@@ -229,9 +229,8 @@ class SegmentFetcher:
         # called by Shares, in response to our s.send_request() calls.
         if not self._running:
             return
-        log.msg("SegmentFetcher(%s)._block_request_activity:"
-                " Share(sh%d-on-%s) -> %s" %
-                (self._node._si_prefix, shnum, share._peerid_s, state),
+        log.msg("SegmentFetcher(%s)._block_request_activity: %s -> %s" %
+                (self._node._si_prefix, repr(share), state),
                 level=log.NOISY, parent=self._lp, umid="vilNWA")
         # COMPLETE, CORRUPT, DEAD, BADSEGNUM are terminal. Remove the share
         # from all our tracking lists.
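These fetcher.py hunks remove the last direct uses of sh._peerid and share._peerid_s: the serverid now comes from the share's server object, and the log message leans on Share.__repr__ (updated in share.py below) to describe the share. Roughly what the new log line renders as, with made-up values:

    # assuming repr(share) == "Share(sh0-on-fob7vqgd)" and state == "COMPLETE":
    msg = ("SegmentFetcher(%s)._block_request_activity: %s -> %s"
           % ("dglevpj4", "Share(sh0-on-fob7vqgd)", "COMPLETE"))
    # -> SegmentFetcher(dglevpj4)._block_request_activity: Share(sh0-on-fob7vqgd) -> COMPLETE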
src/allmydata/immutable/downloader/finder.py
index c09cc3c866899310ca6dbbf96963ec977afb994f..c2f9c1a66544e508ddc4cd246bafa0b8666b52c1 100644
@@ -204,8 +204,8 @@ class ShareFinder:
             #  2: break _get_satisfaction into Deferred-attached pieces.
             #     Yuck.
             self._commonshares[shnum] = cs
-        s = Share(bucket, server.get_version(), self.verifycap, cs, self.node,
-                  self._download_status, server.get_serverid(), shnum, dyhb_rtt,
+        s = Share(bucket, server, self.verifycap, cs, self.node,
+                  self._download_status, shnum, dyhb_rtt,
                   self._node_logparent)
         return s
 
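The call-site change in finder.py: ShareFinder no longer unpacks the server before constructing a Share. The two arguments that used to be extracted up front now travel inside the server object:

    version  = server.get_version()    # now fetched inside Share.__init__
    serverid = server.get_serverid()   # now fetched where it is used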
src/allmydata/immutable/downloader/share.py
index 78cce8ed0906d0075a773c9a173284d69f05218a..32be47a4425dc1bcc9b5f42ef50d062ee7191077 100644
@@ -32,10 +32,10 @@ class Share:
     # this is a specific implementation of IShare for tahoe's native storage
     # servers. A different backend would use a different class.
 
-    def __init__(self, rref, server_version, verifycap, commonshare, node,
-                 download_status, peerid, shnum, dyhb_rtt, logparent):
+    def __init__(self, rref, server, verifycap, commonshare, node,
+                 download_status, shnum, dyhb_rtt, logparent):
         self._rref = rref
-        self._server_version = server_version
+        self._server = server
         self._node = node # holds share_hash_tree and UEB
         self.actual_segment_size = node.segment_size # might still be None
         # XXX change node.guessed_segment_size to
@@ -46,8 +46,6 @@ class Share:
         self._UEB_length = None
         self._commonshare = commonshare # holds block_hash_tree
         self._download_status = download_status
-        self._peerid = peerid
-        self._peerid_s = base32.b2a(peerid)[:5]
         self._storage_index = verifycap.storage_index
         self._si_prefix = base32.b2a(verifycap.storage_index)[:8]
         self._shnum = shnum
@@ -82,7 +80,8 @@ class Share:
         # download can re-fetch it.
 
         self._requested_blocks = [] # (segnum, set(observer2..))
-        ver = server_version["http://allmydata.org/tahoe/protocols/storage/v1"]
+        v = server.get_version()
+        ver = v["http://allmydata.org/tahoe/protocols/storage/v1"]
         self._overrun_ok = ver["tolerates-immutable-read-overrun"]
         # If _overrun_ok and we guess the offsets correctly, we can get
         # everything in one RTT. If _overrun_ok and we guess wrong, we might
@@ -94,7 +93,7 @@ class Share:
         self.had_corruption = False # for unit tests
 
     def __repr__(self):
-        return "Share(sh%d-on-%s)" % (self._shnum, self._peerid_s)
+        return "Share(sh%d-on-%s)" % (self._shnum, self._server.name())
 
     def is_alive(self):
         # XXX: reconsider. If the share sees a single error, should it remain
@@ -727,7 +726,8 @@ class Share:
                          share=repr(self),
                          start=start, length=length,
                          level=log.NOISY, parent=self._lp, umid="sgVAyA")
-            req_ev = ds.add_request_sent(self._peerid, self._shnum,
+            req_ev = ds.add_request_sent(self._server.get_serverid(),
+                                         self._shnum,
                                          start, length, now())
             d = self._send_request(start, length)
             d.addCallback(self._got_data, start, length, req_ev, lp)
@@ -789,7 +789,7 @@ class Share:
         log.msg(format="error requesting %(start)d+%(length)d"
                 " from %(server)s for si %(si)s",
                 start=start, length=length,
-                server=self._peerid_s, si=self._si_prefix,
+                server=self._server.name(), si=self._si_prefix,
                 failure=f, parent=lp, level=log.UNUSUAL, umid="BZgAJw")
         # retire our observers, assuming we won't be able to make any
         # further progress
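Net effect in share.py: a Share no longer snapshots per-server data at construction time; it keeps the server object and derives each value where it is used. In sketch form, with the attribute names from the hunks above:

    # inside Share, after this patch:
    serverid = self._server.get_serverid()  # replaces self._peerid
    v = self._server.get_version()          # replaces self._server_version
    label = self._server.name()             # replaces self._peerid_s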
src/allmydata/test/test_cli.py
index 46527f72fcd6957a6bb0b866a68e720964225310..254d702c5c9d2cf5d2183f92014c1ed1a622365d 100644
@@ -2412,7 +2412,7 @@ class Errors(GridTestMixin, CLITestMixin, unittest.TestCase):
         # enough shares. The one remaining share might be in either the
         # COMPLETE or the PENDING state.
         in_complete_msg = "ran out of shares: complete=sh0 pending= overdue= unused= need 3"
-        in_pending_msg = "ran out of shares: complete= pending=Share(sh0-on-fob7v) overdue= unused= need 3"
+        in_pending_msg = "ran out of shares: complete= pending=Share(sh0-on-fob7vqgd) overdue= unused= need 3"
 
         d.addCallback(lambda ign: self.do_cli("get", self.uri_1share))
         def _check1((rc, out, err)):
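The expected message changes because the server abbreviation got longer: the old Share kept base32.b2a(peerid)[:5] ("fob7v"), while server.name() evidently abbreviates to eight base32 characters ("fob7vqgd"). Only the prefix length differs:

    from allmydata.util import base32
    # old: base32.b2a(peerid)[:5]  -> "fob7v"     (the removed _peerid_s)
    # new: server.name()           -> "fob7vqgd"  (8-character prefix)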
src/allmydata/test/test_download.py
index ab2e98bc0665345b43072e74011323cb6a2114e5..91ff8ada52955043c10cea288b3143ddf6e8ce7a 100644
@@ -8,10 +8,10 @@ from twisted.trial import unittest
 from twisted.internet import defer, reactor
 from allmydata import uri
 from allmydata.storage.server import storage_index_to_dir
-from allmydata.util import base32, fileutil, spans, log
+from allmydata.util import base32, fileutil, spans, log, hashutil
 from allmydata.util.consumer import download_to_data, MemoryConsumer
 from allmydata.immutable import upload, layout
-from allmydata.test.no_network import GridTestMixin
+from allmydata.test.no_network import GridTestMixin, NoNetworkServer
 from allmydata.test.common import ShouldFailMixin
 from allmydata.interfaces import NotEnoughSharesError, NoSharesError
 from allmydata.immutable.downloader.common import BadSegmentNumberError, \
@@ -1267,14 +1267,22 @@ class Status(unittest.TestCase):
         e2.finished(now+3)
         self.failUnlessEqual(ds.get_active(), False)
 
+def make_server(clientid):
+    tubid = hashutil.tagged_hash("clientid", clientid)[:20]
+    return NoNetworkServer(tubid, None)
+def make_servers(clientids):
+    servers = {}
+    for clientid in clientids:
+        servers[clientid] = make_server(clientid)
+    return servers
+
 class MyShare:
-    def __init__(self, shnum, peerid, rtt):
+    def __init__(self, shnum, server, rtt):
         self._shnum = shnum
-        self._peerid = peerid
-        self._peerid_s = peerid
+        self._server = server
         self._dyhb_rtt = rtt
     def __repr__(self):
-        return "sh%d-on-%s" % (self._shnum, self._peerid)
+        return "sh%d-on-%s" % (self._shnum, self._server.name())
 
 class MySegmentFetcher(SegmentFetcher):
     def __init__(self, *args, **kwargs):
@@ -1319,7 +1327,8 @@ class Selection(unittest.TestCase):
     def test_only_one_share(self):
         node = FakeNode()
         sf = MySegmentFetcher(node, 0, 3, None)
-        shares = [MyShare(0, "peer-A", 0.0)]
+        serverA = make_server("peer-A")
+        shares = [MyShare(0, serverA, 0.0)]
         sf.add_shares(shares)
         d = flushEventualQueue()
         def _check1(ign):
@@ -1331,7 +1340,8 @@ class Selection(unittest.TestCase):
         def _check2(ign):
             self.failUnless(node.failed)
             self.failUnless(node.failed.check(NotEnoughSharesError))
-            self.failUnlessIn("complete= pending=sh0-on-peer-A overdue= unused=",
+            sname = serverA.name()
+            self.failUnlessIn("complete= pending=sh0-on-%s overdue= unused=" % sname,
                               str(node.failed))
         d.addCallback(_check2)
         return d
@@ -1339,7 +1349,7 @@ class Selection(unittest.TestCase):
     def test_good_diversity_early(self):
         node = FakeNode()
         sf = MySegmentFetcher(node, 0, 3, None)
-        shares = [MyShare(i, "peer-%d" % i, i) for i in range(10)]
+        shares = [MyShare(i, make_server("peer-%d" % i), i) for i in range(10)]
         sf.add_shares(shares)
         d = flushEventualQueue()
         def _check1(ign):
@@ -1361,7 +1371,7 @@ class Selection(unittest.TestCase):
     def test_good_diversity_late(self):
         node = FakeNode()
         sf = MySegmentFetcher(node, 0, 3, None)
-        shares = [MyShare(i, "peer-%d" % i, i) for i in range(10)]
+        shares = [MyShare(i, make_server("peer-%d" % i), i) for i in range(10)]
         sf.add_shares([])
         d = flushEventualQueue()
         def _check1(ign):
@@ -1390,11 +1400,12 @@ class Selection(unittest.TestCase):
         # we could satisfy the read entirely from the first server, but we'd
         # prefer not to. Instead, we expect to only pull one share from the
         # first server
-        shares = [MyShare(0, "peer-A", 0.0),
-                  MyShare(1, "peer-A", 0.0),
-                  MyShare(2, "peer-A", 0.0),
-                  MyShare(3, "peer-B", 1.0),
-                  MyShare(4, "peer-C", 2.0),
+        servers = make_servers(["peer-A", "peer-B", "peer-C"])
+        shares = [MyShare(0, servers["peer-A"], 0.0),
+                  MyShare(1, servers["peer-A"], 0.0),
+                  MyShare(2, servers["peer-A"], 0.0),
+                  MyShare(3, servers["peer-B"], 1.0),
+                  MyShare(4, servers["peer-C"], 2.0),
                   ]
         sf.add_shares([])
         d = flushEventualQueue()
@@ -1424,11 +1435,12 @@ class Selection(unittest.TestCase):
         sf = MySegmentFetcher(node, 0, 3, None)
         # we satisfy the read entirely from the first server because we don't
         # have any other choice.
-        shares = [MyShare(0, "peer-A", 0.0),
-                  MyShare(1, "peer-A", 0.0),
-                  MyShare(2, "peer-A", 0.0),
-                  MyShare(3, "peer-A", 0.0),
-                  MyShare(4, "peer-A", 0.0),
+        serverA = make_server("peer-A")
+        shares = [MyShare(0, serverA, 0.0),
+                  MyShare(1, serverA, 0.0),
+                  MyShare(2, serverA, 0.0),
+                  MyShare(3, serverA, 0.0),
+                  MyShare(4, serverA, 0.0),
                   ]
         sf.add_shares([])
         d = flushEventualQueue()
@@ -1459,11 +1471,12 @@ class Selection(unittest.TestCase):
         sf = MySegmentFetcher(node, 0, 3, None)
         # we satisfy the read entirely from the first server because we don't
         # have any other choice.
-        shares = [MyShare(0, "peer-A", 0.0),
-                  MyShare(1, "peer-A", 0.0),
-                  MyShare(2, "peer-A", 0.0),
-                  MyShare(3, "peer-A", 0.0),
-                  MyShare(4, "peer-A", 0.0),
+        serverA = make_server("peer-A")
+        shares = [MyShare(0, serverA, 0.0),
+                  MyShare(1, serverA, 0.0),
+                  MyShare(2, serverA, 0.0),
+                  MyShare(3, serverA, 0.0),
+                  MyShare(4, serverA, 0.0),
                   ]
         sf.add_shares(shares)
         d = flushEventualQueue()
@@ -1487,7 +1500,7 @@ class Selection(unittest.TestCase):
     def test_overdue(self):
         node = FakeNode()
         sf = MySegmentFetcher(node, 0, 3, None)
-        shares = [MyShare(i, "peer-%d" % i, i) for i in range(10)]
+        shares = [MyShare(i, make_server("peer-%d" % i), i) for i in range(10)]
         sf.add_shares(shares)
         d = flushEventualQueue()
         def _check1(ign):
@@ -1515,7 +1528,8 @@ class Selection(unittest.TestCase):
     def test_overdue_fails(self):
         node = FakeNode()
         sf = MySegmentFetcher(node, 0, 3, None)
-        shares = [MyShare(i, "peer-%d" % i, i) for i in range(6)]
+        servers = make_servers(["peer-%d" % i for i in range(6)])
+        shares = [MyShare(i, servers["peer-%d" % i], i) for i in range(6)]
         sf.add_shares(shares)
         sf.no_more_shares()
         d = flushEventualQueue()
@@ -1548,7 +1562,8 @@ class Selection(unittest.TestCase):
         def _check4(ign):
             self.failUnless(node.failed)
             self.failUnless(node.failed.check(NotEnoughSharesError))
-            self.failUnlessIn("complete=sh0 pending= overdue=sh2-on-peer-2 unused=",
+            sname = servers["peer-2"].name()
+            self.failUnlessIn("complete=sh0 pending= overdue=sh2-on-%s unused=" % sname,
                               str(node.failed))
         d.addCallback(_check4)
         return d
@@ -1559,11 +1574,13 @@ class Selection(unittest.TestCase):
         # we could satisfy the read entirely from the first server, but we'd
         # prefer not to. Instead, we expect to only pull one share from the
         # first server
-        shares = [MyShare(0, "peer-A", 0.0),
-                  MyShare(1, "peer-B", 1.0),
-                  MyShare(0, "peer-C", 2.0), # this will be skipped
-                  MyShare(1, "peer-D", 3.0),
-                  MyShare(2, "peer-E", 4.0),
+        servers = make_servers(["peer-A", "peer-B", "peer-C", "peer-D",
+                                "peer-E"])
+        shares = [MyShare(0, servers["peer-A"], 0.0),
+                  MyShare(1, servers["peer-B"], 1.0),
+                  MyShare(0, servers["peer-C"], 2.0), # this will be skipped
+                  MyShare(1, servers["peer-D"], 3.0),
+                  MyShare(2, servers["peer-E"], 4.0),
                   ]
         sf.add_shares(shares[:3])
         d = flushEventualQueue()
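The make_server()/make_servers() helpers above give each clientid a deterministic 20-byte tubid (via tagged_hash) and, more importantly, a single NoNetworkServer instance per clientid, so shares claimed to live on the same server really do share one Server object, which is what finder.py now cares about. A usage sketch in the style of these tests:

    servers = make_servers(["peer-A", "peer-B"])
    shares = [MyShare(0, servers["peer-A"], 0.0),
              MyShare(1, servers["peer-A"], 0.0),  # same Server object as sh0
              MyShare(2, servers["peer-B"], 1.0)]
    assert shares[0]._server is shares[1]._server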