From: Brian Warner Date: Mon, 21 Feb 2011 01:58:04 +0000 (-0800) Subject: Refactor StorageFarmBroker handling of servers X-Git-Url: https://git.rkrishnan.org/simplejson/components/-?a=commitdiff_plain;h=ffd296fc5ab8007f7ab25f6da62de654fb57e765;p=tahoe-lafs%2Ftahoe-lafs.git Refactor StorageFarmBroker handling of servers Pass around IServer instance instead of (peerid, rref) tuple. Replace "descriptor" with "server". Other replacements: get_all_servers -> get_connected_servers/get_known_servers get_servers_for_index -> get_servers_for_psi (now returns IServers) This change still needs to be pushed further down: lots of code is now getting the IServer and then distributing (peerid, rref) internally. Instead, it ought to distribute the IServer internally and delay extracting a serverid or rref until the last moment. no_network.py was updated to retain parallelism. --- diff --git a/src/allmydata/client.py b/src/allmydata/client.py index fa515d41..fb7e0c89 100644 --- a/src/allmydata/client.py +++ b/src/allmydata/client.py @@ -468,7 +468,7 @@ class Client(node.Node, pollmixin.PollMixin): temporary test network and need to know when it is safe to proceed with an upload or download.""" def _check(): - return len(self.storage_broker.get_all_servers()) >= num_clients + return len(self.storage_broker.get_connected_servers()) >= num_clients d = self.poll(_check, 0.5) d.addCallback(lambda res: None) return d diff --git a/src/allmydata/control.py b/src/allmydata/control.py index cb0c84f7..045a34e5 100644 --- a/src/allmydata/control.py +++ b/src/allmydata/control.py @@ -91,7 +91,7 @@ class ControlServer(Referenceable, service.Service): # 300ms. results = {} sb = self.parent.get_storage_broker() - everyone = sb.get_all_servers() + everyone = sb.get_connected_servers() num_pings = int(mathutil.div_ceil(10, (len(everyone) * 0.3))) everyone = list(everyone) * num_pings d = self._do_one_ping(None, everyone, results) @@ -99,7 +99,9 @@ class ControlServer(Referenceable, service.Service): def _do_one_ping(self, res, everyone_left, results): if not everyone_left: return results - peerid, connection = everyone_left.pop(0) + server = everyone_left.pop(0) + peerid = server.get_serverid() + connection = server.get_rref() start = time.time() d = connection.callRemote("get_buckets", "\x00"*16) def _done(ignored): diff --git a/src/allmydata/immutable/checker.py b/src/allmydata/immutable/checker.py index cd5c5568..f3500291 100644 --- a/src/allmydata/immutable/checker.py +++ b/src/allmydata/immutable/checker.py @@ -463,9 +463,6 @@ class Checker(log.PrefixingLogMixin): def __init__(self, verifycap, servers, verify, add_lease, secret_holder, monitor): assert precondition(isinstance(verifycap, CHKFileVerifierURI), verifycap, type(verifycap)) - assert precondition(isinstance(servers, (set, frozenset)), servers) - for (serverid, serverrref) in servers: - assert precondition(isinstance(serverid, str)) prefix = "%s" % base32.b2a_l(verifycap.get_storage_index()[:8], 60) log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.checker", prefix=prefix) @@ -489,7 +486,7 @@ class Checker(log.PrefixingLogMixin): def _get_cancel_secret(self, peerid): return bucket_cancel_secret_hash(self.file_cancel_secret, peerid) - def _get_buckets(self, server, storageindex, serverid): + def _get_buckets(self, s, storageindex): """Return a deferred that eventually fires with ({sharenum: bucket}, serverid, success). In case the server is disconnected or returns a Failure then it fires with ({}, serverid, False) (A server @@ -498,14 +495,16 @@ class Checker(log.PrefixingLogMixin): that we want to track and report whether or not each server responded.)""" + rref = s.get_rref() + serverid = s.get_serverid() if self._add_lease: renew_secret = self._get_renewal_secret(serverid) cancel_secret = self._get_cancel_secret(serverid) - d2 = server.callRemote("add_lease", storageindex, - renew_secret, cancel_secret) + d2 = rref.callRemote("add_lease", storageindex, + renew_secret, cancel_secret) d2.addErrback(self._add_lease_failed, serverid, storageindex) - d = server.callRemote("get_buckets", storageindex) + d = rref.callRemote("get_buckets", storageindex) def _wrap_results(res): return (res, serverid, True) @@ -656,7 +655,7 @@ class Checker(log.PrefixingLogMixin): return d - def _verify_server_shares(self, serverid, ss): + def _verify_server_shares(self, s): """ Return a deferred which eventually fires with a tuple of (set(sharenum), serverid, set(corruptsharenum), set(incompatiblesharenum), success) showing all the shares verified @@ -679,7 +678,7 @@ class Checker(log.PrefixingLogMixin): then disconnected and ceased responding, or returned a failure, it is still marked with the True flag for 'success'. """ - d = self._get_buckets(ss, self._verifycap.get_storage_index(), serverid) + d = self._get_buckets(s, self._verifycap.get_storage_index()) def _got_buckets(result): bucketdict, serverid, success = result @@ -710,12 +709,12 @@ class Checker(log.PrefixingLogMixin): def _err(f): f.trap(RemoteException, DeadReferenceError) - return (set(), serverid, set(), set(), False) + return (set(), s.get_serverid(), set(), set(), False) d.addCallbacks(_got_buckets, _err) return d - def _check_server_shares(self, serverid, ss): + def _check_server_shares(self, s): """Return a deferred which eventually fires with a tuple of (set(sharenum), serverid, set(), set(), responded) showing all the shares claimed to be served by this server. In case the server is @@ -726,7 +725,7 @@ class Checker(log.PrefixingLogMixin): def _curry_empty_corrupted(res): buckets, serverid, responded = res return (set(buckets), serverid, set(), set(), responded) - d = self._get_buckets(ss, self._verifycap.get_storage_index(), serverid) + d = self._get_buckets(s, self._verifycap.get_storage_index()) d.addCallback(_curry_empty_corrupted) return d @@ -794,10 +793,10 @@ class Checker(log.PrefixingLogMixin): def start(self): ds = [] if self._verify: - for (serverid, ss) in self._servers: - ds.append(self._verify_server_shares(serverid, ss)) + for s in self._servers: + ds.append(self._verify_server_shares(s)) else: - for (serverid, ss) in self._servers: - ds.append(self._check_server_shares(serverid, ss)) + for s in self._servers: + ds.append(self._check_server_shares(s)) return deferredutil.gatherResults(ds).addCallback(self._format_results) diff --git a/src/allmydata/immutable/downloader/finder.py b/src/allmydata/immutable/downloader/finder.py index 4816ccdf..f1142e79 100644 --- a/src/allmydata/immutable/downloader/finder.py +++ b/src/allmydata/immutable/downloader/finder.py @@ -62,8 +62,9 @@ class ShareFinder: # test_dirnode, which creates us with storage_broker=None if not self._started: si = self.verifycap.storage_index - s = self._storage_broker.get_servers_for_index(si) - self._servers = iter(s) + servers = [(s.get_serverid(), s.get_rref()) + for s in self._storage_broker.get_servers_for_psi(si)] + self._servers = iter(servers) self._started = True def log(self, *args, **kwargs): diff --git a/src/allmydata/immutable/filenode.py b/src/allmydata/immutable/filenode.py index ed3785b7..8fc47725 100644 --- a/src/allmydata/immutable/filenode.py +++ b/src/allmydata/immutable/filenode.py @@ -102,7 +102,7 @@ class CiphertextFileNode: verifycap = self._verifycap storage_index = verifycap.storage_index sb = self._storage_broker - servers = sb.get_all_servers() + servers = sb.get_connected_servers() sh = self._secret_holder c = Checker(verifycap=verifycap, servers=servers, @@ -160,7 +160,7 @@ class CiphertextFileNode: def check(self, monitor, verify=False, add_lease=False): verifycap = self._verifycap sb = self._storage_broker - servers = sb.get_all_servers() + servers = sb.get_connected_servers() sh = self._secret_holder v = Checker(verifycap=verifycap, servers=servers, diff --git a/src/allmydata/immutable/offloaded.py b/src/allmydata/immutable/offloaded.py index dea94c53..26be7860 100644 --- a/src/allmydata/immutable/offloaded.py +++ b/src/allmydata/immutable/offloaded.py @@ -53,10 +53,10 @@ class CHKCheckerAndUEBFetcher: def _get_all_shareholders(self, storage_index): dl = [] - for (peerid, ss) in self._peer_getter(storage_index): - d = ss.callRemote("get_buckets", storage_index) + for s in self._peer_getter(storage_index): + d = s.get_rref().callRemote("get_buckets", storage_index) d.addCallbacks(self._got_response, self._got_error, - callbackArgs=(peerid,)) + callbackArgs=(s.get_serverid(),)) dl.append(d) return defer.DeferredList(dl) @@ -620,7 +620,7 @@ class Helper(Referenceable): lp2 = self.log("doing a quick check+UEBfetch", parent=lp, level=log.NOISY) sb = self._storage_broker - c = CHKCheckerAndUEBFetcher(sb.get_servers_for_index, storage_index, lp2) + c = CHKCheckerAndUEBFetcher(sb.get_servers_for_psi, storage_index, lp2) d = c.check() def _checked(res): if res: diff --git a/src/allmydata/immutable/upload.py b/src/allmydata/immutable/upload.py index 5dd257bd..a53f12a2 100644 --- a/src/allmydata/immutable/upload.py +++ b/src/allmydata/immutable/upload.py @@ -224,7 +224,8 @@ class Tahoe2PeerSelector(log.PrefixingLogMixin): num_share_hashes, EXTENSION_SIZE, None) allocated_size = wbp.get_allocated_size() - all_peers = storage_broker.get_servers_for_index(storage_index) + all_peers = [(s.get_serverid(), s.get_rref()) + for s in storage_broker.get_servers_for_psi(storage_index)] if not all_peers: raise NoServersError("client gave us zero peers") diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py index 48094a94..85c3e079 100644 --- a/src/allmydata/interfaces.py +++ b/src/allmydata/interfaces.py @@ -352,13 +352,17 @@ class IStorageBucketReader(Interface): """ class IStorageBroker(Interface): - def get_servers_for_index(peer_selection_index): + def get_servers_for_psi(peer_selection_index): """ - @return: list of (peerid, versioned-rref) tuples + @return: list of IServer instances """ - def get_all_servers(): + def get_connected_servers(): """ - @return: frozenset of (peerid, versioned-rref) tuples + @return: frozenset of connected IServer instances + """ + def get_known_servers(): + """ + @return: frozenset of IServer instances """ def get_all_serverids(): """ diff --git a/src/allmydata/mutable/publish.py b/src/allmydata/mutable/publish.py index 2d63c87b..580682b6 100644 --- a/src/allmydata/mutable/publish.py +++ b/src/allmydata/mutable/publish.py @@ -179,7 +179,8 @@ class Publish: self._encprivkey = self._node.get_encprivkey() sb = self._storage_broker - full_peerlist = sb.get_servers_for_index(self._storage_index) + full_peerlist = [(s.get_serverid(), s.get_rref()) + for s in sb.get_servers_for_psi(self._storage_index)] self.full_peerlist = full_peerlist # for use later, immutable self.bad_peers = set() # peerids who have errbacked/refused requests diff --git a/src/allmydata/mutable/servermap.py b/src/allmydata/mutable/servermap.py index 999691fa..c69e4108 100644 --- a/src/allmydata/mutable/servermap.py +++ b/src/allmydata/mutable/servermap.py @@ -424,7 +424,8 @@ class ServermapUpdater: self._queries_completed = 0 sb = self._storage_broker - full_peerlist = sb.get_servers_for_index(self._storage_index) + full_peerlist = [(s.get_serverid(), s.get_rref()) + for s in sb.get_servers_for_psi(self._storage_index)] self.full_peerlist = full_peerlist # for use later, immutable self.extra_peers = full_peerlist[:] # peers are removed as we use them self._good_peers = set() # peers who had some shares diff --git a/src/allmydata/storage_client.py b/src/allmydata/storage_client.py index 7945eac0..e1b3f553 100644 --- a/src/allmydata/storage_client.py +++ b/src/allmydata/storage_client.py @@ -34,7 +34,7 @@ from zope.interface import implements, Interface from foolscap.api import eventually from allmydata.interfaces import IStorageBroker from allmydata.util import idlib, log -from allmydata.util.assertutil import _assert, precondition +from allmydata.util.assertutil import precondition from allmydata.util.rrefutil import add_version_to_remote_reference from allmydata.util.hashutil import sha1 @@ -66,11 +66,11 @@ class StorageFarmBroker: self.tub = tub assert permute_peers # False not implemented yet self.permute_peers = permute_peers - # self.descriptors maps serverid -> IServerDescriptor, and keeps - # track of all the storage servers that we've heard about. Each - # descriptor manages its own Reconnector, and will give us a - # RemoteReference when we ask them for it. - self.descriptors = {} + # self.servers maps serverid -> IServer, and keeps track of all the + # storage servers that we've heard about. Each descriptor manages its + # own Reconnector, and will give us a RemoteReference when we ask + # them for it. + self.servers = {} # self.test_servers are statically configured from unit tests self.test_servers = {} # serverid -> rref self.introducer_client = None @@ -79,7 +79,7 @@ class StorageFarmBroker: def test_add_server(self, serverid, rref): self.test_servers[serverid] = rref def test_add_descriptor(self, serverid, dsc): - self.descriptors[serverid] = dsc + self.servers[serverid] = dsc def use_introducer(self, introducer_client): self.introducer_client = ic = introducer_client @@ -89,16 +89,16 @@ class StorageFarmBroker: precondition(isinstance(serverid, str), serverid) precondition(len(serverid) == 20, serverid) assert ann_d["service-name"] == "storage" - old = self.descriptors.get(serverid) + old = self.servers.get(serverid) if old: if old.get_announcement() == ann_d: return # duplicate # replacement - del self.descriptors[serverid] + del self.servers[serverid] old.stop_connecting() # now we forget about them and start using the new one - dsc = NativeStorageClientDescriptor(serverid, ann_d) - self.descriptors[serverid] = dsc + dsc = NativeStorageServer(serverid, ann_d) + self.servers[serverid] = dsc dsc.start_connecting(self.tub, self._trigger_connections) # the descriptor will manage their own Reconnector, and each time we # need servers, we'll ask them if they're connected or not. @@ -111,48 +111,44 @@ class StorageFarmBroker: # connections to only a subset of the servers, which would increase # the chances that we'll put shares in weird places (and not update # existing shares of mutable files). See #374 for more details. - for dsc in self.descriptors.values(): + for dsc in self.servers.values(): dsc.try_to_connect() - - - def get_servers_for_index(self, peer_selection_index): - # first cut: return a list of (peerid, versioned-rref) tuples + def get_servers_for_psi(self, peer_selection_index): + # return a list of server objects (IServers) assert self.permute_peers == True - servers = self.get_all_servers() - key = peer_selection_index - return sorted(servers, key=lambda x: sha1(key+x[0]).digest()) - - def get_all_servers(self): - # return a frozenset of (peerid, versioned-rref) tuples - servers = {} - for serverid,rref in self.test_servers.items(): - servers[serverid] = rref - for serverid,dsc in self.descriptors.items(): - rref = dsc.get_rref() - if rref: - servers[serverid] = rref - result = frozenset(servers.items()) - _assert(len(result) <= len(self.get_all_serverids()), result, self.get_all_serverids()) - return result + def _permuted(server): + seed = server.get_permutation_seed() + return sha1(peer_selection_index + seed).digest() + return sorted(self.get_connected_servers(), key=_permuted) def get_all_serverids(self): serverids = set() serverids.update(self.test_servers.keys()) - serverids.update(self.descriptors.keys()) + serverids.update(self.servers.keys()) return frozenset(serverids) - def get_all_descriptors(self): - return sorted(self.descriptors.values(), - key=lambda dsc: dsc.get_serverid()) + def get_connected_servers(self): + return frozenset([s for s in self.get_known_servers() + if s.get_rref()]) + + def get_known_servers(self): + servers = [] + for serverid,rref in self.test_servers.items(): + s = NativeStorageServer(serverid, {}) + s.rref = rref + servers.append(s) + servers.extend(self.servers.values()) + return sorted(servers, key=lambda s: s.get_serverid()) def get_nickname_for_serverid(self, serverid): - if serverid in self.descriptors: - return self.descriptors[serverid].get_nickname() + if serverid in self.servers: + return self.servers[serverid].get_nickname() return None -class IServerDescriptor(Interface): +class IServer(Interface): + """I live in the client, and represent a single server.""" def start_connecting(tub, trigger_cb): pass def get_nickname(): @@ -160,7 +156,7 @@ class IServerDescriptor(Interface): def get_rref(): pass -class NativeStorageClientDescriptor: +class NativeStorageServer: """I hold information about a storage server that we want to connect to. If we are connected, I hold the RemoteReference, their host address, and the their version information. I remember information about when we were @@ -176,7 +172,7 @@ class NativeStorageClientDescriptor: @ivar rref: the RemoteReference, if connected, otherwise None @ivar remote_host: the IAddress, if connected, otherwise None """ - implements(IServerDescriptor) + implements(IServer) VERSION_DEFAULTS = { "http://allmydata.org/tahoe/protocols/storage/v1" : @@ -203,6 +199,8 @@ class NativeStorageClientDescriptor: def get_serverid(self): return self.serverid + def get_permutation_seed(self): + return self.serverid def get_nickname(self): return self.announcement["nickname"].decode("utf-8") diff --git a/src/allmydata/test/common.py b/src/allmydata/test/common.py index e8117bf7..dfcaa100 100644 --- a/src/allmydata/test/common.py +++ b/src/allmydata/test/common.py @@ -572,7 +572,7 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin): if not c.connected_to_introducer(): return False sb = c.get_storage_broker() - if len(sb.get_all_servers()) != self.numclients: + if len(sb.get_connected_servers()) != self.numclients: return False return True diff --git a/src/allmydata/test/no_network.py b/src/allmydata/test/no_network.py index 554a75f6..f07c36b4 100644 --- a/src/allmydata/test/no_network.py +++ b/src/allmydata/test/no_network.py @@ -117,13 +117,26 @@ def wrap_storage_server(original): wrapper.version = original.remote_get_version() return wrapper +class NoNetworkServer: + def __init__(self, serverid, rref): + self.serverid = serverid + self.rref = rref + def get_serverid(self): + return self.serverid + def get_permutation_seed(self): + return self.serverid + def get_rref(self): + return self.rref + class NoNetworkStorageBroker: implements(IStorageBroker) - def get_servers_for_index(self, key): - return sorted(self.client._servers, - key=lambda x: sha1(key+x[0]).digest()) - def get_all_servers(self): - return frozenset(self.client._servers) + def get_servers_for_psi(self, peer_selection_index): + def _permuted(server): + seed = server.get_permutation_seed() + return sha1(peer_selection_index + seed).digest() + return sorted(self.get_connected_servers(), key=_permuted) + def get_connected_servers(self): + return self.client._servers def get_nickname_for_serverid(self, serverid): return None @@ -181,8 +194,10 @@ class NoNetworkGrid(service.MultiService): self.basedir = basedir fileutil.make_dirs(basedir) - self.servers_by_number = {} - self.servers_by_id = {} + self.servers_by_number = {} # maps to StorageServer instance + self.wrappers_by_id = {} # maps to wrapped StorageServer instance + self.proxies_by_id = {} # maps to IServer on which .rref is a wrapped + # StorageServer self.clients = [] for i in range(num_servers): @@ -234,11 +249,16 @@ class NoNetworkGrid(service.MultiService): ss.setServiceParent(middleman) serverid = ss.my_nodeid self.servers_by_number[i] = ss - self.servers_by_id[serverid] = wrap_storage_server(ss) + wrapper = wrap_storage_server(ss) + self.wrappers_by_id[serverid] = wrapper + self.proxies_by_id[serverid] = NoNetworkServer(serverid, wrapper) self.rebuild_serverlist() + def get_all_serverids(self): + return self.proxies_by_id.keys() + def rebuild_serverlist(self): - self.all_servers = frozenset(self.servers_by_id.items()) + self.all_servers = frozenset(self.proxies_by_id.values()) for c in self.clients: c._servers = self.all_servers @@ -249,23 +269,24 @@ class NoNetworkGrid(service.MultiService): if ss.my_nodeid == serverid: del self.servers_by_number[i] break - del self.servers_by_id[serverid] + del self.wrappers_by_id[serverid] + del self.proxies_by_id[serverid] self.rebuild_serverlist() def break_server(self, serverid): # mark the given server as broken, so it will throw exceptions when # asked to hold a share or serve a share - self.servers_by_id[serverid].broken = True + self.wrappers_by_id[serverid].broken = True def hang_server(self, serverid): # hang the given server - ss = self.servers_by_id[serverid] + ss = self.wrappers_by_id[serverid] assert ss.hung_until is None ss.hung_until = defer.Deferred() def unhang_server(self, serverid): # unhang the given server - ss = self.servers_by_id[serverid] + ss = self.wrappers_by_id[serverid] assert ss.hung_until is not None ss.hung_until.callback(None) ss.hung_until = None diff --git a/src/allmydata/test/test_checker.py b/src/allmydata/test/test_checker.py index 7548d7cc..d1cf86f7 100644 --- a/src/allmydata/test/test_checker.py +++ b/src/allmydata/test/test_checker.py @@ -3,7 +3,7 @@ import simplejson from twisted.trial import unittest from allmydata import check_results, uri from allmydata.web import check_results as web_check_results -from allmydata.storage_client import StorageFarmBroker, NativeStorageClientDescriptor +from allmydata.storage_client import StorageFarmBroker, NativeStorageServer from allmydata.monitor import Monitor from allmydata.test.no_network import GridTestMixin from allmydata.immutable.upload import Data @@ -28,7 +28,7 @@ class WebResultsRendering(unittest.TestCase, WebRenderingMixin): "my-version": "ver", "oldest-supported": "oldest", } - dsc = NativeStorageClientDescriptor(peerid, ann_d) + dsc = NativeStorageServer(peerid, ann_d) sb.test_add_descriptor(peerid, dsc) c = FakeClient() c.storage_broker = sb diff --git a/src/allmydata/test/test_client.py b/src/allmydata/test/test_client.py index 7c6a4567..abf63aa7 100644 --- a/src/allmydata/test/test_client.py +++ b/src/allmydata/test/test_client.py @@ -134,13 +134,12 @@ class Basic(testutil.ReallyEqualMixin, unittest.TestCase): self.failUnlessEqual(c.getServiceNamed("storage").reserved_space, 0) def _permute(self, sb, key): - return [ peerid - for (peerid,rref) in sb.get_servers_for_index(key) ] + return [ s.get_serverid() for s in sb.get_servers_for_psi(key) ] def test_permute(self): sb = StorageFarmBroker(None, True) for k in ["%d" % i for i in range(5)]: - sb.test_add_server(k, None) + sb.test_add_server(k, "rref") self.failUnlessReallyEqual(self._permute(sb, "one"), ['3','1','0','4','2']) self.failUnlessReallyEqual(self._permute(sb, "two"), ['0','4','2','1','3']) diff --git a/src/allmydata/test/test_deepcheck.py b/src/allmydata/test/test_deepcheck.py index 5168b37e..4bcfe289 100644 --- a/src/allmydata/test/test_deepcheck.py +++ b/src/allmydata/test/test_deepcheck.py @@ -287,14 +287,14 @@ class DeepCheckWebGood(DeepCheckBase, unittest.TestCase): self.failUnlessEqual(d["list-corrupt-shares"], [], where) if not incomplete: self.failUnlessEqual(sorted(d["servers-responding"]), - sorted(self.g.servers_by_id.keys()), + sorted(self.g.get_all_serverids()), where) self.failUnless("sharemap" in d, str((where, d))) all_serverids = set() for (shareid, serverids) in d["sharemap"].items(): all_serverids.update(serverids) self.failUnlessEqual(sorted(all_serverids), - sorted(self.g.servers_by_id.keys()), + sorted(self.g.get_all_serverids()), where) self.failUnlessEqual(d["count-wrong-shares"], 0, where) @@ -545,7 +545,7 @@ class DeepCheckWebGood(DeepCheckBase, unittest.TestCase): if not incomplete: self.failUnlessEqual(sorted(r["servers-responding"]), sorted([idlib.nodeid_b2a(sid) - for sid in self.g.servers_by_id]), + for sid in self.g.get_all_serverids()]), where) self.failUnless("sharemap" in r, where) all_serverids = set() @@ -553,7 +553,7 @@ class DeepCheckWebGood(DeepCheckBase, unittest.TestCase): all_serverids.update(serverids_s) self.failUnlessEqual(sorted(all_serverids), sorted([idlib.nodeid_b2a(sid) - for sid in self.g.servers_by_id]), + for sid in self.g.get_all_serverids()]), where) self.failUnlessEqual(r["count-wrong-shares"], 0, where) self.failUnlessEqual(r["count-recoverable-versions"], 1, where) diff --git a/src/allmydata/test/test_download.py b/src/allmydata/test/test_download.py index 3a692209..ab2e98bc 100644 --- a/src/allmydata/test/test_download.py +++ b/src/allmydata/test/test_download.py @@ -596,7 +596,8 @@ class DownloadTest(_Base, unittest.TestCase): # tweak the client's copies of server-version data, so it believes # that they're old and can't handle reads that overrun the length of # the share. This exercises a different code path. - for (peerid, rref) in self.c0.storage_broker.get_all_servers(): + for s in self.c0.storage_broker.get_connected_servers(): + rref = s.get_rref() v1 = rref.version["http://allmydata.org/tahoe/protocols/storage/v1"] v1["tolerates-immutable-read-overrun"] = False @@ -1167,7 +1168,8 @@ class DownloadV2(_Base, unittest.TestCase): # tweak the client's copies of server-version data, so it believes # that they're old and can't handle reads that overrun the length of # the share. This exercises a different code path. - for (peerid, rref) in self.c0.storage_broker.get_all_servers(): + for s in self.c0.storage_broker.get_connected_servers(): + rref = s.get_rref() v1 = rref.version["http://allmydata.org/tahoe/protocols/storage/v1"] v1["tolerates-immutable-read-overrun"] = False @@ -1186,7 +1188,8 @@ class DownloadV2(_Base, unittest.TestCase): self.set_up_grid() self.c0 = self.g.clients[0] - for (peerid, rref) in self.c0.storage_broker.get_all_servers(): + for s in self.c0.storage_broker.get_connected_servers(): + rref = s.get_rref() v1 = rref.version["http://allmydata.org/tahoe/protocols/storage/v1"] v1["tolerates-immutable-read-overrun"] = False diff --git a/src/allmydata/test/test_hung_server.py b/src/allmydata/test/test_hung_server.py index e63cba32..abed967a 100644 --- a/src/allmydata/test/test_hung_server.py +++ b/src/allmydata/test/test_hung_server.py @@ -101,7 +101,8 @@ class HungServerDownloadTest(GridTestMixin, ShouldFailMixin, PollMixin, self.c0 = self.g.clients[0] nm = self.c0.nodemaker - self.servers = sorted([(id, ss) for (id, ss) in nm.storage_broker.get_all_servers()]) + self.servers = sorted([(s.get_serverid(), s.get_rref()) + for s in nm.storage_broker.get_connected_servers()]) self.servers = self.servers[5:] + self.servers[:5] if mutable: diff --git a/src/allmydata/test/test_immutable.py b/src/allmydata/test/test_immutable.py index edac3e69..1ca43838 100644 --- a/src/allmydata/test/test_immutable.py +++ b/src/allmydata/test/test_immutable.py @@ -95,12 +95,23 @@ class TestShareFinder(unittest.TestCase): self.s.hungry() eventually(_give_buckets_and_hunger_again) return d + class MockIServer(object): + def __init__(self, serverid, rref): + self.serverid = serverid + self.rref = rref + def get_serverid(self): + return self.serverid + def get_rref(self): + return self.rref mockserver1 = MockServer({1: mock.Mock(), 2: mock.Mock()}) mockserver2 = MockServer({}) mockserver3 = MockServer({3: mock.Mock()}) mockstoragebroker = mock.Mock() - mockstoragebroker.get_servers_for_index.return_value = [ ('ms1', mockserver1), ('ms2', mockserver2), ('ms3', mockserver3), ] + servers = [ MockIServer("ms1", mockserver1), + MockIServer("ms2", mockserver2), + MockIServer("ms3", mockserver3), ] + mockstoragebroker.get_servers_for_psi.return_value = servers mockdownloadstatus = mock.Mock() mocknode = MockNode(check_reneging=True, check_fetch_failed=True) diff --git a/src/allmydata/test/test_mutable.py b/src/allmydata/test/test_mutable.py index e4e6eb7f..0f49dced 100644 --- a/src/allmydata/test/test_mutable.py +++ b/src/allmydata/test/test_mutable.py @@ -1910,11 +1910,9 @@ class Problems(GridTestMixin, unittest.TestCase, testutil.ShouldFailMixin): d.addCallback(_got_key) def _break_peer0(res): si = self._storage_index - peerlist = nm.storage_broker.get_servers_for_index(si) - peerid0, connection0 = peerlist[0] - peerid1, connection1 = peerlist[1] - connection0.broken = True - self.connection1 = connection1 + servers = nm.storage_broker.get_servers_for_psi(si) + self.g.break_server(servers[0].get_serverid()) + self.server1 = servers[1] d.addCallback(_break_peer0) # now "create" the file, using the pre-established key, and let the # initial publish finally happen @@ -1925,7 +1923,7 @@ class Problems(GridTestMixin, unittest.TestCase, testutil.ShouldFailMixin): d.addCallback(lambda res: self.failUnlessEqual(res, "contents 1")) # now break the second peer def _break_peer1(res): - self.connection1.broken = True + self.g.break_server(self.server1.get_serverid()) d.addCallback(_break_peer1) d.addCallback(lambda res: n.overwrite("contents 2")) # that ought to work too @@ -1956,7 +1954,7 @@ class Problems(GridTestMixin, unittest.TestCase, testutil.ShouldFailMixin): nm = self.g.clients[0].nodemaker sb = nm.storage_broker - peerids = [serverid for (serverid,ss) in sb.get_all_servers()] + peerids = [s.get_serverid() for s in sb.get_connected_servers()] self.g.break_server(peerids[0]) d = nm.create_mutable_file("contents 1") @@ -1980,8 +1978,8 @@ class Problems(GridTestMixin, unittest.TestCase, testutil.ShouldFailMixin): self.basedir = "mutable/Problems/test_publish_all_servers_bad" self.set_up_grid() nm = self.g.clients[0].nodemaker - for (serverid,ss) in nm.storage_broker.get_all_servers(): - ss.broken = True + for s in nm.storage_broker.get_connected_servers(): + s.get_rref().broken = True d = self.shouldFail(NotEnoughServersError, "test_publish_all_servers_bad", @@ -2033,8 +2031,8 @@ class Problems(GridTestMixin, unittest.TestCase, testutil.ShouldFailMixin): # 1. notice which server gets a read() call first # 2. tell that server to start throwing errors killer = FirstServerGetsKilled() - for (serverid,ss) in nm.storage_broker.get_all_servers(): - ss.post_call_notifier = killer.notify + for s in nm.storage_broker.get_connected_servers(): + s.get_rref().post_call_notifier = killer.notify d.addCallback(_created) # now we update a servermap from a new node (which doesn't have the @@ -2059,8 +2057,8 @@ class Problems(GridTestMixin, unittest.TestCase, testutil.ShouldFailMixin): self.uri = n.get_uri() self.n2 = nm.create_from_cap(self.uri) deleter = FirstServerGetsDeleted() - for (serverid,ss) in nm.storage_broker.get_all_servers(): - ss.post_call_notifier = deleter.notify + for s in nm.storage_broker.get_connected_servers(): + s.get_rref().post_call_notifier = deleter.notify d.addCallback(_created) d.addCallback(lambda res: self.n2.get_servermap(MODE_WRITE)) return d diff --git a/src/allmydata/test/test_system.py b/src/allmydata/test/test_system.py index bf6af094..e37f6638 100644 --- a/src/allmydata/test/test_system.py +++ b/src/allmydata/test/test_system.py @@ -65,7 +65,7 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase): all_peerids = c.get_storage_broker().get_all_serverids() self.failUnlessEqual(len(all_peerids), self.numclients+1) sb = c.storage_broker - permuted_peers = sb.get_servers_for_index("a") + permuted_peers = sb.get_servers_for_psi("a") self.failUnlessEqual(len(permuted_peers), self.numclients+1) d.addCallback(_check) @@ -101,7 +101,7 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase): all_peerids = c.get_storage_broker().get_all_serverids() self.failUnlessEqual(len(all_peerids), self.numclients) sb = c.storage_broker - permuted_peers = sb.get_servers_for_index("a") + permuted_peers = sb.get_servers_for_psi("a") self.failUnlessEqual(len(permuted_peers), self.numclients) d.addCallback(_check_connections) diff --git a/src/allmydata/test/test_upload.py b/src/allmydata/test/test_upload.py index 022185e3..11259324 100644 --- a/src/allmydata/test/test_upload.py +++ b/src/allmydata/test/test_upload.py @@ -1730,12 +1730,8 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin, d.addCallback(lambda ign: self._add_server(server_number=2)) def _break_server_2(ign): - server = self.g.servers_by_number[2].my_nodeid - # We have to break the server in servers_by_id, - # because the one in servers_by_number isn't wrapped, - # and doesn't look at its broken attribute when answering - # queries. - self.g.servers_by_id[server].broken = True + serverid = self.g.servers_by_number[2].my_nodeid + self.g.break_server(serverid) d.addCallback(_break_server_2) d.addCallback(lambda ign: self._add_server(server_number=3, readonly=True)) diff --git a/src/allmydata/web/check_results.py b/src/allmydata/web/check_results.py index 96ce17bf..44d05062 100644 --- a/src/allmydata/web/check_results.py +++ b/src/allmydata/web/check_results.py @@ -139,9 +139,9 @@ class ResultsBase: # this table is sorted by permuted order sb = c.get_storage_broker() - permuted_peer_ids = [peerid - for (peerid, rref) - in sb.get_servers_for_index(cr.get_storage_index())] + permuted_peer_ids = [s.get_serverid() + for s + in sb.get_servers_for_psi(cr.get_storage_index())] num_shares_left = sum([len(shares) for shares in servers.values()]) servermap = [] diff --git a/src/allmydata/web/root.py b/src/allmydata/web/root.py index 3af15d92..2f1f6d4f 100644 --- a/src/allmydata/web/root.py +++ b/src/allmydata/web/root.py @@ -247,18 +247,18 @@ class Root(rend.Page): def data_connected_storage_servers(self, ctx, data): sb = self.client.get_storage_broker() - return len(sb.get_all_servers()) + return len(sb.get_connected_servers()) def data_services(self, ctx, data): sb = self.client.get_storage_broker() - return sb.get_all_descriptors() + return sb.get_known_servers() - def render_service_row(self, ctx, descriptor): - nodeid = descriptor.get_serverid() + def render_service_row(self, ctx, server): + nodeid = server.get_serverid() ctx.fillSlots("peerid", idlib.nodeid_b2a(nodeid)) - ctx.fillSlots("nickname", descriptor.get_nickname()) - rhost = descriptor.get_remote_host() + ctx.fillSlots("nickname", server.get_nickname()) + rhost = server.get_remote_host() if rhost: if nodeid == self.client.nodeid: rhost_s = "(loopback)" @@ -267,12 +267,12 @@ class Root(rend.Page): else: rhost_s = str(rhost) connected = "Yes: to " + rhost_s - since = descriptor.get_last_connect_time() + since = server.get_last_connect_time() else: connected = "No" - since = descriptor.get_last_loss_time() - announced = descriptor.get_announcement_time() - announcement = descriptor.get_announcement() + since = server.get_last_loss_time() + announced = server.get_announcement_time() + announcement = server.get_announcement() version = announcement["my-version"] service_name = announcement["service-name"]