]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
Refactor StorageFarmBroker handling of servers
authorBrian Warner <warner@lothar.com>
Mon, 21 Feb 2011 01:58:04 +0000 (17:58 -0800)
committerBrian Warner <warner@lothar.com>
Mon, 21 Feb 2011 01:58:04 +0000 (17:58 -0800)
Pass around IServer instance instead of (peerid, rref) tuple. Replace
"descriptor" with "server". Other replacements:

 get_all_servers -> get_connected_servers/get_known_servers
 get_servers_for_index -> get_servers_for_psi (now returns IServers)

This change still needs to be pushed further down: lots of code is now
getting the IServer and then distributing (peerid, rref) internally.
Instead, it ought to distribute the IServer internally and delay
extracting a serverid or rref until the last moment.

no_network.py was updated to retain parallelism.

24 files changed:
src/allmydata/client.py
src/allmydata/control.py
src/allmydata/immutable/checker.py
src/allmydata/immutable/downloader/finder.py
src/allmydata/immutable/filenode.py
src/allmydata/immutable/offloaded.py
src/allmydata/immutable/upload.py
src/allmydata/interfaces.py
src/allmydata/mutable/publish.py
src/allmydata/mutable/servermap.py
src/allmydata/storage_client.py
src/allmydata/test/common.py
src/allmydata/test/no_network.py
src/allmydata/test/test_checker.py
src/allmydata/test/test_client.py
src/allmydata/test/test_deepcheck.py
src/allmydata/test/test_download.py
src/allmydata/test/test_hung_server.py
src/allmydata/test/test_immutable.py
src/allmydata/test/test_mutable.py
src/allmydata/test/test_system.py
src/allmydata/test/test_upload.py
src/allmydata/web/check_results.py
src/allmydata/web/root.py

index fa515d41a0895efb3eb67256320fe24da9025318..fb7e0c890b4a51983d6707b6b6df225c709b363b 100644 (file)
@@ -468,7 +468,7 @@ class Client(node.Node, pollmixin.PollMixin):
         temporary test network and need to know when it is safe to proceed
         with an upload or download."""
         def _check():
         temporary test network and need to know when it is safe to proceed
         with an upload or download."""
         def _check():
-            return len(self.storage_broker.get_all_servers()) >= num_clients
+            return len(self.storage_broker.get_connected_servers()) >= num_clients
         d = self.poll(_check, 0.5)
         d.addCallback(lambda res: None)
         return d
         d = self.poll(_check, 0.5)
         d.addCallback(lambda res: None)
         return d
index cb0c84f7238f3a40b60a480fbadb816283fe6ac9..045a34e5e0354e725e0ad727e043609013189f76 100644 (file)
@@ -91,7 +91,7 @@ class ControlServer(Referenceable, service.Service):
         # 300ms.
         results = {}
         sb = self.parent.get_storage_broker()
         # 300ms.
         results = {}
         sb = self.parent.get_storage_broker()
-        everyone = sb.get_all_servers()
+        everyone = sb.get_connected_servers()
         num_pings = int(mathutil.div_ceil(10, (len(everyone) * 0.3)))
         everyone = list(everyone) * num_pings
         d = self._do_one_ping(None, everyone, results)
         num_pings = int(mathutil.div_ceil(10, (len(everyone) * 0.3)))
         everyone = list(everyone) * num_pings
         d = self._do_one_ping(None, everyone, results)
@@ -99,7 +99,9 @@ class ControlServer(Referenceable, service.Service):
     def _do_one_ping(self, res, everyone_left, results):
         if not everyone_left:
             return results
     def _do_one_ping(self, res, everyone_left, results):
         if not everyone_left:
             return results
-        peerid, connection = everyone_left.pop(0)
+        server = everyone_left.pop(0)
+        peerid = server.get_serverid()
+        connection = server.get_rref()
         start = time.time()
         d = connection.callRemote("get_buckets", "\x00"*16)
         def _done(ignored):
         start = time.time()
         d = connection.callRemote("get_buckets", "\x00"*16)
         def _done(ignored):
index cd5c5568dae4cdb671042472cb598b23a20e29fb..f3500291bff00b7b62a86b6941a53850ca19374d 100644 (file)
@@ -463,9 +463,6 @@ class Checker(log.PrefixingLogMixin):
     def __init__(self, verifycap, servers, verify, add_lease, secret_holder,
                  monitor):
         assert precondition(isinstance(verifycap, CHKFileVerifierURI), verifycap, type(verifycap))
     def __init__(self, verifycap, servers, verify, add_lease, secret_holder,
                  monitor):
         assert precondition(isinstance(verifycap, CHKFileVerifierURI), verifycap, type(verifycap))
-        assert precondition(isinstance(servers, (set, frozenset)), servers)
-        for (serverid, serverrref) in servers:
-            assert precondition(isinstance(serverid, str))
 
         prefix = "%s" % base32.b2a_l(verifycap.get_storage_index()[:8], 60)
         log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.checker", prefix=prefix)
 
         prefix = "%s" % base32.b2a_l(verifycap.get_storage_index()[:8], 60)
         log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.checker", prefix=prefix)
@@ -489,7 +486,7 @@ class Checker(log.PrefixingLogMixin):
     def _get_cancel_secret(self, peerid):
         return bucket_cancel_secret_hash(self.file_cancel_secret, peerid)
 
     def _get_cancel_secret(self, peerid):
         return bucket_cancel_secret_hash(self.file_cancel_secret, peerid)
 
-    def _get_buckets(self, server, storageindex, serverid):
+    def _get_buckets(self, s, storageindex):
         """Return a deferred that eventually fires with ({sharenum: bucket},
         serverid, success). In case the server is disconnected or returns a
         Failure then it fires with ({}, serverid, False) (A server
         """Return a deferred that eventually fires with ({sharenum: bucket},
         serverid, success). In case the server is disconnected or returns a
         Failure then it fires with ({}, serverid, False) (A server
@@ -498,14 +495,16 @@ class Checker(log.PrefixingLogMixin):
         that we want to track and report whether or not each server
         responded.)"""
 
         that we want to track and report whether or not each server
         responded.)"""
 
+        rref = s.get_rref()
+        serverid = s.get_serverid()
         if self._add_lease:
             renew_secret = self._get_renewal_secret(serverid)
             cancel_secret = self._get_cancel_secret(serverid)
         if self._add_lease:
             renew_secret = self._get_renewal_secret(serverid)
             cancel_secret = self._get_cancel_secret(serverid)
-            d2 = server.callRemote("add_lease", storageindex,
-                                   renew_secret, cancel_secret)
+            d2 = rref.callRemote("add_lease", storageindex,
+                                 renew_secret, cancel_secret)
             d2.addErrback(self._add_lease_failed, serverid, storageindex)
 
             d2.addErrback(self._add_lease_failed, serverid, storageindex)
 
-        d = server.callRemote("get_buckets", storageindex)
+        d = rref.callRemote("get_buckets", storageindex)
         def _wrap_results(res):
             return (res, serverid, True)
 
         def _wrap_results(res):
             return (res, serverid, True)
 
@@ -656,7 +655,7 @@ class Checker(log.PrefixingLogMixin):
 
         return d
 
 
         return d
 
-    def _verify_server_shares(self, serverid, ss):
+    def _verify_server_shares(self, s):
         """ Return a deferred which eventually fires with a tuple of
         (set(sharenum), serverid, set(corruptsharenum),
         set(incompatiblesharenum), success) showing all the shares verified
         """ Return a deferred which eventually fires with a tuple of
         (set(sharenum), serverid, set(corruptsharenum),
         set(incompatiblesharenum), success) showing all the shares verified
@@ -679,7 +678,7 @@ class Checker(log.PrefixingLogMixin):
         then disconnected and ceased responding, or returned a failure, it is
         still marked with the True flag for 'success'.
         """
         then disconnected and ceased responding, or returned a failure, it is
         still marked with the True flag for 'success'.
         """
-        d = self._get_buckets(ss, self._verifycap.get_storage_index(), serverid)
+        d = self._get_buckets(s, self._verifycap.get_storage_index())
 
         def _got_buckets(result):
             bucketdict, serverid, success = result
 
         def _got_buckets(result):
             bucketdict, serverid, success = result
@@ -710,12 +709,12 @@ class Checker(log.PrefixingLogMixin):
 
         def _err(f):
             f.trap(RemoteException, DeadReferenceError)
 
         def _err(f):
             f.trap(RemoteException, DeadReferenceError)
-            return (set(), serverid, set(), set(), False)
+            return (set(), s.get_serverid(), set(), set(), False)
 
         d.addCallbacks(_got_buckets, _err)
         return d
 
 
         d.addCallbacks(_got_buckets, _err)
         return d
 
-    def _check_server_shares(self, serverid, ss):
+    def _check_server_shares(self, s):
         """Return a deferred which eventually fires with a tuple of
         (set(sharenum), serverid, set(), set(), responded) showing all the
         shares claimed to be served by this server. In case the server is
         """Return a deferred which eventually fires with a tuple of
         (set(sharenum), serverid, set(), set(), responded) showing all the
         shares claimed to be served by this server. In case the server is
@@ -726,7 +725,7 @@ class Checker(log.PrefixingLogMixin):
         def _curry_empty_corrupted(res):
             buckets, serverid, responded = res
             return (set(buckets), serverid, set(), set(), responded)
         def _curry_empty_corrupted(res):
             buckets, serverid, responded = res
             return (set(buckets), serverid, set(), set(), responded)
-        d = self._get_buckets(ss, self._verifycap.get_storage_index(), serverid)
+        d = self._get_buckets(s, self._verifycap.get_storage_index())
         d.addCallback(_curry_empty_corrupted)
         return d
 
         d.addCallback(_curry_empty_corrupted)
         return d
 
@@ -794,10 +793,10 @@ class Checker(log.PrefixingLogMixin):
     def start(self):
         ds = []
         if self._verify:
     def start(self):
         ds = []
         if self._verify:
-            for (serverid, ss) in self._servers:
-                ds.append(self._verify_server_shares(serverid, ss))
+            for s in self._servers:
+                ds.append(self._verify_server_shares(s))
         else:
         else:
-            for (serverid, ss) in self._servers:
-                ds.append(self._check_server_shares(serverid, ss))
+            for s in self._servers:
+                ds.append(self._check_server_shares(s))
 
         return deferredutil.gatherResults(ds).addCallback(self._format_results)
 
         return deferredutil.gatherResults(ds).addCallback(self._format_results)
index 4816ccdf79b3c8b797d55c20da4ac2d7b1b2415b..f1142e79d0f01c1f15798bdb7b1bb31351ba87b7 100644 (file)
@@ -62,8 +62,9 @@ class ShareFinder:
         # test_dirnode, which creates us with storage_broker=None
         if not self._started:
             si = self.verifycap.storage_index
         # test_dirnode, which creates us with storage_broker=None
         if not self._started:
             si = self.verifycap.storage_index
-            s = self._storage_broker.get_servers_for_index(si)
-            self._servers = iter(s)
+            servers = [(s.get_serverid(), s.get_rref())
+                       for s in self._storage_broker.get_servers_for_psi(si)]
+            self._servers = iter(servers)
             self._started = True
 
     def log(self, *args, **kwargs):
             self._started = True
 
     def log(self, *args, **kwargs):
index ed3785b7152e75938ca5bbdded51fe76dbfba5e3..8fc47725ed30445b0ccdaa7749be22808d650489 100644 (file)
@@ -102,7 +102,7 @@ class CiphertextFileNode:
         verifycap = self._verifycap
         storage_index = verifycap.storage_index
         sb = self._storage_broker
         verifycap = self._verifycap
         storage_index = verifycap.storage_index
         sb = self._storage_broker
-        servers = sb.get_all_servers()
+        servers = sb.get_connected_servers()
         sh = self._secret_holder
 
         c = Checker(verifycap=verifycap, servers=servers,
         sh = self._secret_holder
 
         c = Checker(verifycap=verifycap, servers=servers,
@@ -160,7 +160,7 @@ class CiphertextFileNode:
     def check(self, monitor, verify=False, add_lease=False):
         verifycap = self._verifycap
         sb = self._storage_broker
     def check(self, monitor, verify=False, add_lease=False):
         verifycap = self._verifycap
         sb = self._storage_broker
-        servers = sb.get_all_servers()
+        servers = sb.get_connected_servers()
         sh = self._secret_holder
 
         v = Checker(verifycap=verifycap, servers=servers,
         sh = self._secret_holder
 
         v = Checker(verifycap=verifycap, servers=servers,
index dea94c530a16b052897371ba47e42ba5b9bf368f..26be786034fad885dc9b1d6c69b81916f38f6a13 100644 (file)
@@ -53,10 +53,10 @@ class CHKCheckerAndUEBFetcher:
 
     def _get_all_shareholders(self, storage_index):
         dl = []
 
     def _get_all_shareholders(self, storage_index):
         dl = []
-        for (peerid, ss) in self._peer_getter(storage_index):
-            d = ss.callRemote("get_buckets", storage_index)
+        for s in self._peer_getter(storage_index):
+            d = s.get_rref().callRemote("get_buckets", storage_index)
             d.addCallbacks(self._got_response, self._got_error,
             d.addCallbacks(self._got_response, self._got_error,
-                           callbackArgs=(peerid,))
+                           callbackArgs=(s.get_serverid(),))
             dl.append(d)
         return defer.DeferredList(dl)
 
             dl.append(d)
         return defer.DeferredList(dl)
 
@@ -620,7 +620,7 @@ class Helper(Referenceable):
         lp2 = self.log("doing a quick check+UEBfetch",
                        parent=lp, level=log.NOISY)
         sb = self._storage_broker
         lp2 = self.log("doing a quick check+UEBfetch",
                        parent=lp, level=log.NOISY)
         sb = self._storage_broker
-        c = CHKCheckerAndUEBFetcher(sb.get_servers_for_index, storage_index, lp2)
+        c = CHKCheckerAndUEBFetcher(sb.get_servers_for_psi, storage_index, lp2)
         d = c.check()
         def _checked(res):
             if res:
         d = c.check()
         def _checked(res):
             if res:
index 5dd257bd761a81e243e7b66711ae1374d8c4970c..a53f12a26aee22c1e1fe1a40ca6e8571e3fdbc22 100644 (file)
@@ -224,7 +224,8 @@ class Tahoe2PeerSelector(log.PrefixingLogMixin):
                                              num_share_hashes, EXTENSION_SIZE,
                                              None)
         allocated_size = wbp.get_allocated_size()
                                              num_share_hashes, EXTENSION_SIZE,
                                              None)
         allocated_size = wbp.get_allocated_size()
-        all_peers = storage_broker.get_servers_for_index(storage_index)
+        all_peers = [(s.get_serverid(), s.get_rref())
+                     for s in storage_broker.get_servers_for_psi(storage_index)]
         if not all_peers:
             raise NoServersError("client gave us zero peers")
 
         if not all_peers:
             raise NoServersError("client gave us zero peers")
 
index 48094a94202dfdba5d3a228c2e331036ceba1db9..85c3e079111b6c8a824a0b07ae9805be9d07ee7d 100644 (file)
@@ -352,13 +352,17 @@ class IStorageBucketReader(Interface):
         """
 
 class IStorageBroker(Interface):
         """
 
 class IStorageBroker(Interface):
-    def get_servers_for_index(peer_selection_index):
+    def get_servers_for_psi(peer_selection_index):
         """
         """
-        @return: list of (peerid, versioned-rref) tuples
+        @return: list of IServer instances
         """
         """
-    def get_all_servers():
+    def get_connected_servers():
         """
         """
-        @return: frozenset of (peerid, versioned-rref) tuples
+        @return: frozenset of connected IServer instances
+        """
+    def get_known_servers():
+        """
+        @return: frozenset of IServer instances
         """
     def get_all_serverids():
         """
         """
     def get_all_serverids():
         """
index 2d63c87b9ef28ce0e413ffa461a2cfa39ac73eaa..580682b638abc034e368a778af18b8cb3cfbccf7 100644 (file)
@@ -179,7 +179,8 @@ class Publish:
         self._encprivkey = self._node.get_encprivkey()
 
         sb = self._storage_broker
         self._encprivkey = self._node.get_encprivkey()
 
         sb = self._storage_broker
-        full_peerlist = sb.get_servers_for_index(self._storage_index)
+        full_peerlist = [(s.get_serverid(), s.get_rref())
+                         for s in sb.get_servers_for_psi(self._storage_index)]
         self.full_peerlist = full_peerlist # for use later, immutable
         self.bad_peers = set() # peerids who have errbacked/refused requests
 
         self.full_peerlist = full_peerlist # for use later, immutable
         self.bad_peers = set() # peerids who have errbacked/refused requests
 
index 999691fa897c8a1d9db58fb333077449458d72a9..c69e4108952b91b49662c7f61069ee13213f62a4 100644 (file)
@@ -424,7 +424,8 @@ class ServermapUpdater:
         self._queries_completed = 0
 
         sb = self._storage_broker
         self._queries_completed = 0
 
         sb = self._storage_broker
-        full_peerlist = sb.get_servers_for_index(self._storage_index)
+        full_peerlist = [(s.get_serverid(), s.get_rref())
+                         for s in sb.get_servers_for_psi(self._storage_index)]
         self.full_peerlist = full_peerlist # for use later, immutable
         self.extra_peers = full_peerlist[:] # peers are removed as we use them
         self._good_peers = set() # peers who had some shares
         self.full_peerlist = full_peerlist # for use later, immutable
         self.extra_peers = full_peerlist[:] # peers are removed as we use them
         self._good_peers = set() # peers who had some shares
index 7945eac05dfd257eb0652990204df86b71b06f7c..e1b3f553f521c605165a474eecd91240bba79d31 100644 (file)
@@ -34,7 +34,7 @@ from zope.interface import implements, Interface
 from foolscap.api import eventually
 from allmydata.interfaces import IStorageBroker
 from allmydata.util import idlib, log
 from foolscap.api import eventually
 from allmydata.interfaces import IStorageBroker
 from allmydata.util import idlib, log
-from allmydata.util.assertutil import _assert, precondition
+from allmydata.util.assertutil import precondition
 from allmydata.util.rrefutil import add_version_to_remote_reference
 from allmydata.util.hashutil import sha1
 
 from allmydata.util.rrefutil import add_version_to_remote_reference
 from allmydata.util.hashutil import sha1
 
@@ -66,11 +66,11 @@ class StorageFarmBroker:
         self.tub = tub
         assert permute_peers # False not implemented yet
         self.permute_peers = permute_peers
         self.tub = tub
         assert permute_peers # False not implemented yet
         self.permute_peers = permute_peers
-        # self.descriptors maps serverid -> IServerDescriptor, and keeps
-        # track of all the storage servers that we've heard about. Each
-        # descriptor manages its own Reconnector, and will give us a
-        # RemoteReference when we ask them for it.
-        self.descriptors = {}
+        # self.servers maps serverid -> IServer, and keeps track of all the
+        # storage servers that we've heard about. Each descriptor manages its
+        # own Reconnector, and will give us a RemoteReference when we ask
+        # them for it.
+        self.servers = {}
         # self.test_servers are statically configured from unit tests
         self.test_servers = {} # serverid -> rref
         self.introducer_client = None
         # self.test_servers are statically configured from unit tests
         self.test_servers = {} # serverid -> rref
         self.introducer_client = None
@@ -79,7 +79,7 @@ class StorageFarmBroker:
     def test_add_server(self, serverid, rref):
         self.test_servers[serverid] = rref
     def test_add_descriptor(self, serverid, dsc):
     def test_add_server(self, serverid, rref):
         self.test_servers[serverid] = rref
     def test_add_descriptor(self, serverid, dsc):
-        self.descriptors[serverid] = dsc
+        self.servers[serverid] = dsc
 
     def use_introducer(self, introducer_client):
         self.introducer_client = ic = introducer_client
 
     def use_introducer(self, introducer_client):
         self.introducer_client = ic = introducer_client
@@ -89,16 +89,16 @@ class StorageFarmBroker:
         precondition(isinstance(serverid, str), serverid)
         precondition(len(serverid) == 20, serverid)
         assert ann_d["service-name"] == "storage"
         precondition(isinstance(serverid, str), serverid)
         precondition(len(serverid) == 20, serverid)
         assert ann_d["service-name"] == "storage"
-        old = self.descriptors.get(serverid)
+        old = self.servers.get(serverid)
         if old:
             if old.get_announcement() == ann_d:
                 return # duplicate
             # replacement
         if old:
             if old.get_announcement() == ann_d:
                 return # duplicate
             # replacement
-            del self.descriptors[serverid]
+            del self.servers[serverid]
             old.stop_connecting()
             # now we forget about them and start using the new one
             old.stop_connecting()
             # now we forget about them and start using the new one
-        dsc = NativeStorageClientDescriptor(serverid, ann_d)
-        self.descriptors[serverid] = dsc
+        dsc = NativeStorageServer(serverid, ann_d)
+        self.servers[serverid] = dsc
         dsc.start_connecting(self.tub, self._trigger_connections)
         # the descriptor will manage their own Reconnector, and each time we
         # need servers, we'll ask them if they're connected or not.
         dsc.start_connecting(self.tub, self._trigger_connections)
         # the descriptor will manage their own Reconnector, and each time we
         # need servers, we'll ask them if they're connected or not.
@@ -111,48 +111,44 @@ class StorageFarmBroker:
         # connections to only a subset of the servers, which would increase
         # the chances that we'll put shares in weird places (and not update
         # existing shares of mutable files). See #374 for more details.
         # connections to only a subset of the servers, which would increase
         # the chances that we'll put shares in weird places (and not update
         # existing shares of mutable files). See #374 for more details.
-        for dsc in self.descriptors.values():
+        for dsc in self.servers.values():
             dsc.try_to_connect()
 
             dsc.try_to_connect()
 
-
-
-    def get_servers_for_index(self, peer_selection_index):
-        # first cut: return a list of (peerid, versioned-rref) tuples
+    def get_servers_for_psi(self, peer_selection_index):
+        # return a list of server objects (IServers)
         assert self.permute_peers == True
         assert self.permute_peers == True
-        servers = self.get_all_servers()
-        key = peer_selection_index
-        return sorted(servers, key=lambda x: sha1(key+x[0]).digest())
-
-    def get_all_servers(self):
-        # return a frozenset of (peerid, versioned-rref) tuples
-        servers = {}
-        for serverid,rref in self.test_servers.items():
-            servers[serverid] = rref
-        for serverid,dsc in self.descriptors.items():
-            rref = dsc.get_rref()
-            if rref:
-                servers[serverid] = rref
-        result = frozenset(servers.items())
-        _assert(len(result) <= len(self.get_all_serverids()), result, self.get_all_serverids())
-        return result
+        def _permuted(server):
+            seed = server.get_permutation_seed()
+            return sha1(peer_selection_index + seed).digest()
+        return sorted(self.get_connected_servers(), key=_permuted)
 
     def get_all_serverids(self):
         serverids = set()
         serverids.update(self.test_servers.keys())
 
     def get_all_serverids(self):
         serverids = set()
         serverids.update(self.test_servers.keys())
-        serverids.update(self.descriptors.keys())
+        serverids.update(self.servers.keys())
         return frozenset(serverids)
 
         return frozenset(serverids)
 
-    def get_all_descriptors(self):
-        return sorted(self.descriptors.values(),
-                      key=lambda dsc: dsc.get_serverid())
+    def get_connected_servers(self):
+        return frozenset([s for s in self.get_known_servers()
+                          if s.get_rref()])
+
+    def get_known_servers(self):
+        servers = []
+        for serverid,rref in self.test_servers.items():
+            s = NativeStorageServer(serverid, {})
+            s.rref = rref
+            servers.append(s)
+        servers.extend(self.servers.values())
+        return sorted(servers, key=lambda s: s.get_serverid())
 
     def get_nickname_for_serverid(self, serverid):
 
     def get_nickname_for_serverid(self, serverid):
-        if serverid in self.descriptors:
-            return self.descriptors[serverid].get_nickname()
+        if serverid in self.servers:
+            return self.servers[serverid].get_nickname()
         return None
 
 
         return None
 
 
-class IServerDescriptor(Interface):
+class IServer(Interface):
+    """I live in the client, and represent a single server."""
     def start_connecting(tub, trigger_cb):
         pass
     def get_nickname():
     def start_connecting(tub, trigger_cb):
         pass
     def get_nickname():
@@ -160,7 +156,7 @@ class IServerDescriptor(Interface):
     def get_rref():
         pass
 
     def get_rref():
         pass
 
-class NativeStorageClientDescriptor:
+class NativeStorageServer:
     """I hold information about a storage server that we want to connect to.
     If we are connected, I hold the RemoteReference, their host address, and
     the their version information. I remember information about when we were
     """I hold information about a storage server that we want to connect to.
     If we are connected, I hold the RemoteReference, their host address, and
     the their version information. I remember information about when we were
@@ -176,7 +172,7 @@ class NativeStorageClientDescriptor:
     @ivar rref: the RemoteReference, if connected, otherwise None
     @ivar remote_host: the IAddress, if connected, otherwise None
     """
     @ivar rref: the RemoteReference, if connected, otherwise None
     @ivar remote_host: the IAddress, if connected, otherwise None
     """
-    implements(IServerDescriptor)
+    implements(IServer)
 
     VERSION_DEFAULTS = {
         "http://allmydata.org/tahoe/protocols/storage/v1" :
 
     VERSION_DEFAULTS = {
         "http://allmydata.org/tahoe/protocols/storage/v1" :
@@ -203,6 +199,8 @@ class NativeStorageClientDescriptor:
 
     def get_serverid(self):
         return self.serverid
 
     def get_serverid(self):
         return self.serverid
+    def get_permutation_seed(self):
+        return self.serverid
 
     def get_nickname(self):
         return self.announcement["nickname"].decode("utf-8")
 
     def get_nickname(self):
         return self.announcement["nickname"].decode("utf-8")
index e8117bf7b16c3a45516a6776e1954e5b5c85efc4..dfcaa100ec37ff65f56ebfdc14e058714cac7094 100644 (file)
@@ -572,7 +572,7 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
             if not c.connected_to_introducer():
                 return False
             sb = c.get_storage_broker()
             if not c.connected_to_introducer():
                 return False
             sb = c.get_storage_broker()
-            if len(sb.get_all_servers()) != self.numclients:
+            if len(sb.get_connected_servers()) != self.numclients:
                 return False
         return True
 
                 return False
         return True
 
index 554a75f6c7151df47fe7f76040181dab730b77b4..f07c36b4ea1e2ddf2c38712f6c3ef49bd6816257 100644 (file)
@@ -117,13 +117,26 @@ def wrap_storage_server(original):
     wrapper.version = original.remote_get_version()
     return wrapper
 
     wrapper.version = original.remote_get_version()
     return wrapper
 
+class NoNetworkServer:
+    def __init__(self, serverid, rref):
+        self.serverid = serverid
+        self.rref = rref
+    def get_serverid(self):
+        return self.serverid
+    def get_permutation_seed(self):
+        return self.serverid
+    def get_rref(self):
+        return self.rref
+
 class NoNetworkStorageBroker:
     implements(IStorageBroker)
 class NoNetworkStorageBroker:
     implements(IStorageBroker)
-    def get_servers_for_index(self, key):
-        return sorted(self.client._servers,
-                      key=lambda x: sha1(key+x[0]).digest())
-    def get_all_servers(self):
-        return frozenset(self.client._servers)
+    def get_servers_for_psi(self, peer_selection_index):
+        def _permuted(server):
+            seed = server.get_permutation_seed()
+            return sha1(peer_selection_index + seed).digest()
+        return sorted(self.get_connected_servers(), key=_permuted)
+    def get_connected_servers(self):
+        return self.client._servers
     def get_nickname_for_serverid(self, serverid):
         return None
 
     def get_nickname_for_serverid(self, serverid):
         return None
 
@@ -181,8 +194,10 @@ class NoNetworkGrid(service.MultiService):
         self.basedir = basedir
         fileutil.make_dirs(basedir)
 
         self.basedir = basedir
         fileutil.make_dirs(basedir)
 
-        self.servers_by_number = {}
-        self.servers_by_id = {}
+        self.servers_by_number = {} # maps to StorageServer instance
+        self.wrappers_by_id = {} # maps to wrapped StorageServer instance
+        self.proxies_by_id = {} # maps to IServer on which .rref is a wrapped
+                                # StorageServer
         self.clients = []
 
         for i in range(num_servers):
         self.clients = []
 
         for i in range(num_servers):
@@ -234,11 +249,16 @@ class NoNetworkGrid(service.MultiService):
         ss.setServiceParent(middleman)
         serverid = ss.my_nodeid
         self.servers_by_number[i] = ss
         ss.setServiceParent(middleman)
         serverid = ss.my_nodeid
         self.servers_by_number[i] = ss
-        self.servers_by_id[serverid] = wrap_storage_server(ss)
+        wrapper = wrap_storage_server(ss)
+        self.wrappers_by_id[serverid] = wrapper
+        self.proxies_by_id[serverid] = NoNetworkServer(serverid, wrapper)
         self.rebuild_serverlist()
 
         self.rebuild_serverlist()
 
+    def get_all_serverids(self):
+        return self.proxies_by_id.keys()
+
     def rebuild_serverlist(self):
     def rebuild_serverlist(self):
-        self.all_servers = frozenset(self.servers_by_id.items())
+        self.all_servers = frozenset(self.proxies_by_id.values())
         for c in self.clients:
             c._servers = self.all_servers
 
         for c in self.clients:
             c._servers = self.all_servers
 
@@ -249,23 +269,24 @@ class NoNetworkGrid(service.MultiService):
             if ss.my_nodeid == serverid:
                 del self.servers_by_number[i]
                 break
-        del self.servers_by_id[serverid]
+        del self.wrappers_by_id[serverid]
+        del self.proxies_by_id[serverid]
         self.rebuild_serverlist()
 
     def break_server(self, serverid):
         # mark the given server as broken, so it will throw exceptions when
         # asked to hold a share or serve a share
-        self.servers_by_id[serverid].broken = True
+        self.wrappers_by_id[serverid].broken = True
 
     def hang_server(self, serverid):
         # hang the given server
-        ss = self.servers_by_id[serverid]
+        ss = self.wrappers_by_id[serverid]
         assert ss.hung_until is None
         ss.hung_until = defer.Deferred()
 
     def unhang_server(self, serverid):
         # unhang the given server
-        ss = self.servers_by_id[serverid]
+        ss = self.wrappers_by_id[serverid]
         assert ss.hung_until is not None
         ss.hung_until.callback(None)
         ss.hung_until = None
index 7548d7cc8142f43d797498591d106e4e6e80c4db..d1cf86f7f31ae1c9174436831b364be9774586af 100644 (file)
@@ -3,7 +3,7 @@ import simplejson
 from twisted.trial import unittest
 from allmydata import check_results, uri
 from allmydata.web import check_results as web_check_results
-from allmydata.storage_client import StorageFarmBroker, NativeStorageClientDescriptor
+from allmydata.storage_client import StorageFarmBroker, NativeStorageServer
 from allmydata.monitor import Monitor
 from allmydata.test.no_network import GridTestMixin
 from allmydata.immutable.upload import Data
@@ -28,7 +28,7 @@ class WebResultsRendering(unittest.TestCase, WebRenderingMixin):
                       "my-version": "ver",
                       "oldest-supported": "oldest",
                       }
-            dsc = NativeStorageClientDescriptor(peerid, ann_d)
+            dsc = NativeStorageServer(peerid, ann_d)
             sb.test_add_descriptor(peerid, dsc)
         c = FakeClient()
         c.storage_broker = sb
index 7c6a4567e6053a8ae950a54e96fc9a9fa5779772..abf63aa7678aae8911170764240e369e95b43356 100644 (file)
@@ -134,13 +134,12 @@ class Basic(testutil.ReallyEqualMixin, unittest.TestCase):
         self.failUnlessEqual(c.getServiceNamed("storage").reserved_space, 0)
 
     def _permute(self, sb, key):
-        return [ peerid
-                 for (peerid,rref) in sb.get_servers_for_index(key) ]
+        return [ s.get_serverid() for s in sb.get_servers_for_psi(key) ]
 
     def test_permute(self):
         sb = StorageFarmBroker(None, True)
         for k in ["%d" % i for i in range(5)]:
-            sb.test_add_server(k, None)
+            sb.test_add_server(k, "rref")
 
         self.failUnlessReallyEqual(self._permute(sb, "one"), ['3','1','0','4','2'])
         self.failUnlessReallyEqual(self._permute(sb, "two"), ['0','4','2','1','3'])
index 5168b37e4a1396c569cea1a511ca852330e275e2..4bcfe28951dda6954fc77fce31903d63176b3b12 100644 (file)
@@ -287,14 +287,14 @@ class DeepCheckWebGood(DeepCheckBase, unittest.TestCase):
         self.failUnlessEqual(d["list-corrupt-shares"], [], where)
         if not incomplete:
             self.failUnlessEqual(sorted(d["servers-responding"]),
-                                 sorted(self.g.servers_by_id.keys()),
+                                 sorted(self.g.get_all_serverids()),
                                  where)
             self.failUnless("sharemap" in d, str((where, d)))
             all_serverids = set()
             for (shareid, serverids) in d["sharemap"].items():
                 all_serverids.update(serverids)
             self.failUnlessEqual(sorted(all_serverids),
-                                 sorted(self.g.servers_by_id.keys()),
+                                 sorted(self.g.get_all_serverids()),
                                  where)
 
         self.failUnlessEqual(d["count-wrong-shares"], 0, where)
@@ -545,7 +545,7 @@ class DeepCheckWebGood(DeepCheckBase, unittest.TestCase):
         if not incomplete:
             self.failUnlessEqual(sorted(r["servers-responding"]),
                                  sorted([idlib.nodeid_b2a(sid)
-                                         for sid in self.g.servers_by_id]),
+                                         for sid in self.g.get_all_serverids()]),
                                  where)
             self.failUnless("sharemap" in r, where)
             all_serverids = set()
@@ -553,7 +553,7 @@ class DeepCheckWebGood(DeepCheckBase, unittest.TestCase):
                 all_serverids.update(serverids_s)
             self.failUnlessEqual(sorted(all_serverids),
                                  sorted([idlib.nodeid_b2a(sid)
-                                         for sid in self.g.servers_by_id]),
+                                         for sid in self.g.get_all_serverids()]),
                                  where)
         self.failUnlessEqual(r["count-wrong-shares"], 0, where)
         self.failUnlessEqual(r["count-recoverable-versions"], 1, where)
index 3a692209972963c87dd69548f71c9d4e248dd63a..ab2e98bc0665345b43072e74011323cb6a2114e5 100644 (file)
@@ -596,7 +596,8 @@ class DownloadTest(_Base, unittest.TestCase):
         # tweak the client's copies of server-version data, so it believes
         # that they're old and can't handle reads that overrun the length of
         # the share. This exercises a different code path.
-        for (peerid, rref) in self.c0.storage_broker.get_all_servers():
+        for s in self.c0.storage_broker.get_connected_servers():
+            rref = s.get_rref()
             v1 = rref.version["http://allmydata.org/tahoe/protocols/storage/v1"]
             v1["tolerates-immutable-read-overrun"] = False
 
@@ -1167,7 +1168,8 @@ class DownloadV2(_Base, unittest.TestCase):
         # tweak the client's copies of server-version data, so it believes
         # that they're old and can't handle reads that overrun the length of
         # the share. This exercises a different code path.
-        for (peerid, rref) in self.c0.storage_broker.get_all_servers():
+        for s in self.c0.storage_broker.get_connected_servers():
+            rref = s.get_rref()
             v1 = rref.version["http://allmydata.org/tahoe/protocols/storage/v1"]
             v1["tolerates-immutable-read-overrun"] = False
 
@@ -1186,7 +1188,8 @@ class DownloadV2(_Base, unittest.TestCase):
         self.set_up_grid()
         self.c0 = self.g.clients[0]
 
-        for (peerid, rref) in self.c0.storage_broker.get_all_servers():
+        for s in self.c0.storage_broker.get_connected_servers():
+            rref = s.get_rref()
             v1 = rref.version["http://allmydata.org/tahoe/protocols/storage/v1"]
             v1["tolerates-immutable-read-overrun"] = False
 
index e63cba329ceff3ddd0c1e0e7076e9e015139dd97..abed967a5f45b6a15a3faa2451c549810a3e77f3 100644 (file)
@@ -101,7 +101,8 @@ class HungServerDownloadTest(GridTestMixin, ShouldFailMixin, PollMixin,
 
         self.c0 = self.g.clients[0]
         nm = self.c0.nodemaker
-        self.servers = sorted([(id, ss) for (id, ss) in nm.storage_broker.get_all_servers()])
+        self.servers = sorted([(s.get_serverid(), s.get_rref())
+                               for s in nm.storage_broker.get_connected_servers()])
         self.servers = self.servers[5:] + self.servers[:5]
 
         if mutable:
index edac3e699bdf949f6e03865056ec6dd9146ddde0..1ca4383849380c5bd9b9de05195d73636a50481d 100644 (file)
@@ -95,12 +95,23 @@ class TestShareFinder(unittest.TestCase):
                     self.s.hungry()
                 eventually(_give_buckets_and_hunger_again)
                 return d
+        class MockIServer(object):
+            def __init__(self, serverid, rref):
+                self.serverid = serverid
+                self.rref = rref
+            def get_serverid(self):
+                return self.serverid
+            def get_rref(self):
+                return self.rref
 
         mockserver1 = MockServer({1: mock.Mock(), 2: mock.Mock()})
         mockserver2 = MockServer({})
         mockserver3 = MockServer({3: mock.Mock()})
         mockstoragebroker = mock.Mock()
-        mockstoragebroker.get_servers_for_index.return_value = [ ('ms1', mockserver1), ('ms2', mockserver2), ('ms3', mockserver3), ]
+        servers = [ MockIServer("ms1", mockserver1),
+                    MockIServer("ms2", mockserver2),
+                    MockIServer("ms3", mockserver3), ]
+        mockstoragebroker.get_servers_for_psi.return_value = servers
         mockdownloadstatus = mock.Mock()
         mocknode = MockNode(check_reneging=True, check_fetch_failed=True)
 
index e4e6eb7faa192876092966c878f2546f60bf7890..0f49dced98f975e3d5679480e16da90ebeb06a43 100644 (file)
@@ -1910,11 +1910,9 @@ class Problems(GridTestMixin, unittest.TestCase, testutil.ShouldFailMixin):
         d.addCallback(_got_key)
         def _break_peer0(res):
             si = self._storage_index
-            peerlist = nm.storage_broker.get_servers_for_index(si)
-            peerid0, connection0 = peerlist[0]
-            peerid1, connection1 = peerlist[1]
-            connection0.broken = True
-            self.connection1 = connection1
+            servers = nm.storage_broker.get_servers_for_psi(si)
+            self.g.break_server(servers[0].get_serverid())
+            self.server1 = servers[1]
         d.addCallback(_break_peer0)
         # now "create" the file, using the pre-established key, and let the
         # initial publish finally happen
@@ -1925,7 +1923,7 @@ class Problems(GridTestMixin, unittest.TestCase, testutil.ShouldFailMixin):
             d.addCallback(lambda res: self.failUnlessEqual(res, "contents 1"))
             # now break the second peer
             def _break_peer1(res):
-                self.connection1.broken = True
+                self.g.break_server(self.server1.get_serverid())
             d.addCallback(_break_peer1)
             d.addCallback(lambda res: n.overwrite("contents 2"))
             # that ought to work too
@@ -1956,7 +1954,7 @@ class Problems(GridTestMixin, unittest.TestCase, testutil.ShouldFailMixin):
         nm = self.g.clients[0].nodemaker
         sb = nm.storage_broker
 
-        peerids = [serverid for (serverid,ss) in sb.get_all_servers()]
+        peerids = [s.get_serverid() for s in sb.get_connected_servers()]
         self.g.break_server(peerids[0])
 
         d = nm.create_mutable_file("contents 1")
@@ -1980,8 +1978,8 @@ class Problems(GridTestMixin, unittest.TestCase, testutil.ShouldFailMixin):
         self.basedir = "mutable/Problems/test_publish_all_servers_bad"
         self.set_up_grid()
         nm = self.g.clients[0].nodemaker
-        for (serverid,ss) in nm.storage_broker.get_all_servers():
-            ss.broken = True
+        for s in nm.storage_broker.get_connected_servers():
+            s.get_rref().broken = True
 
         d = self.shouldFail(NotEnoughServersError,
                             "test_publish_all_servers_bad",
@@ -2033,8 +2031,8 @@ class Problems(GridTestMixin, unittest.TestCase, testutil.ShouldFailMixin):
             #  1. notice which server gets a read() call first
             #  2. tell that server to start throwing errors
             killer = FirstServerGetsKilled()
-            for (serverid,ss) in nm.storage_broker.get_all_servers():
-                ss.post_call_notifier = killer.notify
+            for s in nm.storage_broker.get_connected_servers():
+                s.get_rref().post_call_notifier = killer.notify
         d.addCallback(_created)
 
         # now we update a servermap from a new node (which doesn't have the
@@ -2059,8 +2057,8 @@ class Problems(GridTestMixin, unittest.TestCase, testutil.ShouldFailMixin):
             self.uri = n.get_uri()
             self.n2 = nm.create_from_cap(self.uri)
             deleter = FirstServerGetsDeleted()
-            for (serverid,ss) in nm.storage_broker.get_all_servers():
-                ss.post_call_notifier = deleter.notify
+            for s in nm.storage_broker.get_connected_servers():
+                s.get_rref().post_call_notifier = deleter.notify
         d.addCallback(_created)
         d.addCallback(lambda res: self.n2.get_servermap(MODE_WRITE))
         return d
index bf6af0946054573b63b891febd5c5aeb2f5a22ad..e37f663833f32debc37a2815cab7be30b74d0d34 100644 (file)
@@ -65,7 +65,7 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase):
                 all_peerids = c.get_storage_broker().get_all_serverids()
                 self.failUnlessEqual(len(all_peerids), self.numclients+1)
                 sb = c.storage_broker
-                permuted_peers = sb.get_servers_for_index("a")
+                permuted_peers = sb.get_servers_for_psi("a")
                 self.failUnlessEqual(len(permuted_peers), self.numclients+1)
 
         d.addCallback(_check)
@@ -101,7 +101,7 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase):
                 all_peerids = c.get_storage_broker().get_all_serverids()
                 self.failUnlessEqual(len(all_peerids), self.numclients)
                 sb = c.storage_broker
-                permuted_peers = sb.get_servers_for_index("a")
+                permuted_peers = sb.get_servers_for_psi("a")
                 self.failUnlessEqual(len(permuted_peers), self.numclients)
         d.addCallback(_check_connections)
 
index 022185e330f53f38db4d17aa52775165aa6d971f..11259324473228a2ea8ee33f905c2eaccc5c5226 100644 (file)
@@ -1730,12 +1730,8 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
         d.addCallback(lambda ign:
             self._add_server(server_number=2))
         def _break_server_2(ign):
-            server = self.g.servers_by_number[2].my_nodeid
-            # We have to break the server in servers_by_id,
-            # because the one in servers_by_number isn't wrapped,
-            # and doesn't look at its broken attribute when answering
-            # queries.
-            self.g.servers_by_id[server].broken = True
+            serverid = self.g.servers_by_number[2].my_nodeid
+            self.g.break_server(serverid)
         d.addCallback(_break_server_2)
         d.addCallback(lambda ign:
             self._add_server(server_number=3, readonly=True))
index 96ce17bf4490dad7144633410a7aa5ac2cefe04b..44d050623073dba2b7d8631a457bf30812327478 100644 (file)
@@ -139,9 +139,9 @@ class ResultsBase:
 
         # this table is sorted by permuted order
         sb = c.get_storage_broker()
-        permuted_peer_ids = [peerid
-                             for (peerid, rref)
-                             in sb.get_servers_for_index(cr.get_storage_index())]
+        permuted_peer_ids = [s.get_serverid()
+                             for s
+                             in sb.get_servers_for_psi(cr.get_storage_index())]
 
         num_shares_left = sum([len(shares) for shares in servers.values()])
         servermap = []
index 3af15d9273e83d83df292eb6e37748473f16d550..2f1f6d4fbf7d3a4ef63da12a312be168b8517a47 100644 (file)
@@ -247,18 +247,18 @@ class Root(rend.Page):
 
     def data_connected_storage_servers(self, ctx, data):
         sb = self.client.get_storage_broker()
-        return len(sb.get_all_servers())
+        return len(sb.get_connected_servers())
 
     def data_services(self, ctx, data):
         sb = self.client.get_storage_broker()
-        return sb.get_all_descriptors()
+        return sb.get_known_servers()
 
 
-    def render_service_row(self, ctx, descriptor):
-        nodeid = descriptor.get_serverid()
+    def render_service_row(self, ctx, server):
+        nodeid = server.get_serverid()
 
         ctx.fillSlots("peerid", idlib.nodeid_b2a(nodeid))
-        ctx.fillSlots("nickname", descriptor.get_nickname())
-        rhost = descriptor.get_remote_host()
+        ctx.fillSlots("nickname", server.get_nickname())
+        rhost = server.get_remote_host()
         if rhost:
             if nodeid == self.client.nodeid:
                 rhost_s = "(loopback)"
@@ -267,12 +267,12 @@ class Root(rend.Page):
             else:
                 rhost_s = str(rhost)
             connected = "Yes: to " + rhost_s
-            since = descriptor.get_last_connect_time()
+            since = server.get_last_connect_time()
         else:
             connected = "No"
-            since = descriptor.get_last_loss_time()
-        announced = descriptor.get_announcement_time()
-        announcement = descriptor.get_announcement()
+            since = server.get_last_loss_time()
+        announced = server.get_announcement_time()
+        announcement = server.get_announcement()
         version = announcement["my-version"]
         service_name = announcement["service-name"]
 