Refactor StorageFarmBroker handling of servers
authorBrian Warner <warner@lothar.com>
Mon, 21 Feb 2011 01:58:04 +0000 (17:58 -0800)
committerBrian Warner <warner@lothar.com>
Mon, 21 Feb 2011 01:58:04 +0000 (17:58 -0800)
Pass around IServer instance instead of (peerid, rref) tuple. Replace
"descriptor" with "server". Other replacements:

 get_all_servers -> get_connected_servers/get_known_servers
 get_servers_for_index -> get_servers_for_psi (now returns IServers)

This change still needs to be pushed further down: lots of code is now
getting the IServer and then distributing (peerid, rref) internally.
Instead, it ought to distribute the IServer internally and delay
extracting a serverid or rref until the last moment.

no_network.py was updated to retain parallelism.

24 files changed:
src/allmydata/client.py
src/allmydata/control.py
src/allmydata/immutable/checker.py
src/allmydata/immutable/downloader/finder.py
src/allmydata/immutable/filenode.py
src/allmydata/immutable/offloaded.py
src/allmydata/immutable/upload.py
src/allmydata/interfaces.py
src/allmydata/mutable/publish.py
src/allmydata/mutable/servermap.py
src/allmydata/storage_client.py
src/allmydata/test/common.py
src/allmydata/test/no_network.py
src/allmydata/test/test_checker.py
src/allmydata/test/test_client.py
src/allmydata/test/test_deepcheck.py
src/allmydata/test/test_download.py
src/allmydata/test/test_hung_server.py
src/allmydata/test/test_immutable.py
src/allmydata/test/test_mutable.py
src/allmydata/test/test_system.py
src/allmydata/test/test_upload.py
src/allmydata/web/check_results.py
src/allmydata/web/root.py

index fa515d41a0895efb3eb67256320fe24da9025318..fb7e0c890b4a51983d6707b6b6df225c709b363b 100644 (file)
@@ -468,7 +468,7 @@ class Client(node.Node, pollmixin.PollMixin):
         temporary test network and need to know when it is safe to proceed
         with an upload or download."""
         def _check():
-            return len(self.storage_broker.get_all_servers()) >= num_clients
+            return len(self.storage_broker.get_connected_servers()) >= num_clients
         d = self.poll(_check, 0.5)
         d.addCallback(lambda res: None)
         return d
index cb0c84f7238f3a40b60a480fbadb816283fe6ac9..045a34e5e0354e725e0ad727e043609013189f76 100644 (file)
@@ -91,7 +91,7 @@ class ControlServer(Referenceable, service.Service):
         # 300ms.
         results = {}
         sb = self.parent.get_storage_broker()
-        everyone = sb.get_all_servers()
+        everyone = sb.get_connected_servers()
         num_pings = int(mathutil.div_ceil(10, (len(everyone) * 0.3)))
         everyone = list(everyone) * num_pings
         d = self._do_one_ping(None, everyone, results)
@@ -99,7 +99,9 @@ class ControlServer(Referenceable, service.Service):
     def _do_one_ping(self, res, everyone_left, results):
         if not everyone_left:
             return results
-        peerid, connection = everyone_left.pop(0)
+        server = everyone_left.pop(0)
+        peerid = server.get_serverid()
+        connection = server.get_rref()
         start = time.time()
         d = connection.callRemote("get_buckets", "\x00"*16)
         def _done(ignored):
index cd5c5568dae4cdb671042472cb598b23a20e29fb..f3500291bff00b7b62a86b6941a53850ca19374d 100644 (file)
@@ -463,9 +463,6 @@ class Checker(log.PrefixingLogMixin):
     def __init__(self, verifycap, servers, verify, add_lease, secret_holder,
                  monitor):
         assert precondition(isinstance(verifycap, CHKFileVerifierURI), verifycap, type(verifycap))
-        assert precondition(isinstance(servers, (set, frozenset)), servers)
-        for (serverid, serverrref) in servers:
-            assert precondition(isinstance(serverid, str))
 
         prefix = "%s" % base32.b2a_l(verifycap.get_storage_index()[:8], 60)
         log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.checker", prefix=prefix)
@@ -489,7 +486,7 @@ class Checker(log.PrefixingLogMixin):
     def _get_cancel_secret(self, peerid):
         return bucket_cancel_secret_hash(self.file_cancel_secret, peerid)
 
-    def _get_buckets(self, server, storageindex, serverid):
+    def _get_buckets(self, s, storageindex):
         """Return a deferred that eventually fires with ({sharenum: bucket},
         serverid, success). In case the server is disconnected or returns a
         Failure then it fires with ({}, serverid, False) (A server
@@ -498,14 +495,16 @@ class Checker(log.PrefixingLogMixin):
         that we want to track and report whether or not each server
         responded.)"""
 
+        rref = s.get_rref()
+        serverid = s.get_serverid()
         if self._add_lease:
             renew_secret = self._get_renewal_secret(serverid)
             cancel_secret = self._get_cancel_secret(serverid)
-            d2 = server.callRemote("add_lease", storageindex,
-                                   renew_secret, cancel_secret)
+            d2 = rref.callRemote("add_lease", storageindex,
+                                 renew_secret, cancel_secret)
             d2.addErrback(self._add_lease_failed, serverid, storageindex)
 
-        d = server.callRemote("get_buckets", storageindex)
+        d = rref.callRemote("get_buckets", storageindex)
         def _wrap_results(res):
             return (res, serverid, True)
 
@@ -656,7 +655,7 @@ class Checker(log.PrefixingLogMixin):
 
         return d
 
-    def _verify_server_shares(self, serverid, ss):
+    def _verify_server_shares(self, s):
         """ Return a deferred which eventually fires with a tuple of
         (set(sharenum), serverid, set(corruptsharenum),
         set(incompatiblesharenum), success) showing all the shares verified
@@ -679,7 +678,7 @@ class Checker(log.PrefixingLogMixin):
         then disconnected and ceased responding, or returned a failure, it is
         still marked with the True flag for 'success'.
         """
-        d = self._get_buckets(ss, self._verifycap.get_storage_index(), serverid)
+        d = self._get_buckets(s, self._verifycap.get_storage_index())
 
         def _got_buckets(result):
             bucketdict, serverid, success = result
@@ -710,12 +709,12 @@ class Checker(log.PrefixingLogMixin):
 
         def _err(f):
             f.trap(RemoteException, DeadReferenceError)
-            return (set(), serverid, set(), set(), False)
+            return (set(), s.get_serverid(), set(), set(), False)
 
         d.addCallbacks(_got_buckets, _err)
         return d
 
-    def _check_server_shares(self, serverid, ss):
+    def _check_server_shares(self, s):
         """Return a deferred which eventually fires with a tuple of
         (set(sharenum), serverid, set(), set(), responded) showing all the
         shares claimed to be served by this server. In case the server is
@@ -726,7 +725,7 @@ class Checker(log.PrefixingLogMixin):
         def _curry_empty_corrupted(res):
             buckets, serverid, responded = res
             return (set(buckets), serverid, set(), set(), responded)
-        d = self._get_buckets(ss, self._verifycap.get_storage_index(), serverid)
+        d = self._get_buckets(s, self._verifycap.get_storage_index())
         d.addCallback(_curry_empty_corrupted)
         return d
 
@@ -794,10 +793,10 @@ class Checker(log.PrefixingLogMixin):
     def start(self):
         ds = []
         if self._verify:
-            for (serverid, ss) in self._servers:
-                ds.append(self._verify_server_shares(serverid, ss))
+            for s in self._servers:
+                ds.append(self._verify_server_shares(s))
         else:
-            for (serverid, ss) in self._servers:
-                ds.append(self._check_server_shares(serverid, ss))
+            for s in self._servers:
+                ds.append(self._check_server_shares(s))
 
         return deferredutil.gatherResults(ds).addCallback(self._format_results)
index 4816ccdf79b3c8b797d55c20da4ac2d7b1b2415b..f1142e79d0f01c1f15798bdb7b1bb31351ba87b7 100644 (file)
@@ -62,8 +62,9 @@ class ShareFinder:
         # test_dirnode, which creates us with storage_broker=None
         if not self._started:
             si = self.verifycap.storage_index
-            s = self._storage_broker.get_servers_for_index(si)
-            self._servers = iter(s)
+            servers = [(s.get_serverid(), s.get_rref())
+                       for s in self._storage_broker.get_servers_for_psi(si)]
+            self._servers = iter(servers)
             self._started = True
 
     def log(self, *args, **kwargs):
index ed3785b7152e75938ca5bbdded51fe76dbfba5e3..8fc47725ed30445b0ccdaa7749be22808d650489 100644 (file)
@@ -102,7 +102,7 @@ class CiphertextFileNode:
         verifycap = self._verifycap
         storage_index = verifycap.storage_index
         sb = self._storage_broker
-        servers = sb.get_all_servers()
+        servers = sb.get_connected_servers()
         sh = self._secret_holder
 
         c = Checker(verifycap=verifycap, servers=servers,
@@ -160,7 +160,7 @@ class CiphertextFileNode:
     def check(self, monitor, verify=False, add_lease=False):
         verifycap = self._verifycap
         sb = self._storage_broker
-        servers = sb.get_all_servers()
+        servers = sb.get_connected_servers()
         sh = self._secret_holder
 
         v = Checker(verifycap=verifycap, servers=servers,
index dea94c530a16b052897371ba47e42ba5b9bf368f..26be786034fad885dc9b1d6c69b81916f38f6a13 100644 (file)
@@ -53,10 +53,10 @@ class CHKCheckerAndUEBFetcher:
 
     def _get_all_shareholders(self, storage_index):
         dl = []
-        for (peerid, ss) in self._peer_getter(storage_index):
-            d = ss.callRemote("get_buckets", storage_index)
+        for s in self._peer_getter(storage_index):
+            d = s.get_rref().callRemote("get_buckets", storage_index)
             d.addCallbacks(self._got_response, self._got_error,
-                           callbackArgs=(peerid,))
+                           callbackArgs=(s.get_serverid(),))
             dl.append(d)
         return defer.DeferredList(dl)
 
@@ -620,7 +620,7 @@ class Helper(Referenceable):
         lp2 = self.log("doing a quick check+UEBfetch",
                        parent=lp, level=log.NOISY)
         sb = self._storage_broker
-        c = CHKCheckerAndUEBFetcher(sb.get_servers_for_index, storage_index, lp2)
+        c = CHKCheckerAndUEBFetcher(sb.get_servers_for_psi, storage_index, lp2)
         d = c.check()
         def _checked(res):
             if res:
index 5dd257bd761a81e243e7b66711ae1374d8c4970c..a53f12a26aee22c1e1fe1a40ca6e8571e3fdbc22 100644 (file)
@@ -224,7 +224,8 @@ class Tahoe2PeerSelector(log.PrefixingLogMixin):
                                              num_share_hashes, EXTENSION_SIZE,
                                              None)
         allocated_size = wbp.get_allocated_size()
-        all_peers = storage_broker.get_servers_for_index(storage_index)
+        all_peers = [(s.get_serverid(), s.get_rref())
+                     for s in storage_broker.get_servers_for_psi(storage_index)]
         if not all_peers:
             raise NoServersError("client gave us zero peers")
 
index 48094a94202dfdba5d3a228c2e331036ceba1db9..85c3e079111b6c8a824a0b07ae9805be9d07ee7d 100644 (file)
@@ -352,13 +352,17 @@ class IStorageBucketReader(Interface):
         """
 
 class IStorageBroker(Interface):
-    def get_servers_for_index(peer_selection_index):
+    def get_servers_for_psi(peer_selection_index):
         """
-        @return: list of (peerid, versioned-rref) tuples
+        @return: list of IServer instances
         """
-    def get_all_servers():
+    def get_connected_servers():
         """
-        @return: frozenset of (peerid, versioned-rref) tuples
+        @return: frozenset of connected IServer instances
+        """
+    def get_known_servers():
+        """
+        @return: frozenset of IServer instances
         """
     def get_all_serverids():
         """
index 2d63c87b9ef28ce0e413ffa461a2cfa39ac73eaa..580682b638abc034e368a778af18b8cb3cfbccf7 100644 (file)
@@ -179,7 +179,8 @@ class Publish:
         self._encprivkey = self._node.get_encprivkey()
 
         sb = self._storage_broker
-        full_peerlist = sb.get_servers_for_index(self._storage_index)
+        full_peerlist = [(s.get_serverid(), s.get_rref())
+                         for s in sb.get_servers_for_psi(self._storage_index)]
         self.full_peerlist = full_peerlist # for use later, immutable
         self.bad_peers = set() # peerids who have errbacked/refused requests
 
index 999691fa897c8a1d9db58fb333077449458d72a9..c69e4108952b91b49662c7f61069ee13213f62a4 100644 (file)
@@ -424,7 +424,8 @@ class ServermapUpdater:
         self._queries_completed = 0
 
         sb = self._storage_broker
-        full_peerlist = sb.get_servers_for_index(self._storage_index)
+        full_peerlist = [(s.get_serverid(), s.get_rref())
+                         for s in sb.get_servers_for_psi(self._storage_index)]
         self.full_peerlist = full_peerlist # for use later, immutable
         self.extra_peers = full_peerlist[:] # peers are removed as we use them
         self._good_peers = set() # peers who had some shares
index 7945eac05dfd257eb0652990204df86b71b06f7c..e1b3f553f521c605165a474eecd91240bba79d31 100644 (file)
@@ -34,7 +34,7 @@ from zope.interface import implements, Interface
 from foolscap.api import eventually
 from allmydata.interfaces import IStorageBroker
 from allmydata.util import idlib, log
-from allmydata.util.assertutil import _assert, precondition
+from allmydata.util.assertutil import precondition
 from allmydata.util.rrefutil import add_version_to_remote_reference
 from allmydata.util.hashutil import sha1
 
@@ -66,11 +66,11 @@ class StorageFarmBroker:
         self.tub = tub
         assert permute_peers # False not implemented yet
         self.permute_peers = permute_peers
-        # self.descriptors maps serverid -> IServerDescriptor, and keeps
-        # track of all the storage servers that we've heard about. Each
-        # descriptor manages its own Reconnector, and will give us a
-        # RemoteReference when we ask them for it.
-        self.descriptors = {}
+        # self.servers maps serverid -> IServer, and keeps track of all the
+        # storage servers that we've heard about. Each descriptor manages its
+        # own Reconnector, and will give us a RemoteReference when we ask
+        # them for it.
+        self.servers = {}
         # self.test_servers are statically configured from unit tests
         self.test_servers = {} # serverid -> rref
         self.introducer_client = None
@@ -79,7 +79,7 @@ class StorageFarmBroker:
     def test_add_server(self, serverid, rref):
         self.test_servers[serverid] = rref
     def test_add_descriptor(self, serverid, dsc):
-        self.descriptors[serverid] = dsc
+        self.servers[serverid] = dsc
 
     def use_introducer(self, introducer_client):
         self.introducer_client = ic = introducer_client
@@ -89,16 +89,16 @@ class StorageFarmBroker:
         precondition(isinstance(serverid, str), serverid)
         precondition(len(serverid) == 20, serverid)
         assert ann_d["service-name"] == "storage"
-        old = self.descriptors.get(serverid)
+        old = self.servers.get(serverid)
         if old:
             if old.get_announcement() == ann_d:
                 return # duplicate
             # replacement
-            del self.descriptors[serverid]
+            del self.servers[serverid]
             old.stop_connecting()
             # now we forget about them and start using the new one
-        dsc = NativeStorageClientDescriptor(serverid, ann_d)
-        self.descriptors[serverid] = dsc
+        dsc = NativeStorageServer(serverid, ann_d)
+        self.servers[serverid] = dsc
         dsc.start_connecting(self.tub, self._trigger_connections)
         # the descriptor will manage their own Reconnector, and each time we
         # need servers, we'll ask them if they're connected or not.
@@ -111,48 +111,44 @@ class StorageFarmBroker:
         # connections to only a subset of the servers, which would increase
         # the chances that we'll put shares in weird places (and not update
         # existing shares of mutable files). See #374 for more details.
-        for dsc in self.descriptors.values():
+        for dsc in self.servers.values():
             dsc.try_to_connect()
 
-
-
-    def get_servers_for_index(self, peer_selection_index):
-        # first cut: return a list of (peerid, versioned-rref) tuples
+    def get_servers_for_psi(self, peer_selection_index):
+        # return a list of server objects (IServers)
         assert self.permute_peers == True
-        servers = self.get_all_servers()
-        key = peer_selection_index
-        return sorted(servers, key=lambda x: sha1(key+x[0]).digest())
-
-    def get_all_servers(self):
-        # return a frozenset of (peerid, versioned-rref) tuples
-        servers = {}
-        for serverid,rref in self.test_servers.items():
-            servers[serverid] = rref
-        for serverid,dsc in self.descriptors.items():
-            rref = dsc.get_rref()
-            if rref:
-                servers[serverid] = rref
-        result = frozenset(servers.items())
-        _assert(len(result) <= len(self.get_all_serverids()), result, self.get_all_serverids())
-        return result
+        def _permuted(server):
+            seed = server.get_permutation_seed()
+            return sha1(peer_selection_index + seed).digest()
+        return sorted(self.get_connected_servers(), key=_permuted)
 
     def get_all_serverids(self):
         serverids = set()
         serverids.update(self.test_servers.keys())
-        serverids.update(self.descriptors.keys())
+        serverids.update(self.servers.keys())
         return frozenset(serverids)
 
-    def get_all_descriptors(self):
-        return sorted(self.descriptors.values(),
-                      key=lambda dsc: dsc.get_serverid())
+    def get_connected_servers(self):
+        return frozenset([s for s in self.get_known_servers()
+                          if s.get_rref()])
+
+    def get_known_servers(self):
+        servers = []
+        for serverid,rref in self.test_servers.items():
+            s = NativeStorageServer(serverid, {})
+            s.rref = rref
+            servers.append(s)
+        servers.extend(self.servers.values())
+        return sorted(servers, key=lambda s: s.get_serverid())
 
     def get_nickname_for_serverid(self, serverid):
-        if serverid in self.descriptors:
-            return self.descriptors[serverid].get_nickname()
+        if serverid in self.servers:
+            return self.servers[serverid].get_nickname()
         return None
 
 
-class IServerDescriptor(Interface):
+class IServer(Interface):
+    """I live in the client, and represent a single server."""
     def start_connecting(tub, trigger_cb):
         pass
     def get_nickname():
@@ -160,7 +156,7 @@ class IServerDescriptor(Interface):
     def get_rref():
         pass
 
-class NativeStorageClientDescriptor:
+class NativeStorageServer:
     """I hold information about a storage server that we want to connect to.
     If we are connected, I hold the RemoteReference, their host address, and
     the their version information. I remember information about when we were
@@ -176,7 +172,7 @@ class NativeStorageClientDescriptor:
     @ivar rref: the RemoteReference, if connected, otherwise None
     @ivar remote_host: the IAddress, if connected, otherwise None
     """
-    implements(IServerDescriptor)
+    implements(IServer)
 
     VERSION_DEFAULTS = {
         "http://allmydata.org/tahoe/protocols/storage/v1" :
@@ -203,6 +199,8 @@ class NativeStorageClientDescriptor:
 
     def get_serverid(self):
         return self.serverid
+    def get_permutation_seed(self):
+        return self.serverid
 
     def get_nickname(self):
         return self.announcement["nickname"].decode("utf-8")
index e8117bf7b16c3a45516a6776e1954e5b5c85efc4..dfcaa100ec37ff65f56ebfdc14e058714cac7094 100644 (file)
@@ -572,7 +572,7 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
             if not c.connected_to_introducer():
                 return False
             sb = c.get_storage_broker()
-            if len(sb.get_all_servers()) != self.numclients:
+            if len(sb.get_connected_servers()) != self.numclients:
                 return False
         return True
 
index 554a75f6c7151df47fe7f76040181dab730b77b4..f07c36b4ea1e2ddf2c38712f6c3ef49bd6816257 100644 (file)
@@ -117,13 +117,26 @@ def wrap_storage_server(original):
     wrapper.version = original.remote_get_version()
     return wrapper
 
+class NoNetworkServer:
+    def __init__(self, serverid, rref):
+        self.serverid = serverid
+        self.rref = rref
+    def get_serverid(self):
+        return self.serverid
+    def get_permutation_seed(self):
+        return self.serverid
+    def get_rref(self):
+        return self.rref
+
 class NoNetworkStorageBroker:
     implements(IStorageBroker)
-    def get_servers_for_index(self, key):
-        return sorted(self.client._servers,
-                      key=lambda x: sha1(key+x[0]).digest())
-    def get_all_servers(self):
-        return frozenset(self.client._servers)
+    def get_servers_for_psi(self, peer_selection_index):
+        def _permuted(server):
+            seed = server.get_permutation_seed()
+            return sha1(peer_selection_index + seed).digest()
+        return sorted(self.get_connected_servers(), key=_permuted)
+    def get_connected_servers(self):
+        return self.client._servers
     def get_nickname_for_serverid(self, serverid):
         return None
 
@@ -181,8 +194,10 @@ class NoNetworkGrid(service.MultiService):
         self.basedir = basedir
         fileutil.make_dirs(basedir)
 
-        self.servers_by_number = {}
-        self.servers_by_id = {}
+        self.servers_by_number = {} # maps to StorageServer instance
+        self.wrappers_by_id = {} # maps to wrapped StorageServer instance
+        self.proxies_by_id = {} # maps to IServer on which .rref is a wrapped
+                                # StorageServer
         self.clients = []
 
         for i in range(num_servers):
@@ -234,11 +249,16 @@ class NoNetworkGrid(service.MultiService):
         ss.setServiceParent(middleman)
         serverid = ss.my_nodeid
         self.servers_by_number[i] = ss
-        self.servers_by_id[serverid] = wrap_storage_server(ss)
+        wrapper = wrap_storage_server(ss)
+        self.wrappers_by_id[serverid] = wrapper
+        self.proxies_by_id[serverid] = NoNetworkServer(serverid, wrapper)
         self.rebuild_serverlist()
 
+    def get_all_serverids(self):
+        return self.proxies_by_id.keys()
+
     def rebuild_serverlist(self):
-        self.all_servers = frozenset(self.servers_by_id.items())
+        self.all_servers = frozenset(self.proxies_by_id.values())
         for c in self.clients:
             c._servers = self.all_servers
 
@@ -249,23 +269,24 @@ class NoNetworkGrid(service.MultiService):
             if ss.my_nodeid == serverid:
                 del self.servers_by_number[i]
                 break
-        del self.servers_by_id[serverid]
+        del self.wrappers_by_id[serverid]
+        del self.proxies_by_id[serverid]
         self.rebuild_serverlist()
 
     def break_server(self, serverid):
         # mark the given server as broken, so it will throw exceptions when
         # asked to hold a share or serve a share
-        self.servers_by_id[serverid].broken = True
+        self.wrappers_by_id[serverid].broken = True
 
     def hang_server(self, serverid):
         # hang the given server
-        ss = self.servers_by_id[serverid]
+        ss = self.wrappers_by_id[serverid]
         assert ss.hung_until is None
         ss.hung_until = defer.Deferred()
 
     def unhang_server(self, serverid):
         # unhang the given server
-        ss = self.servers_by_id[serverid]
+        ss = self.wrappers_by_id[serverid]
         assert ss.hung_until is not None
         ss.hung_until.callback(None)
         ss.hung_until = None
index 7548d7cc8142f43d797498591d106e4e6e80c4db..d1cf86f7f31ae1c9174436831b364be9774586af 100644 (file)
@@ -3,7 +3,7 @@ import simplejson
 from twisted.trial import unittest
 from allmydata import check_results, uri
 from allmydata.web import check_results as web_check_results
-from allmydata.storage_client import StorageFarmBroker, NativeStorageClientDescriptor
+from allmydata.storage_client import StorageFarmBroker, NativeStorageServer
 from allmydata.monitor import Monitor
 from allmydata.test.no_network import GridTestMixin
 from allmydata.immutable.upload import Data
@@ -28,7 +28,7 @@ class WebResultsRendering(unittest.TestCase, WebRenderingMixin):
                       "my-version": "ver",
                       "oldest-supported": "oldest",
                       }
-            dsc = NativeStorageClientDescriptor(peerid, ann_d)
+            dsc = NativeStorageServer(peerid, ann_d)
             sb.test_add_descriptor(peerid, dsc)
         c = FakeClient()
         c.storage_broker = sb
index 7c6a4567e6053a8ae950a54e96fc9a9fa5779772..abf63aa7678aae8911170764240e369e95b43356 100644 (file)
@@ -134,13 +134,12 @@ class Basic(testutil.ReallyEqualMixin, unittest.TestCase):
         self.failUnlessEqual(c.getServiceNamed("storage").reserved_space, 0)
 
     def _permute(self, sb, key):
-        return [ peerid
-                 for (peerid,rref) in sb.get_servers_for_index(key) ]
+        return [ s.get_serverid() for s in sb.get_servers_for_psi(key) ]
 
     def test_permute(self):
         sb = StorageFarmBroker(None, True)
         for k in ["%d" % i for i in range(5)]:
-            sb.test_add_server(k, None)
+            sb.test_add_server(k, "rref")
 
         self.failUnlessReallyEqual(self._permute(sb, "one"), ['3','1','0','4','2'])
         self.failUnlessReallyEqual(self._permute(sb, "two"), ['0','4','2','1','3'])
index 5168b37e4a1396c569cea1a511ca852330e275e2..4bcfe28951dda6954fc77fce31903d63176b3b12 100644 (file)
@@ -287,14 +287,14 @@ class DeepCheckWebGood(DeepCheckBase, unittest.TestCase):
         self.failUnlessEqual(d["list-corrupt-shares"], [], where)
         if not incomplete:
             self.failUnlessEqual(sorted(d["servers-responding"]),
-                                 sorted(self.g.servers_by_id.keys()),
+                                 sorted(self.g.get_all_serverids()),
                                  where)
             self.failUnless("sharemap" in d, str((where, d)))
             all_serverids = set()
             for (shareid, serverids) in d["sharemap"].items():
                 all_serverids.update(serverids)
             self.failUnlessEqual(sorted(all_serverids),
-                                 sorted(self.g.servers_by_id.keys()),
+                                 sorted(self.g.get_all_serverids()),
                                  where)
 
         self.failUnlessEqual(d["count-wrong-shares"], 0, where)
@@ -545,7 +545,7 @@ class DeepCheckWebGood(DeepCheckBase, unittest.TestCase):
         if not incomplete:
             self.failUnlessEqual(sorted(r["servers-responding"]),
                                  sorted([idlib.nodeid_b2a(sid)
-                                         for sid in self.g.servers_by_id]),
+                                         for sid in self.g.get_all_serverids()]),
                                  where)
             self.failUnless("sharemap" in r, where)
             all_serverids = set()
@@ -553,7 +553,7 @@ class DeepCheckWebGood(DeepCheckBase, unittest.TestCase):
                 all_serverids.update(serverids_s)
             self.failUnlessEqual(sorted(all_serverids),
                                  sorted([idlib.nodeid_b2a(sid)
-                                         for sid in self.g.servers_by_id]),
+                                         for sid in self.g.get_all_serverids()]),
                                  where)
         self.failUnlessEqual(r["count-wrong-shares"], 0, where)
         self.failUnlessEqual(r["count-recoverable-versions"], 1, where)
index 3a692209972963c87dd69548f71c9d4e248dd63a..ab2e98bc0665345b43072e74011323cb6a2114e5 100644 (file)
@@ -596,7 +596,8 @@ class DownloadTest(_Base, unittest.TestCase):
         # tweak the client's copies of server-version data, so it believes
         # that they're old and can't handle reads that overrun the length of
         # the share. This exercises a different code path.
-        for (peerid, rref) in self.c0.storage_broker.get_all_servers():
+        for s in self.c0.storage_broker.get_connected_servers():
+            rref = s.get_rref()
             v1 = rref.version["http://allmydata.org/tahoe/protocols/storage/v1"]
             v1["tolerates-immutable-read-overrun"] = False
 
@@ -1167,7 +1168,8 @@ class DownloadV2(_Base, unittest.TestCase):
         # tweak the client's copies of server-version data, so it believes
         # that they're old and can't handle reads that overrun the length of
         # the share. This exercises a different code path.
-        for (peerid, rref) in self.c0.storage_broker.get_all_servers():
+        for s in self.c0.storage_broker.get_connected_servers():
+            rref = s.get_rref()
             v1 = rref.version["http://allmydata.org/tahoe/protocols/storage/v1"]
             v1["tolerates-immutable-read-overrun"] = False
 
@@ -1186,7 +1188,8 @@ class DownloadV2(_Base, unittest.TestCase):
         self.set_up_grid()
         self.c0 = self.g.clients[0]
 
-        for (peerid, rref) in self.c0.storage_broker.get_all_servers():
+        for s in self.c0.storage_broker.get_connected_servers():
+            rref = s.get_rref()
             v1 = rref.version["http://allmydata.org/tahoe/protocols/storage/v1"]
             v1["tolerates-immutable-read-overrun"] = False
 
index e63cba329ceff3ddd0c1e0e7076e9e015139dd97..abed967a5f45b6a15a3faa2451c549810a3e77f3 100644 (file)
@@ -101,7 +101,8 @@ class HungServerDownloadTest(GridTestMixin, ShouldFailMixin, PollMixin,
 
         self.c0 = self.g.clients[0]
         nm = self.c0.nodemaker
-        self.servers = sorted([(id, ss) for (id, ss) in nm.storage_broker.get_all_servers()])
+        self.servers = sorted([(s.get_serverid(), s.get_rref())
+                               for s in nm.storage_broker.get_connected_servers()])
         self.servers = self.servers[5:] + self.servers[:5]
 
         if mutable:
index edac3e699bdf949f6e03865056ec6dd9146ddde0..1ca4383849380c5bd9b9de05195d73636a50481d 100644 (file)
@@ -95,12 +95,23 @@ class TestShareFinder(unittest.TestCase):
                     self.s.hungry()
                 eventually(_give_buckets_and_hunger_again)
                 return d
+        class MockIServer(object):
+            def __init__(self, serverid, rref):
+                self.serverid = serverid
+                self.rref = rref
+            def get_serverid(self):
+                return self.serverid
+            def get_rref(self):
+                return self.rref
 
         mockserver1 = MockServer({1: mock.Mock(), 2: mock.Mock()})
         mockserver2 = MockServer({})
         mockserver3 = MockServer({3: mock.Mock()})
         mockstoragebroker = mock.Mock()
-        mockstoragebroker.get_servers_for_index.return_value = [ ('ms1', mockserver1), ('ms2', mockserver2), ('ms3', mockserver3), ]
+        servers = [ MockIServer("ms1", mockserver1),
+                    MockIServer("ms2", mockserver2),
+                    MockIServer("ms3", mockserver3), ]
+        mockstoragebroker.get_servers_for_psi.return_value = servers
         mockdownloadstatus = mock.Mock()
         mocknode = MockNode(check_reneging=True, check_fetch_failed=True)
 
index e4e6eb7faa192876092966c878f2546f60bf7890..0f49dced98f975e3d5679480e16da90ebeb06a43 100644 (file)
@@ -1910,11 +1910,9 @@ class Problems(GridTestMixin, unittest.TestCase, testutil.ShouldFailMixin):
         d.addCallback(_got_key)
         def _break_peer0(res):
             si = self._storage_index
-            peerlist = nm.storage_broker.get_servers_for_index(si)
-            peerid0, connection0 = peerlist[0]
-            peerid1, connection1 = peerlist[1]
-            connection0.broken = True
-            self.connection1 = connection1
+            servers = nm.storage_broker.get_servers_for_psi(si)
+            self.g.break_server(servers[0].get_serverid())
+            self.server1 = servers[1]
         d.addCallback(_break_peer0)
         # now "create" the file, using the pre-established key, and let the
         # initial publish finally happen
@@ -1925,7 +1923,7 @@ class Problems(GridTestMixin, unittest.TestCase, testutil.ShouldFailMixin):
             d.addCallback(lambda res: self.failUnlessEqual(res, "contents 1"))
             # now break the second peer
             def _break_peer1(res):
-                self.connection1.broken = True
+                self.g.break_server(self.server1.get_serverid())
             d.addCallback(_break_peer1)
             d.addCallback(lambda res: n.overwrite("contents 2"))
             # that ought to work too
@@ -1956,7 +1954,7 @@ class Problems(GridTestMixin, unittest.TestCase, testutil.ShouldFailMixin):
         nm = self.g.clients[0].nodemaker
         sb = nm.storage_broker
 
-        peerids = [serverid for (serverid,ss) in sb.get_all_servers()]
+        peerids = [s.get_serverid() for s in sb.get_connected_servers()]
         self.g.break_server(peerids[0])
 
         d = nm.create_mutable_file("contents 1")
@@ -1980,8 +1978,8 @@ class Problems(GridTestMixin, unittest.TestCase, testutil.ShouldFailMixin):
         self.basedir = "mutable/Problems/test_publish_all_servers_bad"
         self.set_up_grid()
         nm = self.g.clients[0].nodemaker
-        for (serverid,ss) in nm.storage_broker.get_all_servers():
-            ss.broken = True
+        for s in nm.storage_broker.get_connected_servers():
+            s.get_rref().broken = True
 
         d = self.shouldFail(NotEnoughServersError,
                             "test_publish_all_servers_bad",
@@ -2033,8 +2031,8 @@ class Problems(GridTestMixin, unittest.TestCase, testutil.ShouldFailMixin):
             #  1. notice which server gets a read() call first
             #  2. tell that server to start throwing errors
             killer = FirstServerGetsKilled()
-            for (serverid,ss) in nm.storage_broker.get_all_servers():
-                ss.post_call_notifier = killer.notify
+            for s in nm.storage_broker.get_connected_servers():
+                s.get_rref().post_call_notifier = killer.notify
         d.addCallback(_created)
 
         # now we update a servermap from a new node (which doesn't have the
@@ -2059,8 +2057,8 @@ class Problems(GridTestMixin, unittest.TestCase, testutil.ShouldFailMixin):
             self.uri = n.get_uri()
             self.n2 = nm.create_from_cap(self.uri)
             deleter = FirstServerGetsDeleted()
-            for (serverid,ss) in nm.storage_broker.get_all_servers():
-                ss.post_call_notifier = deleter.notify
+            for s in nm.storage_broker.get_connected_servers():
+                s.get_rref().post_call_notifier = deleter.notify
         d.addCallback(_created)
         d.addCallback(lambda res: self.n2.get_servermap(MODE_WRITE))
         return d
index bf6af0946054573b63b891febd5c5aeb2f5a22ad..e37f663833f32debc37a2815cab7be30b74d0d34 100644 (file)
@@ -65,7 +65,7 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase):
                 all_peerids = c.get_storage_broker().get_all_serverids()
                 self.failUnlessEqual(len(all_peerids), self.numclients+1)
                 sb = c.storage_broker
-                permuted_peers = sb.get_servers_for_index("a")
+                permuted_peers = sb.get_servers_for_psi("a")
                 self.failUnlessEqual(len(permuted_peers), self.numclients+1)
 
         d.addCallback(_check)
@@ -101,7 +101,7 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase):
                 all_peerids = c.get_storage_broker().get_all_serverids()
                 self.failUnlessEqual(len(all_peerids), self.numclients)
                 sb = c.storage_broker
-                permuted_peers = sb.get_servers_for_index("a")
+                permuted_peers = sb.get_servers_for_psi("a")
                 self.failUnlessEqual(len(permuted_peers), self.numclients)
         d.addCallback(_check_connections)
 
index 022185e330f53f38db4d17aa52775165aa6d971f..11259324473228a2ea8ee33f905c2eaccc5c5226 100644 (file)
@@ -1730,12 +1730,8 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
         d.addCallback(lambda ign:
             self._add_server(server_number=2))
         def _break_server_2(ign):
-            server = self.g.servers_by_number[2].my_nodeid
-            # We have to break the server in servers_by_id,
-            # because the one in servers_by_number isn't wrapped,
-            # and doesn't look at its broken attribute when answering
-            # queries.
-            self.g.servers_by_id[server].broken = True
+            serverid = self.g.servers_by_number[2].my_nodeid
+            self.g.break_server(serverid)
         d.addCallback(_break_server_2)
         d.addCallback(lambda ign:
             self._add_server(server_number=3, readonly=True))
index 96ce17bf4490dad7144633410a7aa5ac2cefe04b..44d050623073dba2b7d8631a457bf30812327478 100644 (file)
@@ -139,9 +139,9 @@ class ResultsBase:
 
         # this table is sorted by permuted order
         sb = c.get_storage_broker()
-        permuted_peer_ids = [peerid
-                             for (peerid, rref)
-                             in sb.get_servers_for_index(cr.get_storage_index())]
+        permuted_peer_ids = [s.get_serverid()
+                             for s
+                             in sb.get_servers_for_psi(cr.get_storage_index())]
 
         num_shares_left = sum([len(shares) for shares in servers.values()])
         servermap = []
index 3af15d9273e83d83df292eb6e37748473f16d550..2f1f6d4fbf7d3a4ef63da12a312be168b8517a47 100644 (file)
@@ -247,18 +247,18 @@ class Root(rend.Page):
 
     def data_connected_storage_servers(self, ctx, data):
         sb = self.client.get_storage_broker()
-        return len(sb.get_all_servers())
+        return len(sb.get_connected_servers())
 
     def data_services(self, ctx, data):
         sb = self.client.get_storage_broker()
-        return sb.get_all_descriptors()
+        return sb.get_known_servers()
 
-    def render_service_row(self, ctx, descriptor):
-        nodeid = descriptor.get_serverid()
+    def render_service_row(self, ctx, server):
+        nodeid = server.get_serverid()
 
         ctx.fillSlots("peerid", idlib.nodeid_b2a(nodeid))
-        ctx.fillSlots("nickname", descriptor.get_nickname())
-        rhost = descriptor.get_remote_host()
+        ctx.fillSlots("nickname", server.get_nickname())
+        rhost = server.get_remote_host()
         if rhost:
             if nodeid == self.client.nodeid:
                 rhost_s = "(loopback)"
@@ -267,12 +267,12 @@ class Root(rend.Page):
             else:
                 rhost_s = str(rhost)
             connected = "Yes: to " + rhost_s
-            since = descriptor.get_last_connect_time()
+            since = server.get_last_connect_time()
         else:
             connected = "No"
-            since = descriptor.get_last_loss_time()
-        announced = descriptor.get_announcement_time()
-        announcement = descriptor.get_announcement()
+            since = server.get_last_loss_time()
+        announced = server.get_announcement_time()
+        announcement = server.get_announcement()
         version = announcement["my-version"]
         service_name = announcement["service-name"]