From: Brian Warner Date: Sun, 27 Feb 2011 02:11:50 +0000 (-0700) Subject: immutable/downloader/share.py: reduce get_serverid(), one left, update ext deps X-Git-Url: https://git.rkrishnan.org/?a=commitdiff_plain;h=9ae026d9f47671831b0237dbb1a6c9cf1a2c5cf7;p=tahoe-lafs%2Ftahoe-lafs.git immutable/downloader/share.py: reduce get_serverid(), one left, update ext deps test_download.py: create+check MyShare instances better, make sure they share Server objects, now that finder.py cares --- diff --git a/src/allmydata/immutable/downloader/fetcher.py b/src/allmydata/immutable/downloader/fetcher.py index e78d37e7..84552279 100644 --- a/src/allmydata/immutable/downloader/fetcher.py +++ b/src/allmydata/immutable/downloader/fetcher.py @@ -189,7 +189,7 @@ class SegmentFetcher: sent_something = False want_more_diversity = False for sh in self._shares: # find one good share to fetch - shnum = sh._shnum ; serverid = sh._peerid + shnum = sh._shnum ; serverid = sh._server.get_serverid() if shnum in self._blocks: continue # don't request data we already have if shnum in self._active_share_map: @@ -229,9 +229,8 @@ class SegmentFetcher: # called by Shares, in response to our s.send_request() calls. if not self._running: return - log.msg("SegmentFetcher(%s)._block_request_activity:" - " Share(sh%d-on-%s) -> %s" % - (self._node._si_prefix, shnum, share._peerid_s, state), + log.msg("SegmentFetcher(%s)._block_request_activity: %s -> %s" % + (self._node._si_prefix, repr(share), state), level=log.NOISY, parent=self._lp, umid="vilNWA") # COMPLETE, CORRUPT, DEAD, BADSEGNUM are terminal. Remove the share # from all our tracking lists. diff --git a/src/allmydata/immutable/downloader/finder.py b/src/allmydata/immutable/downloader/finder.py index c09cc3c8..c2f9c1a6 100644 --- a/src/allmydata/immutable/downloader/finder.py +++ b/src/allmydata/immutable/downloader/finder.py @@ -204,8 +204,8 @@ class ShareFinder: # 2: break _get_satisfaction into Deferred-attached pieces. # Yuck. self._commonshares[shnum] = cs - s = Share(bucket, server.get_version(), self.verifycap, cs, self.node, - self._download_status, server.get_serverid(), shnum, dyhb_rtt, + s = Share(bucket, server, self.verifycap, cs, self.node, + self._download_status, shnum, dyhb_rtt, self._node_logparent) return s diff --git a/src/allmydata/immutable/downloader/share.py b/src/allmydata/immutable/downloader/share.py index 78cce8ed..32be47a4 100644 --- a/src/allmydata/immutable/downloader/share.py +++ b/src/allmydata/immutable/downloader/share.py @@ -32,10 +32,10 @@ class Share: # this is a specific implementation of IShare for tahoe's native storage # servers. A different backend would use a different class. - def __init__(self, rref, server_version, verifycap, commonshare, node, - download_status, peerid, shnum, dyhb_rtt, logparent): + def __init__(self, rref, server, verifycap, commonshare, node, + download_status, shnum, dyhb_rtt, logparent): self._rref = rref - self._server_version = server_version + self._server = server self._node = node # holds share_hash_tree and UEB self.actual_segment_size = node.segment_size # might still be None # XXX change node.guessed_segment_size to @@ -46,8 +46,6 @@ class Share: self._UEB_length = None self._commonshare = commonshare # holds block_hash_tree self._download_status = download_status - self._peerid = peerid - self._peerid_s = base32.b2a(peerid)[:5] self._storage_index = verifycap.storage_index self._si_prefix = base32.b2a(verifycap.storage_index)[:8] self._shnum = shnum @@ -82,7 +80,8 @@ class Share: # download can re-fetch it. self._requested_blocks = [] # (segnum, set(observer2..)) - ver = server_version["http://allmydata.org/tahoe/protocols/storage/v1"] + v = server.get_version() + ver = v["http://allmydata.org/tahoe/protocols/storage/v1"] self._overrun_ok = ver["tolerates-immutable-read-overrun"] # If _overrun_ok and we guess the offsets correctly, we can get # everything in one RTT. If _overrun_ok and we guess wrong, we might @@ -94,7 +93,7 @@ class Share: self.had_corruption = False # for unit tests def __repr__(self): - return "Share(sh%d-on-%s)" % (self._shnum, self._peerid_s) + return "Share(sh%d-on-%s)" % (self._shnum, self._server.name()) def is_alive(self): # XXX: reconsider. If the share sees a single error, should it remain @@ -727,7 +726,8 @@ class Share: share=repr(self), start=start, length=length, level=log.NOISY, parent=self._lp, umid="sgVAyA") - req_ev = ds.add_request_sent(self._peerid, self._shnum, + req_ev = ds.add_request_sent(self._server.get_serverid(), + self._shnum, start, length, now()) d = self._send_request(start, length) d.addCallback(self._got_data, start, length, req_ev, lp) @@ -789,7 +789,7 @@ class Share: log.msg(format="error requesting %(start)d+%(length)d" " from %(server)s for si %(si)s", start=start, length=length, - server=self._peerid_s, si=self._si_prefix, + server=self._server.name(), si=self._si_prefix, failure=f, parent=lp, level=log.UNUSUAL, umid="BZgAJw") # retire our observers, assuming we won't be able to make any # further progress diff --git a/src/allmydata/test/test_cli.py b/src/allmydata/test/test_cli.py index 46527f72..254d702c 100644 --- a/src/allmydata/test/test_cli.py +++ b/src/allmydata/test/test_cli.py @@ -2412,7 +2412,7 @@ class Errors(GridTestMixin, CLITestMixin, unittest.TestCase): # enough shares. The one remaining share might be in either the # COMPLETE or the PENDING state. in_complete_msg = "ran out of shares: complete=sh0 pending= overdue= unused= need 3" - in_pending_msg = "ran out of shares: complete= pending=Share(sh0-on-fob7v) overdue= unused= need 3" + in_pending_msg = "ran out of shares: complete= pending=Share(sh0-on-fob7vqgd) overdue= unused= need 3" d.addCallback(lambda ign: self.do_cli("get", self.uri_1share)) def _check1((rc, out, err)): diff --git a/src/allmydata/test/test_download.py b/src/allmydata/test/test_download.py index ab2e98bc..91ff8ada 100644 --- a/src/allmydata/test/test_download.py +++ b/src/allmydata/test/test_download.py @@ -8,10 +8,10 @@ from twisted.trial import unittest from twisted.internet import defer, reactor from allmydata import uri from allmydata.storage.server import storage_index_to_dir -from allmydata.util import base32, fileutil, spans, log +from allmydata.util import base32, fileutil, spans, log, hashutil from allmydata.util.consumer import download_to_data, MemoryConsumer from allmydata.immutable import upload, layout -from allmydata.test.no_network import GridTestMixin +from allmydata.test.no_network import GridTestMixin, NoNetworkServer from allmydata.test.common import ShouldFailMixin from allmydata.interfaces import NotEnoughSharesError, NoSharesError from allmydata.immutable.downloader.common import BadSegmentNumberError, \ @@ -1267,14 +1267,22 @@ class Status(unittest.TestCase): e2.finished(now+3) self.failUnlessEqual(ds.get_active(), False) +def make_server(clientid): + tubid = hashutil.tagged_hash("clientid", clientid)[:20] + return NoNetworkServer(tubid, None) +def make_servers(clientids): + servers = {} + for clientid in clientids: + servers[clientid] = make_server(clientid) + return servers + class MyShare: - def __init__(self, shnum, peerid, rtt): + def __init__(self, shnum, server, rtt): self._shnum = shnum - self._peerid = peerid - self._peerid_s = peerid + self._server = server self._dyhb_rtt = rtt def __repr__(self): - return "sh%d-on-%s" % (self._shnum, self._peerid) + return "sh%d-on-%s" % (self._shnum, self._server.name()) class MySegmentFetcher(SegmentFetcher): def __init__(self, *args, **kwargs): @@ -1319,7 +1327,8 @@ class Selection(unittest.TestCase): def test_only_one_share(self): node = FakeNode() sf = MySegmentFetcher(node, 0, 3, None) - shares = [MyShare(0, "peer-A", 0.0)] + serverA = make_server("peer-A") + shares = [MyShare(0, serverA, 0.0)] sf.add_shares(shares) d = flushEventualQueue() def _check1(ign): @@ -1331,7 +1340,8 @@ class Selection(unittest.TestCase): def _check2(ign): self.failUnless(node.failed) self.failUnless(node.failed.check(NotEnoughSharesError)) - self.failUnlessIn("complete= pending=sh0-on-peer-A overdue= unused=", + sname = serverA.name() + self.failUnlessIn("complete= pending=sh0-on-%s overdue= unused=" % sname, str(node.failed)) d.addCallback(_check2) return d @@ -1339,7 +1349,7 @@ class Selection(unittest.TestCase): def test_good_diversity_early(self): node = FakeNode() sf = MySegmentFetcher(node, 0, 3, None) - shares = [MyShare(i, "peer-%d" % i, i) for i in range(10)] + shares = [MyShare(i, make_server("peer-%d" % i), i) for i in range(10)] sf.add_shares(shares) d = flushEventualQueue() def _check1(ign): @@ -1361,7 +1371,7 @@ class Selection(unittest.TestCase): def test_good_diversity_late(self): node = FakeNode() sf = MySegmentFetcher(node, 0, 3, None) - shares = [MyShare(i, "peer-%d" % i, i) for i in range(10)] + shares = [MyShare(i, make_server("peer-%d" % i), i) for i in range(10)] sf.add_shares([]) d = flushEventualQueue() def _check1(ign): @@ -1390,11 +1400,12 @@ class Selection(unittest.TestCase): # we could satisfy the read entirely from the first server, but we'd # prefer not to. Instead, we expect to only pull one share from the # first server - shares = [MyShare(0, "peer-A", 0.0), - MyShare(1, "peer-A", 0.0), - MyShare(2, "peer-A", 0.0), - MyShare(3, "peer-B", 1.0), - MyShare(4, "peer-C", 2.0), + servers = make_servers(["peer-A", "peer-B", "peer-C"]) + shares = [MyShare(0, servers["peer-A"], 0.0), + MyShare(1, servers["peer-A"], 0.0), + MyShare(2, servers["peer-A"], 0.0), + MyShare(3, servers["peer-B"], 1.0), + MyShare(4, servers["peer-C"], 2.0), ] sf.add_shares([]) d = flushEventualQueue() @@ -1424,11 +1435,12 @@ class Selection(unittest.TestCase): sf = MySegmentFetcher(node, 0, 3, None) # we satisfy the read entirely from the first server because we don't # have any other choice. - shares = [MyShare(0, "peer-A", 0.0), - MyShare(1, "peer-A", 0.0), - MyShare(2, "peer-A", 0.0), - MyShare(3, "peer-A", 0.0), - MyShare(4, "peer-A", 0.0), + serverA = make_server("peer-A") + shares = [MyShare(0, serverA, 0.0), + MyShare(1, serverA, 0.0), + MyShare(2, serverA, 0.0), + MyShare(3, serverA, 0.0), + MyShare(4, serverA, 0.0), ] sf.add_shares([]) d = flushEventualQueue() @@ -1459,11 +1471,12 @@ class Selection(unittest.TestCase): sf = MySegmentFetcher(node, 0, 3, None) # we satisfy the read entirely from the first server because we don't # have any other choice. - shares = [MyShare(0, "peer-A", 0.0), - MyShare(1, "peer-A", 0.0), - MyShare(2, "peer-A", 0.0), - MyShare(3, "peer-A", 0.0), - MyShare(4, "peer-A", 0.0), + serverA = make_server("peer-A") + shares = [MyShare(0, serverA, 0.0), + MyShare(1, serverA, 0.0), + MyShare(2, serverA, 0.0), + MyShare(3, serverA, 0.0), + MyShare(4, serverA, 0.0), ] sf.add_shares(shares) d = flushEventualQueue() @@ -1487,7 +1500,7 @@ class Selection(unittest.TestCase): def test_overdue(self): node = FakeNode() sf = MySegmentFetcher(node, 0, 3, None) - shares = [MyShare(i, "peer-%d" % i, i) for i in range(10)] + shares = [MyShare(i, make_server("peer-%d" % i), i) for i in range(10)] sf.add_shares(shares) d = flushEventualQueue() def _check1(ign): @@ -1515,7 +1528,8 @@ class Selection(unittest.TestCase): def test_overdue_fails(self): node = FakeNode() sf = MySegmentFetcher(node, 0, 3, None) - shares = [MyShare(i, "peer-%d" % i, i) for i in range(6)] + servers = make_servers(["peer-%d" % i for i in range(6)]) + shares = [MyShare(i, servers["peer-%d" % i], i) for i in range(6)] sf.add_shares(shares) sf.no_more_shares() d = flushEventualQueue() @@ -1548,7 +1562,8 @@ class Selection(unittest.TestCase): def _check4(ign): self.failUnless(node.failed) self.failUnless(node.failed.check(NotEnoughSharesError)) - self.failUnlessIn("complete=sh0 pending= overdue=sh2-on-peer-2 unused=", + sname = servers["peer-2"].name() + self.failUnlessIn("complete=sh0 pending= overdue=sh2-on-%s unused=" % sname, str(node.failed)) d.addCallback(_check4) return d @@ -1559,11 +1574,13 @@ class Selection(unittest.TestCase): # we could satisfy the read entirely from the first server, but we'd # prefer not to. Instead, we expect to only pull one share from the # first server - shares = [MyShare(0, "peer-A", 0.0), - MyShare(1, "peer-B", 1.0), - MyShare(0, "peer-C", 2.0), # this will be skipped - MyShare(1, "peer-D", 3.0), - MyShare(2, "peer-E", 4.0), + servers = make_servers(["peer-A", "peer-B", "peer-C", "peer-D", + "peer-E"]) + shares = [MyShare(0, servers["peer-A"],0.0), + MyShare(1, servers["peer-B"],1.0), + MyShare(0, servers["peer-C"],2.0), # this will be skipped + MyShare(1, servers["peer-D"],3.0), + MyShare(2, servers["peer-E"],4.0), ] sf.add_shares(shares[:3]) d = flushEventualQueue()