start to factor server-connection-management into a distinct 'StorageServerFarmBroker...
author    Brian Warner <warner@lothar.com>
          Mon, 1 Jun 2009 21:06:04 +0000 (14:06 -0700)
committer Brian Warner <warner@lothar.com>
          Mon, 1 Jun 2009 21:06:04 +0000 (14:06 -0700)
16 files changed:
src/allmydata/client.py
src/allmydata/immutable/download.py
src/allmydata/immutable/offloaded.py
src/allmydata/immutable/upload.py
src/allmydata/mutable/publish.py
src/allmydata/mutable/servermap.py
src/allmydata/storage_client.py [new file with mode: 0644]
src/allmydata/test/no_network.py
src/allmydata/test/test_checker.py
src/allmydata/test/test_client.py
src/allmydata/test/test_helper.py
src/allmydata/test/test_mutable.py
src/allmydata/test/test_system.py
src/allmydata/test/test_upload.py
src/allmydata/test/test_web.py
src/allmydata/web/check_results.py
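
In outline, this change replaces the Client-side peer-selection API
(get_all_peerids / get_permuted_peers / get_nickname_for_peerid) with a
StorageFarmBroker object that callers query directly. Below is a minimal usage
sketch of the new broker interface, based on the storage_client.py added in
this diff; the FakeRemote placeholder is illustrative only and is not part of
the patch:

    from allmydata.storage_client import StorageFarmBroker

    class FakeRemote:
        # stands in for the versioned remote reference (rref) that a real
        # storage server announcement would provide
        pass

    sb = StorageFarmBroker()
    for i in range(3):
        serverid = chr(i) * 20      # 20-byte peerids, as used in the tests below
        sb.add_server(serverid, FakeRemote())

    # get_servers() returns (serverid, server) pairs, permuted by the selection index
    for (serverid, server) in sb.get_servers("a" * 16):
        print serverid.encode("hex")

    print len(list(sb.get_all_serverids()))   # -> 3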

diff --git a/src/allmydata/client.py b/src/allmydata/client.py
index 99dfa09b1c712425db2a1ca64717d99e7e97fb93..106b1b724531e073abc591a4c075805cf2934424 100644
@@ -11,6 +11,7 @@ from pycryptopp.publickey import rsa
 
 import allmydata
 from allmydata.storage.server import StorageServer
+from allmydata import storage_client
 from allmydata.immutable.upload import Uploader
 from allmydata.immutable.download import Downloader
 from allmydata.immutable.filenode import FileNode, LiteralFileNode
@@ -220,6 +221,8 @@ class Client(node.Node, pollmixin.PollMixin):
         convergence_s = self.get_or_create_private_config('convergence', _make_secret)
         self.convergence = base32.a2b(convergence_s)
         self._node_cache = weakref.WeakValueDictionary() # uri -> node
+
+        self.init_client_storage_broker()
         self.add_service(History(self.stats_provider))
         self.add_service(Uploader(helper_furl, self.stats_provider))
         download_cachedir = os.path.join(self.basedir,
@@ -229,6 +232,37 @@ class Client(node.Node, pollmixin.PollMixin):
         self.add_service(Downloader(self.stats_provider))
         self.init_stub_client()
 
+    def init_client_storage_broker(self):
+        # create a StorageFarmBroker object, for use by Uploader/Downloader
+        # (and everybody else who wants to use storage servers)
+        self.storage_broker = sb = storage_client.StorageFarmBroker()
+
+        # load static server specifications from tahoe.cfg, if any
+        #if self.config.has_section("client-server-selection"):
+        #    server_params = {} # maps serverid to dict of parameters
+        #    for (name, value) in self.config.items("client-server-selection"):
+        #        pieces = name.split(".")
+        #        if pieces[0] == "server":
+        #            serverid = pieces[1]
+        #            if serverid not in server_params:
+        #                server_params[serverid] = {}
+        #            server_params[serverid][pieces[2]] = value
+        #    for serverid, params in server_params.items():
+        #        server_type = params.pop("type")
+        #        if server_type == "tahoe-foolscap":
+        #            s = storage_client.NativeStorageClient(*params)
+        #        else:
+        #            msg = ("unrecognized server type '%s' in "
+        #                   "tahoe.cfg [client-server-selection]server.%s.type"
+        #                   % (server_type, serverid))
+        #            raise storage_client.UnknownServerTypeError(msg)
+        #        sb.add_server(s.serverid, s)
+
+        # check to see if we're supposed to use the introducer too
+        if self.get_config("client-server-selection", "use_introducer",
+                           default=True, boolean=True):
+            sb.use_introducer(self.introducer_client)
+
     def init_stub_client(self):
         def _publish(res):
             # we publish an empty object so that the introducer can count how
@@ -338,18 +372,10 @@ class Client(node.Node, pollmixin.PollMixin):
             self.log("hotline file missing, shutting down")
         reactor.stop()
 
-    def get_all_peerids(self):
-        return self.introducer_client.get_all_peerids()
-    def get_nickname_for_peerid(self, peerid):
-        return self.introducer_client.get_nickname_for_peerid(peerid)
-
-    def get_permuted_peers(self, service_name, key):
-        """
-        @return: list of (peerid, connection,)
-        """
-        assert isinstance(service_name, str)
-        assert isinstance(key, str)
-        return self.introducer_client.get_permuted_peers(service_name, key)
+    def get_all_serverids(self):
+        return self.storage_broker.get_all_serverids()
+    def get_nickname_for_serverid(self, serverid):
+        return self.storage_broker.get_nickname_for_serverid(serverid)
 
     def get_encoding_parameters(self):
         return self.DEFAULT_ENCODING_PARAMETERS
@@ -371,7 +397,7 @@ class Client(node.Node, pollmixin.PollMixin):
         temporary test network and need to know when it is safe to proceed
         with an upload or download."""
         def _check():
-            current_clients = list(self.get_all_peerids())
+            current_clients = list(self.get_all_serverids())
             return len(current_clients) >= num_clients
         d = self.poll(_check, 0.5)
         d.addCallback(lambda res: None)
diff --git a/src/allmydata/immutable/download.py b/src/allmydata/immutable/download.py
index 326bd572ffa8fe4d315dbdff8c332d5866a03de6..3884102478e27b8c37b08dfb08d95d0c2aaff002 100644
@@ -743,8 +743,8 @@ class CiphertextDownloader(log.PrefixingLogMixin):
 
     def _get_all_shareholders(self):
         dl = []
-        for (peerid,ss) in self._client.get_permuted_peers("storage",
-                                                           self._storage_index):
+        sb = self._client.storage_broker
+        for (peerid,ss) in sb.get_servers(self._storage_index):
             d = ss.callRemote("get_buckets", self._storage_index)
             d.addCallbacks(self._got_response, self._got_error,
                            callbackArgs=(peerid,))
diff --git a/src/allmydata/immutable/offloaded.py b/src/allmydata/immutable/offloaded.py
index c8da456de6098b4b7c9d2a7134817a16cbb226e8..766f0bbdbec886d6cad8b34da1609a86c70d08ea 100644
@@ -54,7 +54,7 @@ class CHKCheckerAndUEBFetcher:
 
     def _get_all_shareholders(self, storage_index):
         dl = []
-        for (peerid, ss) in self._peer_getter("storage", storage_index):
+        for (peerid, ss) in self._peer_getter(storage_index):
             d = ss.callRemote("get_buckets", storage_index)
             d.addCallbacks(self._got_response, self._got_error,
                            callbackArgs=(peerid,))
@@ -622,8 +622,8 @@ class Helper(Referenceable, service.MultiService):
         # see if this file is already in the grid
         lp2 = self.log("doing a quick check+UEBfetch",
                        parent=lp, level=log.NOISY)
-        c = CHKCheckerAndUEBFetcher(self.parent.get_permuted_peers,
-                                    storage_index, lp2)
+        sb = self.parent.storage_broker
+        c = CHKCheckerAndUEBFetcher(sb.get_servers, storage_index, lp2)
         d = c.check()
         def _checked(res):
             if res:
diff --git a/src/allmydata/immutable/upload.py b/src/allmydata/immutable/upload.py
index a8b270ebe5779a6957fb7eebd827683400b3394c..26ff331fbbc91f4ab0b26e7a928dddacc8b66f21 100644
@@ -166,7 +166,8 @@ class Tahoe2PeerSelector:
         self.use_peers = set() # PeerTrackers that have shares assigned to them
         self.preexisting_shares = {} # sharenum -> peerid holding the share
 
-        peers = client.get_permuted_peers("storage", storage_index)
+        sb = client.storage_broker
+        peers = list(sb.get_servers(storage_index))
         if not peers:
             raise NoServersError("client gave us zero peers")
 
diff --git a/src/allmydata/mutable/publish.py b/src/allmydata/mutable/publish.py
index e446eeaeea339a5715aa5e547816021b492ecf27..8ac19ef99c5c20172ca8c2ceafcb3d31e135de8b 100644
@@ -190,9 +190,8 @@ class Publish:
         assert self._privkey
         self._encprivkey = self._node.get_encprivkey()
 
-        client = self._node._client
-        full_peerlist = client.get_permuted_peers("storage",
-                                                  self._storage_index)
+        sb = self._node._client.storage_broker
+        full_peerlist = sb.get_servers(self._storage_index)
         self.full_peerlist = full_peerlist # for use later, immutable
         self.bad_peers = set() # peerids who have errbacked/refused requests
 
diff --git a/src/allmydata/mutable/servermap.py b/src/allmydata/mutable/servermap.py
index 592e600edf5c43cf996ba44eec396a6640d8898f..565975b271b74b8fd006145f912b6d296703965a 100644
@@ -421,9 +421,8 @@ class ServermapUpdater:
 
         self._queries_completed = 0
 
-        client = self._node._client
-        full_peerlist = client.get_permuted_peers("storage",
-                                                  self._node._storage_index)
+        sb = self._node._client.storage_broker
+        full_peerlist = list(sb.get_servers(self._node._storage_index))
         self.full_peerlist = full_peerlist # for use later, immutable
         self.extra_peers = full_peerlist[:] # peers are removed as we use them
         self._good_peers = set() # peers who had some shares
diff --git a/src/allmydata/storage_client.py b/src/allmydata/storage_client.py
new file mode 100644
index 0000000..e805037
--- /dev/null
+++ b/src/allmydata/storage_client.py
@@ -0,0 +1,78 @@
+
+"""
+I contain the client-side code which speaks to storage servers, in particular
+the foolscap-based server implemented in src/allmydata/storage/*.py .
+"""
+
+# roadmap:
+#
+#  implement ServerFarm, change Client to create it, change
+#  uploader/servermap to get rrefs from it. ServerFarm calls
+#  IntroducerClient.subscribe_to .
+#
+#  implement NativeStorageClient, change Tahoe2PeerSelector to use it. All
+#  NativeStorageClients come from the introducer
+#
+#  change web/check_results.py to get NativeStorageClients from check results,
+#  ask it for a nickname (instead of using client.get_nickname_for_serverid)
+#
+#  implement tahoe.cfg scanner, create static NativeStorageClients
+
+import sha
+
+class StorageFarmBroker:
+    """I live on the client, and know about storage servers. For each server
+    that is participating in a grid, I either maintain a connection to it or
+    remember enough information to establish a connection to it on demand.
+    I'm also responsible for subscribing to the IntroducerClient to find out
+    about new servers as they are announced by the Introducer.
+    """
+    def __init__(self, permute_peers=True):
+        assert permute_peers # False not implemented yet
+        self.servers = {} # serverid -> StorageClient instance
+        self.permute_peers = permute_peers
+        self.introducer_client = None
+    def add_server(self, serverid, s):
+        self.servers[serverid] = s
+    def use_introducer(self, introducer_client):
+        self.introducer_client = ic = introducer_client
+        ic.subscribe_to("storage")
+
+    def get_servers(self, peer_selection_index):
+        # first cut: return an iterator of (peerid, versioned-rref) tuples
+        assert self.permute_peers == True
+        servers = {}
+        for serverid,server in self.servers.items():
+            servers[serverid] = server
+        if self.introducer_client:
+            ic = self.introducer_client
+            for serverid,server in ic.get_permuted_peers("storage",
+                                                         peer_selection_index):
+                servers[serverid] = server
+        servers = servers.items()
+        key = peer_selection_index
+        return sorted(servers, key=lambda x: sha.new(key+x[0]).digest())
+
+    def get_all_serverids(self):
+        for serverid in self.servers:
+            yield serverid
+        if self.introducer_client:
+            for serverid,server in self.introducer_client.get_peers("storage"):
+                yield serverid
+
+    def get_nickname_for_serverid(self, serverid):
+        if serverid in self.servers:
+            return self.servers[serverid].nickname
+        if self.introducer_client:
+            return self.introducer_client.get_nickname_for_peerid(serverid)
+        return None
+
+class NativeStorageClient:
+    def __init__(self, serverid, furl, nickname, min_shares=1):
+        self.serverid = serverid
+        self.furl = furl
+        self.nickname = nickname
+        self.min_shares = min_shares
+
+class UnknownServerTypeError(Exception):
+    pass
diff --git a/src/allmydata/test/no_network.py b/src/allmydata/test/no_network.py
index 5808b7203f6ed2f8bd36104860b0afcf0016b440..35431a13be76ab5b3b19646876ab11d491670968 100644
@@ -104,6 +104,11 @@ def wrap(original, service_name):
     wrapper.version = version
     return wrapper
 
+class NoNetworkStorageBroker:
+    def get_servers(self, key):
+        return sorted(self.client._servers,
+                      key=lambda x: sha.new(key+x[0]).digest())
+
 class NoNetworkClient(Client):
 
     def create_tub(self):
@@ -126,15 +131,16 @@ class NoNetworkClient(Client):
         pass
     def init_storage(self):
         pass
+    def init_client_storage_broker(self):
+        self.storage_broker = NoNetworkStorageBroker()
+        self.storage_broker.client = self
     def init_stub_client(self):
         pass
 
     def get_servers(self, service_name):
         return self._servers
 
-    def get_permuted_peers(self, service_name, key):
-        return sorted(self._servers, key=lambda x: sha.new(key+x[0]).digest())
-    def get_nickname_for_peerid(self, peerid):
+    def get_nickname_for_serverid(self, serverid):
         return None
 
 class SimpleStats:
diff --git a/src/allmydata/test/test_checker.py b/src/allmydata/test/test_checker.py
index a63db06bacdd44077739f967abc82b5c03d4a00e..e9d88a3303dd5683ee0185a8f279a87b3c8b0d2d 100644
@@ -3,32 +3,32 @@ import simplejson
 from twisted.trial import unittest
 from allmydata import check_results, uri
 from allmydata.web import check_results as web_check_results
+from allmydata.storage_client import StorageFarmBroker, NativeStorageClient
 from common_web import WebRenderingMixin
 
 class FakeClient:
-    def get_nickname_for_peerid(self, peerid):
-        if peerid == "\x00"*20:
-            return "peer-0"
-        if peerid == "\xff"*20:
-            return "peer-f"
-        if peerid == "\x11"*20:
-            return "peer-11"
-        return "peer-unknown"
-
-    def get_permuted_peers(self, service, key):
-        return [("\x00"*20, None),
-                ("\x11"*20, None),
-                ("\xff"*20, None),
-                ]
+    def get_nickname_for_serverid(self, serverid):
+        return self.storage_broker.get_nickname_for_serverid(serverid)
 
 class WebResultsRendering(unittest.TestCase, WebRenderingMixin):
 
+    def create_fake_client(self):
+        sb = StorageFarmBroker()
+        for (peerid, nickname) in [("\x00"*20, "peer-0"),
+                                   ("\xff"*20, "peer-f"),
+                                   ("\x11"*20, "peer-11")] :
+            n = NativeStorageClient(peerid, None, nickname)
+            sb.add_server(peerid, n)
+        c = FakeClient()
+        c.storage_broker = sb
+        return c
+
     def render_json(self, page):
         d = self.render1(page, args={"output": ["json"]})
         return d
 
     def test_literal(self):
-        c = FakeClient()
+        c = self.create_fake_client()
         lcr = web_check_results.LiteralCheckResults(c)
 
         d = self.render1(lcr)
@@ -53,7 +53,7 @@ class WebResultsRendering(unittest.TestCase, WebRenderingMixin):
         return d
 
     def test_check(self):
-        c = FakeClient()
+        c = self.create_fake_client()
         serverid_1 = "\x00"*20
         serverid_f = "\xff"*20
         u = uri.CHKFileURI("\x00"*16, "\x00"*32, 3, 10, 1234)
@@ -151,7 +151,7 @@ class WebResultsRendering(unittest.TestCase, WebRenderingMixin):
 
 
     def test_check_and_repair(self):
-        c = FakeClient()
+        c = self.create_fake_client()
         serverid_1 = "\x00"*20
         serverid_f = "\xff"*20
         u = uri.CHKFileURI("\x00"*16, "\x00"*32, 3, 10, 1234)
diff --git a/src/allmydata/test/test_client.py b/src/allmydata/test/test_client.py
index 5865edc5799f60d052fe4e8f958ec73ebd74acb1..06077a5c79f88c058a7824a9d16f8feec8362fce 100644
@@ -6,6 +6,7 @@ from twisted.python import log
 
 import allmydata
 from allmydata import client
+from allmydata.storage_client import StorageFarmBroker
 from allmydata.introducer.client import IntroducerClient
 from allmydata.util import base32
 from foolscap.api import flushEventualQueue
@@ -140,30 +141,19 @@ class Basic(unittest.TestCase):
         c = client.Client(basedir)
         self.failUnlessEqual(c.getServiceNamed("storage").reserved_space, 0)
 
-    def _permute(self, c, key):
+    def _permute(self, sb, key):
         return [ peerid
-                 for (peerid,rref) in c.get_permuted_peers("storage", key) ]
+                 for (peerid,rref) in sb.get_servers(key) ]
 
     def test_permute(self):
-        basedir = "test_client.Basic.test_permute"
-        os.mkdir(basedir)
-        open(os.path.join(basedir, "introducer.furl"), "w").write("")
-        open(os.path.join(basedir, "vdrive.furl"), "w").write("")
-        c = client.Client(basedir)
-        c.introducer_client = FakeIntroducerClient()
+        sb = StorageFarmBroker()
         for k in ["%d" % i for i in range(5)]:
-            c.introducer_client.add_peer(k)
-
-        self.failUnlessEqual(self._permute(c, "one"), ['3','1','0','4','2'])
-        self.failUnlessEqual(self._permute(c, "two"), ['0','4','2','1','3'])
-        c.introducer_client.remove_all_peers()
-        self.failUnlessEqual(self._permute(c, "one"), [])
+            sb.add_server(k, None)
 
-        c2 = client.Client(basedir)
-        c2.introducer_client = FakeIntroducerClient()
-        for k in ["%d" % i for i in range(5)]:
-            c2.introducer_client.add_peer(k)
-        self.failUnlessEqual(self._permute(c2, "one"), ['3','1','0','4','2'])
+        self.failUnlessEqual(self._permute(sb, "one"), ['3','1','0','4','2'])
+        self.failUnlessEqual(self._permute(sb, "two"), ['0','4','2','1','3'])
+        sb.servers = {}
+        self.failUnlessEqual(self._permute(sb, "one"), [])
 
     def test_versions(self):
         basedir = "test_client.Basic.test_versions"
diff --git a/src/allmydata/test/test_helper.py b/src/allmydata/test/test_helper.py
index ea4486980df23c745e2ea7aa3767dde6df1c2b46..32b3ab86df84050ccb09a8876dccfe1cc6e68c68 100644
@@ -6,6 +6,7 @@ from foolscap.api import Tub, fireEventually, flushEventualQueue
 from foolscap.logging import log
 
 from allmydata.storage.server import si_b2a
+from allmydata.storage_client import StorageFarmBroker
 from allmydata.immutable import offloaded, upload
 from allmydata import uri
 from allmydata.util import hashutil, fileutil, mathutil
@@ -62,12 +63,11 @@ class FakeClient(service.MultiService):
                                    "max_segment_size": 1*MiB,
                                    }
     stats_provider = None
+    storage_broker = StorageFarmBroker()
     def log(self, *args, **kwargs):
         return log.msg(*args, **kwargs)
     def get_encoding_parameters(self):
         return self.DEFAULT_ENCODING_PARAMETERS
-    def get_permuted_peers(self, service_name, storage_index):
-        return []
 
 def flush_but_dont_ignore(res):
     d = flushEventualQueue()
diff --git a/src/allmydata/test/test_mutable.py b/src/allmydata/test/test_mutable.py
index 4810ee94ac728153405c4926db0dc94935cdc0b5..1ac21f94adfccfd51258913c8056b5d3fbe971bd 100644
@@ -17,7 +17,7 @@ from allmydata.monitor import Monitor
 from allmydata.test.common import ShouldFailMixin
 from foolscap.api import eventually, fireEventually
 from foolscap.logging import log
-import sha
+from allmydata.storage_client import StorageFarmBroker
 
 from allmydata.mutable.filenode import MutableFileNode, BackoffAgent
 from allmydata.mutable.common import ResponseCache, \
@@ -171,12 +171,22 @@ class FakeClient:
     def __init__(self, num_peers=10):
         self._storage = FakeStorage()
         self._num_peers = num_peers
-        self._peerids = [tagged_hash("peerid", "%d" % i)[:20]
-                         for i in range(self._num_peers)]
-        self._connections = dict([(peerid, FakeStorageServer(peerid,
-                                                             self._storage))
-                                  for peerid in self._peerids])
+        peerids = [tagged_hash("peerid", "%d" % i)[:20]
+                   for i in range(self._num_peers)]
         self.nodeid = "fakenodeid"
+        self.storage_broker = StorageFarmBroker()
+        for peerid in peerids:
+            fss = FakeStorageServer(peerid, self._storage)
+            self.storage_broker.add_server(peerid, fss)
+
+    def get_all_serverids(self):
+        return self.storage_broker.get_all_serverids()
+    def debug_break_connection(self, peerid):
+        self.storage_broker.servers[peerid].broken = True
+    def debug_remove_connection(self, peerid):
+        self.storage_broker.servers.pop(peerid)
+    def debug_get_connection(self, peerid):
+        return self.storage_broker.servers[peerid]
 
     def get_encoding_parameters(self):
         return {"k": 3, "n": 10}
@@ -204,19 +214,6 @@ class FakeClient:
         res = self.mutable_file_node_class(self).init_from_uri(u)
         return res
 
-    def get_permuted_peers(self, service_name, key):
-        """
-        @return: list of (peerid, connection,)
-        """
-        results = []
-        for (peerid, connection) in self._connections.items():
-            assert isinstance(peerid, str)
-            permuted = sha.new(key + peerid).digest()
-            results.append((permuted, peerid, connection))
-        results.sort()
-        results = [ (r[1],r[2]) for r in results]
-        return results
-
     def upload(self, uploadable):
         assert IUploadable.providedBy(uploadable)
         d = uploadable.get_size()
@@ -276,7 +273,7 @@ class Filenode(unittest.TestCase, testutil.ShouldFailMixin):
         def _created(n):
             self.failUnless(isinstance(n, FastMutableFileNode))
             self.failUnlessEqual(n.get_storage_index(), n._storage_index)
-            peer0 = self.client._peerids[0]
+            peer0 = sorted(self.client.get_all_serverids())[0]
             shnums = self.client._storage._peers[peer0].keys()
             self.failUnlessEqual(len(shnums), 1)
         d.addCallback(_created)
@@ -1575,7 +1572,7 @@ class MultipleEncodings(unittest.TestCase):
 
             sharemap = {}
 
-            for i,peerid in enumerate(self._client._peerids):
+            for i,peerid in enumerate(self._client.get_all_serverids()):
                 peerid_s = shortnodeid_b2a(peerid)
                 for shnum in self._shares1.get(peerid, {}):
                     if shnum < len(places):
@@ -1798,15 +1795,15 @@ class LessFakeClient(FakeClient):
 
     def __init__(self, basedir, num_peers=10):
         self._num_peers = num_peers
-        self._peerids = [tagged_hash("peerid", "%d" % i)[:20]
-                         for i in range(self._num_peers)]
-        self._connections = {}
-        for peerid in self._peerids:
+        peerids = [tagged_hash("peerid", "%d" % i)[:20] 
+                   for i in range(self._num_peers)]
+        self.storage_broker = StorageFarmBroker()
+        for peerid in peerids:
             peerdir = os.path.join(basedir, idlib.shortnodeid_b2a(peerid))
             make_dirs(peerdir)
             ss = StorageServer(peerdir, peerid)
             lw = LocalWrapper(ss)
-            self._connections[peerid] = lw
+            self.storage_broker.add_server(peerid, lw)
         self.nodeid = "fakenodeid"
 
 
@@ -1886,7 +1883,7 @@ class Problems(unittest.TestCase, testutil.ShouldFailMixin):
                 self.old_map = smap
                 # now shut down one of the servers
                 peer0 = list(smap.make_sharemap()[0])[0]
-                self.client._connections.pop(peer0)
+                self.client.debug_remove_connection(peer0)
                 # then modify the file, leaving the old map untouched
                 log.msg("starting winning write")
                 return n.overwrite("contents 2")
@@ -1920,7 +1917,7 @@ class Problems(unittest.TestCase, testutil.ShouldFailMixin):
         d.addCallback(n._generated)
         def _break_peer0(res):
             si = n.get_storage_index()
-            peerlist = self.client.get_permuted_peers("storage", si)
+            peerlist = list(self.client.storage_broker.get_servers(si))
             peerid0, connection0 = peerlist[0]
             peerid1, connection1 = peerlist[1]
             connection0.broken = True
@@ -1939,6 +1936,12 @@ class Problems(unittest.TestCase, testutil.ShouldFailMixin):
         # that ought to work too
         d.addCallback(lambda res: n.download_best_version())
         d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2"))
+        def _explain_error(f):
+            print f
+            if f.check(NotEnoughServersError):
+                print "first_error:", f.value.first_error
+            return f
+        d.addErrback(_explain_error)
         return d
 
     def test_bad_server_overlap(self):
@@ -1954,8 +1957,8 @@ class Problems(unittest.TestCase, testutil.ShouldFailMixin):
         basedir = os.path.join("mutable/CollidingWrites/test_bad_server")
         self.client = LessFakeClient(basedir, 10)
 
-        peerids = sorted(self.client._connections.keys())
-        self.client._connections[peerids[0]].broken = True
+        peerids = list(self.client.get_all_serverids())
+        self.client.debug_break_connection(peerids[0])
 
         d = self.client.create_mutable_file("contents 1")
         def _created(n):
@@ -1963,7 +1966,7 @@ class Problems(unittest.TestCase, testutil.ShouldFailMixin):
             d.addCallback(lambda res: self.failUnlessEqual(res, "contents 1"))
             # now break one of the remaining servers
             def _break_second_server(res):
-                self.client._connections[peerids[1]].broken = True
+                self.client.debug_break_connection(peerids[1])
             d.addCallback(_break_second_server)
             d.addCallback(lambda res: n.overwrite("contents 2"))
             # that ought to work too
@@ -1977,8 +1980,8 @@ class Problems(unittest.TestCase, testutil.ShouldFailMixin):
         # Break all servers: the publish should fail
         basedir = os.path.join("mutable/CollidingWrites/publish_all_servers_bad")
         self.client = LessFakeClient(basedir, 20)
-        for connection in self.client._connections.values():
-            connection.broken = True
+        for peerid in self.client.get_all_serverids():
+            self.client.debug_break_connection(peerid)
         d = self.shouldFail(NotEnoughServersError,
                             "test_publish_all_servers_bad",
                             "Ran out of non-bad servers",
diff --git a/src/allmydata/test/test_system.py b/src/allmydata/test/test_system.py
index 964af2e3c8e29de90ce58562c54c37dfedd51032..7ace3d1ca3521f04a64dca53fa836538b19a08ae 100644
@@ -72,9 +72,10 @@ class SystemTest(SystemTestMixin, unittest.TestCase):
         def _check(extra_node):
             self.extra_node = extra_node
             for c in self.clients:
-                all_peerids = list(c.get_all_peerids())
+                all_peerids = list(c.get_all_serverids())
                 self.failUnlessEqual(len(all_peerids), self.numclients+1)
-                permuted_peers = list(c.get_permuted_peers("storage", "a"))
+                sb = c.storage_broker
+                permuted_peers = list(sb.get_servers("a"))
                 self.failUnlessEqual(len(permuted_peers), self.numclients+1)
 
         d.addCallback(_check)
@@ -109,9 +110,10 @@ class SystemTest(SystemTestMixin, unittest.TestCase):
         d = self.set_up_nodes()
         def _check_connections(res):
             for c in self.clients:
-                all_peerids = list(c.get_all_peerids())
+                all_peerids = list(c.get_all_serverids())
                 self.failUnlessEqual(len(all_peerids), self.numclients)
-                permuted_peers = list(c.get_permuted_peers("storage", "a"))
+                sb = c.storage_broker
+                permuted_peers = list(sb.get_servers("a"))
                 self.failUnlessEqual(len(permuted_peers), self.numclients)
         d.addCallback(_check_connections)
 
diff --git a/src/allmydata/test/test_upload.py b/src/allmydata/test/test_upload.py
index 845f412d4716937498b5954baca3b6993a70485e..52d17796c6994df0aae74dd292aa28153ffc18c5 100644
@@ -15,6 +15,7 @@ from allmydata.util.assertutil import precondition
 from allmydata.util.deferredutil import DeferredListShouldSucceed
 from no_network import GridTestMixin
 from common_util import ShouldFailMixin
+from allmydata.storage_client import StorageFarmBroker
 
 MiB = 1024*1024
 
@@ -158,22 +159,22 @@ class FakeClient:
         self.mode = mode
         self.num_servers = num_servers
         if mode == "some_big_some_small":
-            self.peers = []
+            peers = []
             for fakeid in range(num_servers):
                 if fakeid % 2:
-                    self.peers.append( ("%20d" % fakeid,
-                                        FakeStorageServer("good")) )
+                    peers.append(("%20d" % fakeid, FakeStorageServer("good")))
                 else:
-                    self.peers.append( ("%20d" % fakeid,
-                                        FakeStorageServer("small")) )
+                    peers.append(("%20d" % fakeid, FakeStorageServer("small")))
         else:
-            self.peers = [ ("%20d"%fakeid, FakeStorageServer(self.mode),)
+            peers = [ ("%20d"%fakeid, FakeStorageServer(self.mode),)
                            for fakeid in range(self.num_servers) ]
+        self.storage_broker = StorageFarmBroker()
+        for (serverid, server) in peers:
+            self.storage_broker.add_server(serverid, server)
+        self.last_peers = [p[1] for p in peers]
+
     def log(self, *args, **kwargs):
         pass
-    def get_permuted_peers(self, storage_index, include_myself):
-        self.last_peers = [p[1] for p in self.peers]
-        return self.peers
     def get_encoding_parameters(self):
         return self.DEFAULT_ENCODING_PARAMETERS
 
diff --git a/src/allmydata/test/test_web.py b/src/allmydata/test/test_web.py
index 78c3e62dedc7be841690e0e23cdc4629a65e95dc..1eb92bb3e57dbceb0549ad5a1fcd42175b428db3 100644
@@ -9,6 +9,7 @@ from twisted.python import failure, log
 from nevow import rend
 from allmydata import interfaces, uri, webish
 from allmydata.storage.shares import get_share_file
+from allmydata.storage_client import StorageFarmBroker
 from allmydata.immutable import upload, download
 from allmydata.web import status, common
 from allmydata.scripts.debug import CorruptShareOptions, corrupt_share
@@ -64,11 +65,10 @@ class FakeClient(service.MultiService):
     def connected_to_introducer(self):
         return False
 
-    def get_nickname_for_peerid(self, peerid):
+    def get_nickname_for_serverid(self, serverid):
         return u"John Doe"
 
-    def get_permuted_peers(self, service_name, key):
-        return []
+    storage_broker = StorageFarmBroker()
 
     def create_node_from_uri(self, auri):
         precondition(isinstance(auri, str), auri)
diff --git a/src/allmydata/web/check_results.py b/src/allmydata/web/check_results.py
index 03c689c226098d653084c8d7e076faac3ca2b20e..3228f29977331b0a35229c51d4381f5c9edca62b 100644
@@ -95,7 +95,7 @@ class ResultsBase:
         if data["list-corrupt-shares"]:
             badsharemap = []
             for (serverid, si, shnum) in data["list-corrupt-shares"]:
-                nickname = c.get_nickname_for_peerid(serverid)
+                nickname = c.get_nickname_for_serverid(serverid)
                 badsharemap.append(T.tr[T.td["sh#%d" % shnum],
                                         T.td[T.div(class_="nickname")[nickname],
                                               T.div(class_="nodeid")[T.tt[base32.b2a(serverid)]]],
@@ -123,7 +123,7 @@ class ResultsBase:
                 shareid_s = ""
                 if i == 0:
                     shareid_s = shareid
-                nickname = c.get_nickname_for_peerid(serverid)
+                nickname = c.get_nickname_for_serverid(serverid)
                 sharemap.append(T.tr[T.td[shareid_s],
                                      T.td[T.div(class_="nickname")[nickname],
                                           T.div(class_="nodeid")[T.tt[base32.b2a(serverid)]]]
@@ -137,15 +137,15 @@ class ResultsBase:
         add("Unrecoverable Versions", data["count-unrecoverable-versions"])
 
         # this table is sorted by permuted order
+        sb = c.storage_broker
         permuted_peer_ids = [peerid
                              for (peerid, rref)
-                             in c.get_permuted_peers("storage",
-                                                     cr.get_storage_index())]
+                             in sb.get_servers(cr.get_storage_index())]
 
         num_shares_left = sum([len(shares) for shares in servers.values()])
         servermap = []
         for serverid in permuted_peer_ids:
-            nickname = c.get_nickname_for_peerid(serverid)
+            nickname = c.get_nickname_for_serverid(serverid)
             shareids = servers.get(serverid, [])
             shareids.reverse()
             shareids_s = [ T.tt[shareid, " "] for shareid in sorted(shareids) ]
@@ -419,7 +419,7 @@ class DeepCheckResults(rend.Page, ResultsBase, ReloadMixin):
     def render_server_problem(self, ctx, data):
         serverid = data
         data = [idlib.shortnodeid_b2a(serverid)]
-        nickname = self.client.get_nickname_for_peerid(serverid)
+        nickname = self.client.get_nickname_for_serverid(serverid)
         if nickname:
             data.append(" (%s)" % self._html(nickname))
         return ctx.tag[data]
@@ -433,7 +433,7 @@ class DeepCheckResults(rend.Page, ResultsBase, ReloadMixin):
         return self.monitor.get_status().get_corrupt_shares()
     def render_share_problem(self, ctx, data):
         serverid, storage_index, sharenum = data
-        nickname = self.client.get_nickname_for_peerid(serverid)
+        nickname = self.client.get_nickname_for_serverid(serverid)
         ctx.fillSlots("serverid", idlib.shortnodeid_b2a(serverid))
         if nickname:
             ctx.fillSlots("nickname", self._html(nickname))