]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
start to factor server-connection-management into a distinct 'StorageServerFarmBroker...
authorBrian Warner <warner@lothar.com>
Mon, 1 Jun 2009 21:06:04 +0000 (14:06 -0700)
committerBrian Warner <warner@lothar.com>
Mon, 1 Jun 2009 21:06:04 +0000 (14:06 -0700)
16 files changed:
src/allmydata/client.py
src/allmydata/immutable/download.py
src/allmydata/immutable/offloaded.py
src/allmydata/immutable/upload.py
src/allmydata/mutable/publish.py
src/allmydata/mutable/servermap.py
src/allmydata/storage_client.py [new file with mode: 0644]
src/allmydata/test/no_network.py
src/allmydata/test/test_checker.py
src/allmydata/test/test_client.py
src/allmydata/test/test_helper.py
src/allmydata/test/test_mutable.py
src/allmydata/test/test_system.py
src/allmydata/test/test_upload.py
src/allmydata/test/test_web.py
src/allmydata/web/check_results.py

index 99dfa09b1c712425db2a1ca64717d99e7e97fb93..106b1b724531e073abc591a4c075805cf2934424 100644 (file)
@@ -11,6 +11,7 @@ from pycryptopp.publickey import rsa
 
 import allmydata
 from allmydata.storage.server import StorageServer
+from allmydata import storage_client
 from allmydata.immutable.upload import Uploader
 from allmydata.immutable.download import Downloader
 from allmydata.immutable.filenode import FileNode, LiteralFileNode
@@ -220,6 +221,8 @@ class Client(node.Node, pollmixin.PollMixin):
         convergence_s = self.get_or_create_private_config('convergence', _make_secret)
         self.convergence = base32.a2b(convergence_s)
         self._node_cache = weakref.WeakValueDictionary() # uri -> node
+
+        self.init_client_storage_broker()
         self.add_service(History(self.stats_provider))
         self.add_service(Uploader(helper_furl, self.stats_provider))
         download_cachedir = os.path.join(self.basedir,
@@ -229,6 +232,37 @@ class Client(node.Node, pollmixin.PollMixin):
         self.add_service(Downloader(self.stats_provider))
         self.init_stub_client()
 
+    def init_client_storage_broker(self):
+        # create a StorageFarmBroker object, for use by Uploader/Downloader
+        # (and everybody else who wants to use storage servers)
+        self.storage_broker = sb = storage_client.StorageFarmBroker()
+
+        # load static server specifications from tahoe.cfg, if any
+        #if self.config.has_section("client-server-selection"):
+        #    server_params = {} # maps serverid to dict of parameters
+        #    for (name, value) in self.config.items("client-server-selection"):
+        #        pieces = name.split(".")
+        #        if pieces[0] == "server":
+        #            serverid = pieces[1]
+        #            if serverid not in server_params:
+        #                server_params[serverid] = {}
+        #            server_params[serverid][pieces[2]] = value
+        #    for serverid, params in server_params.items():
+        #        server_type = params.pop("type")
+        #        if server_type == "tahoe-foolscap":
+        #            s = storage_client.NativeStorageClient(*params)
+        #        else:
+        #            msg = ("unrecognized server type '%s' in "
+        #                   "tahoe.cfg [client-server-selection]server.%s.type"
+        #                   % (server_type, serverid))
+        #            raise storage_client.UnknownServerTypeError(msg)
+        #        sb.add_server(s.serverid, s)
+
+        # check to see if we're supposed to use the introducer too
+        if self.get_config("client-server-selection", "use_introducer",
+                           default=True, boolean=True):
+            sb.use_introducer(self.introducer_client)
+
     def init_stub_client(self):
         def _publish(res):
             # we publish an empty object so that the introducer can count how
@@ -338,18 +372,10 @@ class Client(node.Node, pollmixin.PollMixin):
             self.log("hotline file missing, shutting down")
         reactor.stop()
 
-    def get_all_peerids(self):
-        return self.introducer_client.get_all_peerids()
-    def get_nickname_for_peerid(self, peerid):
-        return self.introducer_client.get_nickname_for_peerid(peerid)
-
-    def get_permuted_peers(self, service_name, key):
-        """
-        @return: list of (peerid, connection,)
-        """
-        assert isinstance(service_name, str)
-        assert isinstance(key, str)
-        return self.introducer_client.get_permuted_peers(service_name, key)
+    def get_all_serverids(self):
+        return self.storage_broker.get_all_serverids()
+    def get_nickname_for_serverid(self, serverid):
+        return self.storage_broker.get_nickname_for_serverid(serverid)
 
     def get_encoding_parameters(self):
         return self.DEFAULT_ENCODING_PARAMETERS
@@ -371,7 +397,7 @@ class Client(node.Node, pollmixin.PollMixin):
         temporary test network and need to know when it is safe to proceed
         with an upload or download."""
         def _check():
-            current_clients = list(self.get_all_peerids())
+            current_clients = list(self.get_all_serverids())
             return len(current_clients) >= num_clients
         d = self.poll(_check, 0.5)
         d.addCallback(lambda res: None)
index 326bd572ffa8fe4d315dbdff8c332d5866a03de6..3884102478e27b8c37b08dfb08d95d0c2aaff002 100644 (file)
@@ -743,8 +743,8 @@ class CiphertextDownloader(log.PrefixingLogMixin):
 
     def _get_all_shareholders(self):
         dl = []
-        for (peerid,ss) in self._client.get_permuted_peers("storage",
-                                                           self._storage_index):
+        sb = self._client.storage_broker
+        for (peerid,ss) in sb.get_servers(self._storage_index):
             d = ss.callRemote("get_buckets", self._storage_index)
             d.addCallbacks(self._got_response, self._got_error,
                            callbackArgs=(peerid,))
index c8da456de6098b4b7c9d2a7134817a16cbb226e8..766f0bbdbec886d6cad8b34da1609a86c70d08ea 100644 (file)
@@ -54,7 +54,7 @@ class CHKCheckerAndUEBFetcher:
 
     def _get_all_shareholders(self, storage_index):
         dl = []
-        for (peerid, ss) in self._peer_getter("storage", storage_index):
+        for (peerid, ss) in self._peer_getter(storage_index):
             d = ss.callRemote("get_buckets", storage_index)
             d.addCallbacks(self._got_response, self._got_error,
                            callbackArgs=(peerid,))
@@ -622,8 +622,8 @@ class Helper(Referenceable, service.MultiService):
         # see if this file is already in the grid
         lp2 = self.log("doing a quick check+UEBfetch",
                        parent=lp, level=log.NOISY)
-        c = CHKCheckerAndUEBFetcher(self.parent.get_permuted_peers,
-                                    storage_index, lp2)
+        sb = self.parent.storage_broker
+        c = CHKCheckerAndUEBFetcher(sb.get_servers, storage_index, lp2)
         d = c.check()
         def _checked(res):
             if res:
index a8b270ebe5779a6957fb7eebd827683400b3394c..26ff331fbbc91f4ab0b26e7a928dddacc8b66f21 100644 (file)
@@ -166,7 +166,8 @@ class Tahoe2PeerSelector:
         self.use_peers = set() # PeerTrackers that have shares assigned to them
         self.preexisting_shares = {} # sharenum -> peerid holding the share
 
-        peers = client.get_permuted_peers("storage", storage_index)
+        sb = client.storage_broker
+        peers = list(sb.get_servers(storage_index))
         if not peers:
             raise NoServersError("client gave us zero peers")
 
index e446eeaeea339a5715aa5e547816021b492ecf27..8ac19ef99c5c20172ca8c2ceafcb3d31e135de8b 100644 (file)
@@ -190,9 +190,8 @@ class Publish:
         assert self._privkey
         self._encprivkey = self._node.get_encprivkey()
 
-        client = self._node._client
-        full_peerlist = client.get_permuted_peers("storage",
-                                                  self._storage_index)
+        sb = self._node._client.storage_broker
+        full_peerlist = sb.get_servers(self._storage_index)
         self.full_peerlist = full_peerlist # for use later, immutable
         self.bad_peers = set() # peerids who have errbacked/refused requests
 
index 592e600edf5c43cf996ba44eec396a6640d8898f..565975b271b74b8fd006145f912b6d296703965a 100644 (file)
@@ -421,9 +421,8 @@ class ServermapUpdater:
 
         self._queries_completed = 0
 
-        client = self._node._client
-        full_peerlist = client.get_permuted_peers("storage",
-                                                  self._node._storage_index)
+        sb = self._node._client.storage_broker
+        full_peerlist = list(sb.get_servers(self._node._storage_index))
         self.full_peerlist = full_peerlist # for use later, immutable
         self.extra_peers = full_peerlist[:] # peers are removed as we use them
         self._good_peers = set() # peers who had some shares
diff --git a/src/allmydata/storage_client.py b/src/allmydata/storage_client.py
new file mode 100644 (file)
index 0000000..e805037
--- /dev/null
@@ -0,0 +1,78 @@
+
+"""
+I contain the client-side code which speaks to storage servers, in particular
+the foolscap-based server implemented in src/allmydata/storage/*.py .
+"""
+
+# roadmap:
+#
+#  implement ServerFarm, change Client to create it, change
+#  uploader/servermap to get rrefs from it. ServerFarm calls
+#  IntroducerClient.subscribe_to .
+#
+#  implement NativeStorageClient, change Tahoe2PeerSelector to use it. All
+#  NativeStorageClients come from the introducer
+#
+#  change web/check_results.py to get NativeStorageClients from check results,
+#  ask it for a nickname (instead of using client.get_nickname_for_serverid)
+#
+#  implement tahoe.cfg scanner, create static NativeStorageClients
+
+import sha
+
+class StorageFarmBroker:
+    """I live on the client, and know about storage servers. For each server
+    that is participating in a grid, I either maintain a connection to it or
+    remember enough information to establish a connection to it on demand.
+    I'm also responsible for subscribing to the IntroducerClient to find out
+    about new servers as they are announced by the Introducer.
+    """
+    def __init__(self, permute_peers=True):
+        assert permute_peers # False not implemented yet
+        self.servers = {} # serverid -> StorageClient instance
+        self.permute_peers = permute_peers
+        self.introducer_client = None
+    def add_server(self, serverid, s):
+        self.servers[serverid] = s
+    def use_introducer(self, introducer_client):
+        self.introducer_client = ic = introducer_client
+        ic.subscribe_to("storage")
+
+    def get_servers(self, peer_selection_index):
+        # first cut: return an iterator of (peerid, versioned-rref) tuples
+        assert self.permute_peers == True
+        servers = {}
+        for serverid,server in self.servers.items():
+            servers[serverid] = server
+        if self.introducer_client:
+            ic = self.introducer_client
+            for serverid,server in ic.get_permuted_peers("storage",
+                                                         peer_selection_index):
+                servers[serverid] = server
+        servers = servers.items()
+        key = peer_selection_index
+        return sorted(servers, key=lambda x: sha.new(key+x[0]).digest())
+
+    def get_all_serverids(self):
+        for serverid in self.servers:
+            yield serverid
+        if self.introducer_client:
+            for serverid,server in self.introducer_client.get_peers("storage"):
+                yield serverid
+
+    def get_nickname_for_serverid(self, serverid):
+        if serverid in self.servers:
+            return self.servers[serverid].nickname
+        if self.introducer_client:
+            return self.introducer_client.get_nickname_for_peerid(serverid)
+        return None
+
+class NativeStorageClient:
+    def __init__(self, serverid, furl, nickname, min_shares=1):
+        self.serverid = serverid
+        self.furl = furl
+        self.nickname = nickname
+        self.min_shares = min_shares
+
+class UnknownServerTypeError(Exception):
+    pass
index 5808b7203f6ed2f8bd36104860b0afcf0016b440..35431a13be76ab5b3b19646876ab11d491670968 100644 (file)
@@ -104,6 +104,11 @@ def wrap(original, service_name):
     wrapper.version = version
     return wrapper
 
+class NoNetworkStorageBroker:
+    def get_servers(self, key):
+        return sorted(self.client._servers,
+                      key=lambda x: sha.new(key+x[0]).digest())
+
 class NoNetworkClient(Client):
 
     def create_tub(self):
@@ -126,15 +131,16 @@ class NoNetworkClient(Client):
         pass
     def init_storage(self):
         pass
+    def init_client_storage_broker(self):
+        self.storage_broker = NoNetworkStorageBroker()
+        self.storage_broker.client = self
     def init_stub_client(self):
         pass
 
     def get_servers(self, service_name):
         return self._servers
 
-    def get_permuted_peers(self, service_name, key):
-        return sorted(self._servers, key=lambda x: sha.new(key+x[0]).digest())
-    def get_nickname_for_peerid(self, peerid):
+    def get_nickname_for_serverid(self, serverid):
         return None
 
 class SimpleStats:
index a63db06bacdd44077739f967abc82b5c03d4a00e..e9d88a3303dd5683ee0185a8f279a87b3c8b0d2d 100644 (file)
@@ -3,32 +3,32 @@ import simplejson
 from twisted.trial import unittest
 from allmydata import check_results, uri
 from allmydata.web import check_results as web_check_results
+from allmydata.storage_client import StorageFarmBroker, NativeStorageClient
 from common_web import WebRenderingMixin
 
 class FakeClient:
-    def get_nickname_for_peerid(self, peerid):
-        if peerid == "\x00"*20:
-            return "peer-0"
-        if peerid == "\xff"*20:
-            return "peer-f"
-        if peerid == "\x11"*20:
-            return "peer-11"
-        return "peer-unknown"
-
-    def get_permuted_peers(self, service, key):
-        return [("\x00"*20, None),
-                ("\x11"*20, None),
-                ("\xff"*20, None),
-                ]
+    def get_nickname_for_serverid(self, serverid):
+        return self.storage_broker.get_nickname_for_serverid(serverid)
 
 class WebResultsRendering(unittest.TestCase, WebRenderingMixin):
 
+    def create_fake_client(self):
+        sb = StorageFarmBroker()
+        for (peerid, nickname) in [("\x00"*20, "peer-0"),
+                                   ("\xff"*20, "peer-f"),
+                                   ("\x11"*20, "peer-11")] :
+            n = NativeStorageClient(peerid, None, nickname)
+            sb.add_server(peerid, n)
+        c = FakeClient()
+        c.storage_broker = sb
+        return c
+
     def render_json(self, page):
         d = self.render1(page, args={"output": ["json"]})
         return d
 
     def test_literal(self):
-        c = FakeClient()
+        c = self.create_fake_client()
         lcr = web_check_results.LiteralCheckResults(c)
 
         d = self.render1(lcr)
@@ -53,7 +53,7 @@ class WebResultsRendering(unittest.TestCase, WebRenderingMixin):
         return d
 
     def test_check(self):
-        c = FakeClient()
+        c = self.create_fake_client()
         serverid_1 = "\x00"*20
         serverid_f = "\xff"*20
         u = uri.CHKFileURI("\x00"*16, "\x00"*32, 3, 10, 1234)
@@ -151,7 +151,7 @@ class WebResultsRendering(unittest.TestCase, WebRenderingMixin):
 
 
     def test_check_and_repair(self):
-        c = FakeClient()
+        c = self.create_fake_client()
         serverid_1 = "\x00"*20
         serverid_f = "\xff"*20
         u = uri.CHKFileURI("\x00"*16, "\x00"*32, 3, 10, 1234)
index 5865edc5799f60d052fe4e8f958ec73ebd74acb1..06077a5c79f88c058a7824a9d16f8feec8362fce 100644 (file)
@@ -6,6 +6,7 @@ from twisted.python import log
 
 import allmydata
 from allmydata import client
+from allmydata.storage_client import StorageFarmBroker
 from allmydata.introducer.client import IntroducerClient
 from allmydata.util import base32
 from foolscap.api import flushEventualQueue
@@ -140,30 +141,19 @@ class Basic(unittest.TestCase):
         c = client.Client(basedir)
         self.failUnlessEqual(c.getServiceNamed("storage").reserved_space, 0)
 
-    def _permute(self, c, key):
+    def _permute(self, sb, key):
         return [ peerid
-                 for (peerid,rref) in c.get_permuted_peers("storage", key) ]
+                 for (peerid,rref) in sb.get_servers(key) ]
 
     def test_permute(self):
-        basedir = "test_client.Basic.test_permute"
-        os.mkdir(basedir)
-        open(os.path.join(basedir, "introducer.furl"), "w").write("")
-        open(os.path.join(basedir, "vdrive.furl"), "w").write("")
-        c = client.Client(basedir)
-        c.introducer_client = FakeIntroducerClient()
+        sb = StorageFarmBroker()
         for k in ["%d" % i for i in range(5)]:
-            c.introducer_client.add_peer(k)
-
-        self.failUnlessEqual(self._permute(c, "one"), ['3','1','0','4','2'])
-        self.failUnlessEqual(self._permute(c, "two"), ['0','4','2','1','3'])
-        c.introducer_client.remove_all_peers()
-        self.failUnlessEqual(self._permute(c, "one"), [])
+            sb.add_server(k, None)
 
-        c2 = client.Client(basedir)
-        c2.introducer_client = FakeIntroducerClient()
-        for k in ["%d" % i for i in range(5)]:
-            c2.introducer_client.add_peer(k)
-        self.failUnlessEqual(self._permute(c2, "one"), ['3','1','0','4','2'])
+        self.failUnlessEqual(self._permute(sb, "one"), ['3','1','0','4','2'])
+        self.failUnlessEqual(self._permute(sb, "two"), ['0','4','2','1','3'])
+        sb.servers = {}
+        self.failUnlessEqual(self._permute(sb, "one"), [])
 
     def test_versions(self):
         basedir = "test_client.Basic.test_versions"
index ea4486980df23c745e2ea7aa3767dde6df1c2b46..32b3ab86df84050ccb09a8876dccfe1cc6e68c68 100644 (file)
@@ -6,6 +6,7 @@ from foolscap.api import Tub, fireEventually, flushEventualQueue
 from foolscap.logging import log
 
 from allmydata.storage.server import si_b2a
+from allmydata.storage_client import StorageFarmBroker
 from allmydata.immutable import offloaded, upload
 from allmydata import uri
 from allmydata.util import hashutil, fileutil, mathutil
@@ -62,12 +63,11 @@ class FakeClient(service.MultiService):
                                    "max_segment_size": 1*MiB,
                                    }
     stats_provider = None
+    storage_broker = StorageFarmBroker()
     def log(self, *args, **kwargs):
         return log.msg(*args, **kwargs)
     def get_encoding_parameters(self):
         return self.DEFAULT_ENCODING_PARAMETERS
-    def get_permuted_peers(self, service_name, storage_index):
-        return []
 
 def flush_but_dont_ignore(res):
     d = flushEventualQueue()
index 4810ee94ac728153405c4926db0dc94935cdc0b5..1ac21f94adfccfd51258913c8056b5d3fbe971bd 100644 (file)
@@ -17,7 +17,7 @@ from allmydata.monitor import Monitor
 from allmydata.test.common import ShouldFailMixin
 from foolscap.api import eventually, fireEventually
 from foolscap.logging import log
-import sha
+from allmydata.storage_client import StorageFarmBroker
 
 from allmydata.mutable.filenode import MutableFileNode, BackoffAgent
 from allmydata.mutable.common import ResponseCache, \
@@ -171,12 +171,22 @@ class FakeClient:
     def __init__(self, num_peers=10):
         self._storage = FakeStorage()
         self._num_peers = num_peers
-        self._peerids = [tagged_hash("peerid", "%d" % i)[:20]
-                         for i in range(self._num_peers)]
-        self._connections = dict([(peerid, FakeStorageServer(peerid,
-                                                             self._storage))
-                                  for peerid in self._peerids])
+        peerids = [tagged_hash("peerid", "%d" % i)[:20]
+                   for i in range(self._num_peers)]
         self.nodeid = "fakenodeid"
+        self.storage_broker = StorageFarmBroker()
+        for peerid in peerids:
+            fss = FakeStorageServer(peerid, self._storage)
+            self.storage_broker.add_server(peerid, fss)
+
+    def get_all_serverids(self):
+        return self.storage_broker.get_all_serverids()
+    def debug_break_connection(self, peerid):
+        self.storage_broker.servers[peerid].broken = True
+    def debug_remove_connection(self, peerid):
+        self.storage_broker.servers.pop(peerid)
+    def debug_get_connection(self, peerid):
+        return self.storage_broker.servers[peerid]
 
     def get_encoding_parameters(self):
         return {"k": 3, "n": 10}
@@ -204,19 +214,6 @@ class FakeClient:
         res = self.mutable_file_node_class(self).init_from_uri(u)
         return res
 
-    def get_permuted_peers(self, service_name, key):
-        """
-        @return: list of (peerid, connection,)
-        """
-        results = []
-        for (peerid, connection) in self._connections.items():
-            assert isinstance(peerid, str)
-            permuted = sha.new(key + peerid).digest()
-            results.append((permuted, peerid, connection))
-        results.sort()
-        results = [ (r[1],r[2]) for r in results]
-        return results
-
     def upload(self, uploadable):
         assert IUploadable.providedBy(uploadable)
         d = uploadable.get_size()
@@ -276,7 +273,7 @@ class Filenode(unittest.TestCase, testutil.ShouldFailMixin):
         def _created(n):
             self.failUnless(isinstance(n, FastMutableFileNode))
             self.failUnlessEqual(n.get_storage_index(), n._storage_index)
-            peer0 = self.client._peerids[0]
+            peer0 = sorted(self.client.get_all_serverids())[0]
             shnums = self.client._storage._peers[peer0].keys()
             self.failUnlessEqual(len(shnums), 1)
         d.addCallback(_created)
@@ -1575,7 +1572,7 @@ class MultipleEncodings(unittest.TestCase):
 
             sharemap = {}
 
-            for i,peerid in enumerate(self._client._peerids):
+            for i,peerid in enumerate(self._client.get_all_serverids()):
                 peerid_s = shortnodeid_b2a(peerid)
                 for shnum in self._shares1.get(peerid, {}):
                     if shnum < len(places):
@@ -1798,15 +1795,15 @@ class LessFakeClient(FakeClient):
 
     def __init__(self, basedir, num_peers=10):
         self._num_peers = num_peers
-        self._peerids = [tagged_hash("peerid", "%d" % i)[:20]
-                         for i in range(self._num_peers)]
-        self._connections = {}
-        for peerid in self._peerids:
+        peerids = [tagged_hash("peerid", "%d" % i)[:20] 
+                   for i in range(self._num_peers)]
+        self.storage_broker = StorageFarmBroker()
+        for peerid in peerids:
             peerdir = os.path.join(basedir, idlib.shortnodeid_b2a(peerid))
             make_dirs(peerdir)
             ss = StorageServer(peerdir, peerid)
             lw = LocalWrapper(ss)
-            self._connections[peerid] = lw
+            self.storage_broker.add_server(peerid, lw)
         self.nodeid = "fakenodeid"
 
 
@@ -1886,7 +1883,7 @@ class Problems(unittest.TestCase, testutil.ShouldFailMixin):
                 self.old_map = smap
                 # now shut down one of the servers
                 peer0 = list(smap.make_sharemap()[0])[0]
-                self.client._connections.pop(peer0)
+                self.client.debug_remove_connection(peer0)
                 # then modify the file, leaving the old map untouched
                 log.msg("starting winning write")
                 return n.overwrite("contents 2")
@@ -1920,7 +1917,7 @@ class Problems(unittest.TestCase, testutil.ShouldFailMixin):
         d.addCallback(n._generated)
         def _break_peer0(res):
             si = n.get_storage_index()
-            peerlist = self.client.get_permuted_peers("storage", si)
+            peerlist = list(self.client.storage_broker.get_servers(si))
             peerid0, connection0 = peerlist[0]
             peerid1, connection1 = peerlist[1]
             connection0.broken = True
@@ -1939,6 +1936,12 @@ class Problems(unittest.TestCase, testutil.ShouldFailMixin):
         # that ought to work too
         d.addCallback(lambda res: n.download_best_version())
         d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2"))
+        def _explain_error(f):
+            print f
+            if f.check(NotEnoughServersError):
+                print "first_error:", f.value.first_error
+            return f
+        d.addErrback(_explain_error)
         return d
 
     def test_bad_server_overlap(self):
@@ -1954,8 +1957,8 @@ class Problems(unittest.TestCase, testutil.ShouldFailMixin):
         basedir = os.path.join("mutable/CollidingWrites/test_bad_server")
         self.client = LessFakeClient(basedir, 10)
 
-        peerids = sorted(self.client._connections.keys())
-        self.client._connections[peerids[0]].broken = True
+        peerids = list(self.client.get_all_serverids())
+        self.client.debug_break_connection(peerids[0])
 
         d = self.client.create_mutable_file("contents 1")
         def _created(n):
@@ -1963,7 +1966,7 @@ class Problems(unittest.TestCase, testutil.ShouldFailMixin):
             d.addCallback(lambda res: self.failUnlessEqual(res, "contents 1"))
             # now break one of the remaining servers
             def _break_second_server(res):
-                self.client._connections[peerids[1]].broken = True
+                self.client.debug_break_connection(peerids[1])
             d.addCallback(_break_second_server)
             d.addCallback(lambda res: n.overwrite("contents 2"))
             # that ought to work too
@@ -1977,8 +1980,8 @@ class Problems(unittest.TestCase, testutil.ShouldFailMixin):
         # Break all servers: the publish should fail
         basedir = os.path.join("mutable/CollidingWrites/publish_all_servers_bad")
         self.client = LessFakeClient(basedir, 20)
-        for connection in self.client._connections.values():
-            connection.broken = True
+        for peerid in self.client.get_all_serverids():
+            self.client.debug_break_connection(peerid)
         d = self.shouldFail(NotEnoughServersError,
                             "test_publish_all_servers_bad",
                             "Ran out of non-bad servers",
index 964af2e3c8e29de90ce58562c54c37dfedd51032..7ace3d1ca3521f04a64dca53fa836538b19a08ae 100644 (file)
@@ -72,9 +72,10 @@ class SystemTest(SystemTestMixin, unittest.TestCase):
         def _check(extra_node):
             self.extra_node = extra_node
             for c in self.clients:
-                all_peerids = list(c.get_all_peerids())
+                all_peerids = list(c.get_all_serverids())
                 self.failUnlessEqual(len(all_peerids), self.numclients+1)
-                permuted_peers = list(c.get_permuted_peers("storage", "a"))
+                sb = c.storage_broker
+                permuted_peers = list(sb.get_servers("a"))
                 self.failUnlessEqual(len(permuted_peers), self.numclients+1)
 
         d.addCallback(_check)
@@ -109,9 +110,10 @@ class SystemTest(SystemTestMixin, unittest.TestCase):
         d = self.set_up_nodes()
         def _check_connections(res):
             for c in self.clients:
-                all_peerids = list(c.get_all_peerids())
+                all_peerids = list(c.get_all_serverids())
                 self.failUnlessEqual(len(all_peerids), self.numclients)
-                permuted_peers = list(c.get_permuted_peers("storage", "a"))
+                sb = c.storage_broker
+                permuted_peers = list(sb.get_servers("a"))
                 self.failUnlessEqual(len(permuted_peers), self.numclients)
         d.addCallback(_check_connections)
 
index 845f412d4716937498b5954baca3b6993a70485e..52d17796c6994df0aae74dd292aa28153ffc18c5 100644 (file)
@@ -15,6 +15,7 @@ from allmydata.util.assertutil import precondition
 from allmydata.util.deferredutil import DeferredListShouldSucceed
 from no_network import GridTestMixin
 from common_util import ShouldFailMixin
+from allmydata.storage_client import StorageFarmBroker
 
 MiB = 1024*1024
 
@@ -158,22 +159,22 @@ class FakeClient:
         self.mode = mode
         self.num_servers = num_servers
         if mode == "some_big_some_small":
-            self.peers = []
+            peers = []
             for fakeid in range(num_servers):
                 if fakeid % 2:
-                    self.peers.append( ("%20d" % fakeid,
-                                        FakeStorageServer("good")) )
+                    peers.append(("%20d" % fakeid, FakeStorageServer("good")))
                 else:
-                    self.peers.append( ("%20d" % fakeid,
-                                        FakeStorageServer("small")) )
+                    peers.append(("%20d" % fakeid, FakeStorageServer("small")))
         else:
-            self.peers = [ ("%20d"%fakeid, FakeStorageServer(self.mode),)
+            peers = [ ("%20d"%fakeid, FakeStorageServer(self.mode),)
                            for fakeid in range(self.num_servers) ]
+        self.storage_broker = StorageFarmBroker()
+        for (serverid, server) in peers:
+            self.storage_broker.add_server(serverid, server)
+        self.last_peers = [p[1] for p in peers]
+
     def log(self, *args, **kwargs):
         pass
-    def get_permuted_peers(self, storage_index, include_myself):
-        self.last_peers = [p[1] for p in self.peers]
-        return self.peers
     def get_encoding_parameters(self):
         return self.DEFAULT_ENCODING_PARAMETERS
 
index 78c3e62dedc7be841690e0e23cdc4629a65e95dc..1eb92bb3e57dbceb0549ad5a1fcd42175b428db3 100644 (file)
@@ -9,6 +9,7 @@ from twisted.python import failure, log
 from nevow import rend
 from allmydata import interfaces, uri, webish
 from allmydata.storage.shares import get_share_file
+from allmydata.storage_client import StorageFarmBroker
 from allmydata.immutable import upload, download
 from allmydata.web import status, common
 from allmydata.scripts.debug import CorruptShareOptions, corrupt_share
@@ -64,11 +65,10 @@ class FakeClient(service.MultiService):
     def connected_to_introducer(self):
         return False
 
-    def get_nickname_for_peerid(self, peerid):
+    def get_nickname_for_serverid(self, serverid):
         return u"John Doe"
 
-    def get_permuted_peers(self, service_name, key):
-        return []
+    storage_broker = StorageFarmBroker()
 
     def create_node_from_uri(self, auri):
         precondition(isinstance(auri, str), auri)
index 03c689c226098d653084c8d7e076faac3ca2b20e..3228f29977331b0a35229c51d4381f5c9edca62b 100644 (file)
@@ -95,7 +95,7 @@ class ResultsBase:
         if data["list-corrupt-shares"]:
             badsharemap = []
             for (serverid, si, shnum) in data["list-corrupt-shares"]:
-                nickname = c.get_nickname_for_peerid(serverid)
+                nickname = c.get_nickname_for_serverid(serverid)
                 badsharemap.append(T.tr[T.td["sh#%d" % shnum],
                                         T.td[T.div(class_="nickname")[nickname],
                                               T.div(class_="nodeid")[T.tt[base32.b2a(serverid)]]],
@@ -123,7 +123,7 @@ class ResultsBase:
                 shareid_s = ""
                 if i == 0:
                     shareid_s = shareid
-                nickname = c.get_nickname_for_peerid(serverid)
+                nickname = c.get_nickname_for_serverid(serverid)
                 sharemap.append(T.tr[T.td[shareid_s],
                                      T.td[T.div(class_="nickname")[nickname],
                                           T.div(class_="nodeid")[T.tt[base32.b2a(serverid)]]]
@@ -137,15 +137,15 @@ class ResultsBase:
         add("Unrecoverable Versions", data["count-unrecoverable-versions"])
 
         # this table is sorted by permuted order
+        sb = c.storage_broker
         permuted_peer_ids = [peerid
                              for (peerid, rref)
-                             in c.get_permuted_peers("storage",
-                                                     cr.get_storage_index())]
+                             in sb.get_servers(cr.get_storage_index())]
 
         num_shares_left = sum([len(shares) for shares in servers.values()])
         servermap = []
         for serverid in permuted_peer_ids:
-            nickname = c.get_nickname_for_peerid(serverid)
+            nickname = c.get_nickname_for_serverid(serverid)
             shareids = servers.get(serverid, [])
             shareids.reverse()
             shareids_s = [ T.tt[shareid, " "] for shareid in sorted(shareids) ]
@@ -419,7 +419,7 @@ class DeepCheckResults(rend.Page, ResultsBase, ReloadMixin):
     def render_server_problem(self, ctx, data):
         serverid = data
         data = [idlib.shortnodeid_b2a(serverid)]
-        nickname = self.client.get_nickname_for_peerid(serverid)
+        nickname = self.client.get_nickname_for_serverid(serverid)
         if nickname:
             data.append(" (%s)" % self._html(nickname))
         return ctx.tag[data]
@@ -433,7 +433,7 @@ class DeepCheckResults(rend.Page, ResultsBase, ReloadMixin):
         return self.monitor.get_status().get_corrupt_shares()
     def render_share_problem(self, ctx, data):
         serverid, storage_index, sharenum = data
-        nickname = self.client.get_nickname_for_peerid(serverid)
+        nickname = self.client.get_nickname_for_serverid(serverid)
         ctx.fillSlots("serverid", idlib.shortnodeid_b2a(serverid))
         if nickname:
             ctx.fillSlots("nickname", self._html(nickname))