From: Brian Warner Date: Mon, 1 Jun 2009 21:06:04 +0000 (-0700) Subject: start to factor server-connection-management into a distinct 'StorageServerFarmBroker... X-Git-Tag: trac-3900~7 X-Git-Url: https://git.rkrishnan.org/specifications/components/com_hotproperty/COPYING.TGPPL.html?a=commitdiff_plain;h=c516361fd2648156194e34b6c9a02f24249312b6;p=tahoe-lafs%2Ftahoe-lafs.git start to factor server-connection-management into a distinct 'StorageServerFarmBroker' object, separate from the client and the introducer. This is the starting point for #467: static server selection --- diff --git a/src/allmydata/client.py b/src/allmydata/client.py index 99dfa09b..106b1b72 100644 --- a/src/allmydata/client.py +++ b/src/allmydata/client.py @@ -11,6 +11,7 @@ from pycryptopp.publickey import rsa import allmydata from allmydata.storage.server import StorageServer +from allmydata import storage_client from allmydata.immutable.upload import Uploader from allmydata.immutable.download import Downloader from allmydata.immutable.filenode import FileNode, LiteralFileNode @@ -220,6 +221,8 @@ class Client(node.Node, pollmixin.PollMixin): convergence_s = self.get_or_create_private_config('convergence', _make_secret) self.convergence = base32.a2b(convergence_s) self._node_cache = weakref.WeakValueDictionary() # uri -> node + + self.init_client_storage_broker() self.add_service(History(self.stats_provider)) self.add_service(Uploader(helper_furl, self.stats_provider)) download_cachedir = os.path.join(self.basedir, @@ -229,6 +232,37 @@ class Client(node.Node, pollmixin.PollMixin): self.add_service(Downloader(self.stats_provider)) self.init_stub_client() + def init_client_storage_broker(self): + # create a StorageFarmBroker object, for use by Uploader/Downloader + # (and everybody else who wants to use storage servers) + self.storage_broker = sb = storage_client.StorageFarmBroker() + + # load static server specifications from tahoe.cfg, if any + #if self.config.has_section("client-server-selection"): + # server_params = {} # maps serverid to dict of parameters + # for (name, value) in self.config.items("client-server-selection"): + # pieces = name.split(".") + # if pieces[0] == "server": + # serverid = pieces[1] + # if serverid not in server_params: + # server_params[serverid] = {} + # server_params[serverid][pieces[2]] = value + # for serverid, params in server_params.items(): + # server_type = params.pop("type") + # if server_type == "tahoe-foolscap": + # s = storage_client.NativeStorageClient(*params) + # else: + # msg = ("unrecognized server type '%s' in " + # "tahoe.cfg [client-server-selection]server.%s.type" + # % (server_type, serverid)) + # raise storage_client.UnknownServerTypeError(msg) + # sb.add_server(s.serverid, s) + + # check to see if we're supposed to use the introducer too + if self.get_config("client-server-selection", "use_introducer", + default=True, boolean=True): + sb.use_introducer(self.introducer_client) + def init_stub_client(self): def _publish(res): # we publish an empty object so that the introducer can count how @@ -338,18 +372,10 @@ class Client(node.Node, pollmixin.PollMixin): self.log("hotline file missing, shutting down") reactor.stop() - def get_all_peerids(self): - return self.introducer_client.get_all_peerids() - def get_nickname_for_peerid(self, peerid): - return self.introducer_client.get_nickname_for_peerid(peerid) - - def get_permuted_peers(self, service_name, key): - """ - @return: list of (peerid, connection,) - """ - assert isinstance(service_name, str) - assert isinstance(key, str) - return self.introducer_client.get_permuted_peers(service_name, key) + def get_all_serverids(self): + return self.storage_broker.get_all_serverids() + def get_nickname_for_serverid(self, serverid): + return self.storage_broker.get_nickname_for_serverid(serverid) def get_encoding_parameters(self): return self.DEFAULT_ENCODING_PARAMETERS @@ -371,7 +397,7 @@ class Client(node.Node, pollmixin.PollMixin): temporary test network and need to know when it is safe to proceed with an upload or download.""" def _check(): - current_clients = list(self.get_all_peerids()) + current_clients = list(self.get_all_serverids()) return len(current_clients) >= num_clients d = self.poll(_check, 0.5) d.addCallback(lambda res: None) diff --git a/src/allmydata/immutable/download.py b/src/allmydata/immutable/download.py index 326bd572..38841024 100644 --- a/src/allmydata/immutable/download.py +++ b/src/allmydata/immutable/download.py @@ -743,8 +743,8 @@ class CiphertextDownloader(log.PrefixingLogMixin): def _get_all_shareholders(self): dl = [] - for (peerid,ss) in self._client.get_permuted_peers("storage", - self._storage_index): + sb = self._client.storage_broker + for (peerid,ss) in sb.get_servers(self._storage_index): d = ss.callRemote("get_buckets", self._storage_index) d.addCallbacks(self._got_response, self._got_error, callbackArgs=(peerid,)) diff --git a/src/allmydata/immutable/offloaded.py b/src/allmydata/immutable/offloaded.py index c8da456d..766f0bbd 100644 --- a/src/allmydata/immutable/offloaded.py +++ b/src/allmydata/immutable/offloaded.py @@ -54,7 +54,7 @@ class CHKCheckerAndUEBFetcher: def _get_all_shareholders(self, storage_index): dl = [] - for (peerid, ss) in self._peer_getter("storage", storage_index): + for (peerid, ss) in self._peer_getter(storage_index): d = ss.callRemote("get_buckets", storage_index) d.addCallbacks(self._got_response, self._got_error, callbackArgs=(peerid,)) @@ -622,8 +622,8 @@ class Helper(Referenceable, service.MultiService): # see if this file is already in the grid lp2 = self.log("doing a quick check+UEBfetch", parent=lp, level=log.NOISY) - c = CHKCheckerAndUEBFetcher(self.parent.get_permuted_peers, - storage_index, lp2) + sb = self.parent.storage_broker + c = CHKCheckerAndUEBFetcher(sb.get_servers, storage_index, lp2) d = c.check() def _checked(res): if res: diff --git a/src/allmydata/immutable/upload.py b/src/allmydata/immutable/upload.py index a8b270eb..26ff331f 100644 --- a/src/allmydata/immutable/upload.py +++ b/src/allmydata/immutable/upload.py @@ -166,7 +166,8 @@ class Tahoe2PeerSelector: self.use_peers = set() # PeerTrackers that have shares assigned to them self.preexisting_shares = {} # sharenum -> peerid holding the share - peers = client.get_permuted_peers("storage", storage_index) + sb = client.storage_broker + peers = list(sb.get_servers(storage_index)) if not peers: raise NoServersError("client gave us zero peers") diff --git a/src/allmydata/mutable/publish.py b/src/allmydata/mutable/publish.py index e446eeae..8ac19ef9 100644 --- a/src/allmydata/mutable/publish.py +++ b/src/allmydata/mutable/publish.py @@ -190,9 +190,8 @@ class Publish: assert self._privkey self._encprivkey = self._node.get_encprivkey() - client = self._node._client - full_peerlist = client.get_permuted_peers("storage", - self._storage_index) + sb = self._node._client.storage_broker + full_peerlist = sb.get_servers(self._storage_index) self.full_peerlist = full_peerlist # for use later, immutable self.bad_peers = set() # peerids who have errbacked/refused requests diff --git a/src/allmydata/mutable/servermap.py b/src/allmydata/mutable/servermap.py index 592e600e..565975b2 100644 --- a/src/allmydata/mutable/servermap.py +++ b/src/allmydata/mutable/servermap.py @@ -421,9 +421,8 @@ class ServermapUpdater: self._queries_completed = 0 - client = self._node._client - full_peerlist = client.get_permuted_peers("storage", - self._node._storage_index) + sb = self._node._client.storage_broker + full_peerlist = list(sb.get_servers(self._node._storage_index)) self.full_peerlist = full_peerlist # for use later, immutable self.extra_peers = full_peerlist[:] # peers are removed as we use them self._good_peers = set() # peers who had some shares diff --git a/src/allmydata/storage_client.py b/src/allmydata/storage_client.py new file mode 100644 index 00000000..e8050379 --- /dev/null +++ b/src/allmydata/storage_client.py @@ -0,0 +1,78 @@ + +""" +I contain the client-side code which speaks to storage servers, in particular +the foolscap-based server implemented in src/allmydata/storage/*.py . +""" + +# roadmap: +# +# implement ServerFarm, change Client to create it, change +# uploader/servermap to get rrefs from it. ServerFarm calls +# IntroducerClient.subscribe_to . +# +# implement NativeStorageClient, change Tahoe2PeerSelector to use it. All +# NativeStorageClients come from the introducer +# +# change web/check_results.py to get NativeStorageClients from check results, +# ask it for a nickname (instead of using client.get_nickname_for_serverid) +# +# implement tahoe.cfg scanner, create static NativeStorageClients + +import sha + +class StorageFarmBroker: + """I live on the client, and know about storage servers. For each server + that is participating in a grid, I either maintain a connection to it or + remember enough information to establish a connection to it on demand. + I'm also responsible for subscribing to the IntroducerClient to find out + about new servers as they are announced by the Introducer. + """ + def __init__(self, permute_peers=True): + assert permute_peers # False not implemented yet + self.servers = {} # serverid -> StorageClient instance + self.permute_peers = permute_peers + self.introducer_client = None + def add_server(self, serverid, s): + self.servers[serverid] = s + def use_introducer(self, introducer_client): + self.introducer_client = ic = introducer_client + ic.subscribe_to("storage") + + def get_servers(self, peer_selection_index): + # first cut: return an iterator of (peerid, versioned-rref) tuples + assert self.permute_peers == True + servers = {} + for serverid,server in self.servers.items(): + servers[serverid] = server + if self.introducer_client: + ic = self.introducer_client + for serverid,server in ic.get_permuted_peers("storage", + peer_selection_index): + servers[serverid] = server + servers = servers.items() + key = peer_selection_index + return sorted(servers, key=lambda x: sha.new(key+x[0]).digest()) + + def get_all_serverids(self): + for serverid in self.servers: + yield serverid + if self.introducer_client: + for serverid,server in self.introducer_client.get_peers("storage"): + yield serverid + + def get_nickname_for_serverid(self, serverid): + if serverid in self.servers: + return self.servers[serverid].nickname + if self.introducer_client: + return self.introducer_client.get_nickname_for_peerid(serverid) + return None + +class NativeStorageClient: + def __init__(self, serverid, furl, nickname, min_shares=1): + self.serverid = serverid + self.furl = furl + self.nickname = nickname + self.min_shares = min_shares + +class UnknownServerTypeError(Exception): + pass diff --git a/src/allmydata/test/no_network.py b/src/allmydata/test/no_network.py index 5808b720..35431a13 100644 --- a/src/allmydata/test/no_network.py +++ b/src/allmydata/test/no_network.py @@ -104,6 +104,11 @@ def wrap(original, service_name): wrapper.version = version return wrapper +class NoNetworkStorageBroker: + def get_servers(self, key): + return sorted(self.client._servers, + key=lambda x: sha.new(key+x[0]).digest()) + class NoNetworkClient(Client): def create_tub(self): @@ -126,15 +131,16 @@ class NoNetworkClient(Client): pass def init_storage(self): pass + def init_client_storage_broker(self): + self.storage_broker = NoNetworkStorageBroker() + self.storage_broker.client = self def init_stub_client(self): pass def get_servers(self, service_name): return self._servers - def get_permuted_peers(self, service_name, key): - return sorted(self._servers, key=lambda x: sha.new(key+x[0]).digest()) - def get_nickname_for_peerid(self, peerid): + def get_nickname_for_serverid(self, serverid): return None class SimpleStats: diff --git a/src/allmydata/test/test_checker.py b/src/allmydata/test/test_checker.py index a63db06b..e9d88a33 100644 --- a/src/allmydata/test/test_checker.py +++ b/src/allmydata/test/test_checker.py @@ -3,32 +3,32 @@ import simplejson from twisted.trial import unittest from allmydata import check_results, uri from allmydata.web import check_results as web_check_results +from allmydata.storage_client import StorageFarmBroker, NativeStorageClient from common_web import WebRenderingMixin class FakeClient: - def get_nickname_for_peerid(self, peerid): - if peerid == "\x00"*20: - return "peer-0" - if peerid == "\xff"*20: - return "peer-f" - if peerid == "\x11"*20: - return "peer-11" - return "peer-unknown" - - def get_permuted_peers(self, service, key): - return [("\x00"*20, None), - ("\x11"*20, None), - ("\xff"*20, None), - ] + def get_nickname_for_serverid(self, serverid): + return self.storage_broker.get_nickname_for_serverid(serverid) class WebResultsRendering(unittest.TestCase, WebRenderingMixin): + def create_fake_client(self): + sb = StorageFarmBroker() + for (peerid, nickname) in [("\x00"*20, "peer-0"), + ("\xff"*20, "peer-f"), + ("\x11"*20, "peer-11")] : + n = NativeStorageClient(peerid, None, nickname) + sb.add_server(peerid, n) + c = FakeClient() + c.storage_broker = sb + return c + def render_json(self, page): d = self.render1(page, args={"output": ["json"]}) return d def test_literal(self): - c = FakeClient() + c = self.create_fake_client() lcr = web_check_results.LiteralCheckResults(c) d = self.render1(lcr) @@ -53,7 +53,7 @@ class WebResultsRendering(unittest.TestCase, WebRenderingMixin): return d def test_check(self): - c = FakeClient() + c = self.create_fake_client() serverid_1 = "\x00"*20 serverid_f = "\xff"*20 u = uri.CHKFileURI("\x00"*16, "\x00"*32, 3, 10, 1234) @@ -151,7 +151,7 @@ class WebResultsRendering(unittest.TestCase, WebRenderingMixin): def test_check_and_repair(self): - c = FakeClient() + c = self.create_fake_client() serverid_1 = "\x00"*20 serverid_f = "\xff"*20 u = uri.CHKFileURI("\x00"*16, "\x00"*32, 3, 10, 1234) diff --git a/src/allmydata/test/test_client.py b/src/allmydata/test/test_client.py index 5865edc5..06077a5c 100644 --- a/src/allmydata/test/test_client.py +++ b/src/allmydata/test/test_client.py @@ -6,6 +6,7 @@ from twisted.python import log import allmydata from allmydata import client +from allmydata.storage_client import StorageFarmBroker from allmydata.introducer.client import IntroducerClient from allmydata.util import base32 from foolscap.api import flushEventualQueue @@ -140,30 +141,19 @@ class Basic(unittest.TestCase): c = client.Client(basedir) self.failUnlessEqual(c.getServiceNamed("storage").reserved_space, 0) - def _permute(self, c, key): + def _permute(self, sb, key): return [ peerid - for (peerid,rref) in c.get_permuted_peers("storage", key) ] + for (peerid,rref) in sb.get_servers(key) ] def test_permute(self): - basedir = "test_client.Basic.test_permute" - os.mkdir(basedir) - open(os.path.join(basedir, "introducer.furl"), "w").write("") - open(os.path.join(basedir, "vdrive.furl"), "w").write("") - c = client.Client(basedir) - c.introducer_client = FakeIntroducerClient() + sb = StorageFarmBroker() for k in ["%d" % i for i in range(5)]: - c.introducer_client.add_peer(k) - - self.failUnlessEqual(self._permute(c, "one"), ['3','1','0','4','2']) - self.failUnlessEqual(self._permute(c, "two"), ['0','4','2','1','3']) - c.introducer_client.remove_all_peers() - self.failUnlessEqual(self._permute(c, "one"), []) + sb.add_server(k, None) - c2 = client.Client(basedir) - c2.introducer_client = FakeIntroducerClient() - for k in ["%d" % i for i in range(5)]: - c2.introducer_client.add_peer(k) - self.failUnlessEqual(self._permute(c2, "one"), ['3','1','0','4','2']) + self.failUnlessEqual(self._permute(sb, "one"), ['3','1','0','4','2']) + self.failUnlessEqual(self._permute(sb, "two"), ['0','4','2','1','3']) + sb.servers = {} + self.failUnlessEqual(self._permute(sb, "one"), []) def test_versions(self): basedir = "test_client.Basic.test_versions" diff --git a/src/allmydata/test/test_helper.py b/src/allmydata/test/test_helper.py index ea448698..32b3ab86 100644 --- a/src/allmydata/test/test_helper.py +++ b/src/allmydata/test/test_helper.py @@ -6,6 +6,7 @@ from foolscap.api import Tub, fireEventually, flushEventualQueue from foolscap.logging import log from allmydata.storage.server import si_b2a +from allmydata.storage_client import StorageFarmBroker from allmydata.immutable import offloaded, upload from allmydata import uri from allmydata.util import hashutil, fileutil, mathutil @@ -62,12 +63,11 @@ class FakeClient(service.MultiService): "max_segment_size": 1*MiB, } stats_provider = None + storage_broker = StorageFarmBroker() def log(self, *args, **kwargs): return log.msg(*args, **kwargs) def get_encoding_parameters(self): return self.DEFAULT_ENCODING_PARAMETERS - def get_permuted_peers(self, service_name, storage_index): - return [] def flush_but_dont_ignore(res): d = flushEventualQueue() diff --git a/src/allmydata/test/test_mutable.py b/src/allmydata/test/test_mutable.py index 4810ee94..1ac21f94 100644 --- a/src/allmydata/test/test_mutable.py +++ b/src/allmydata/test/test_mutable.py @@ -17,7 +17,7 @@ from allmydata.monitor import Monitor from allmydata.test.common import ShouldFailMixin from foolscap.api import eventually, fireEventually from foolscap.logging import log -import sha +from allmydata.storage_client import StorageFarmBroker from allmydata.mutable.filenode import MutableFileNode, BackoffAgent from allmydata.mutable.common import ResponseCache, \ @@ -171,12 +171,22 @@ class FakeClient: def __init__(self, num_peers=10): self._storage = FakeStorage() self._num_peers = num_peers - self._peerids = [tagged_hash("peerid", "%d" % i)[:20] - for i in range(self._num_peers)] - self._connections = dict([(peerid, FakeStorageServer(peerid, - self._storage)) - for peerid in self._peerids]) + peerids = [tagged_hash("peerid", "%d" % i)[:20] + for i in range(self._num_peers)] self.nodeid = "fakenodeid" + self.storage_broker = StorageFarmBroker() + for peerid in peerids: + fss = FakeStorageServer(peerid, self._storage) + self.storage_broker.add_server(peerid, fss) + + def get_all_serverids(self): + return self.storage_broker.get_all_serverids() + def debug_break_connection(self, peerid): + self.storage_broker.servers[peerid].broken = True + def debug_remove_connection(self, peerid): + self.storage_broker.servers.pop(peerid) + def debug_get_connection(self, peerid): + return self.storage_broker.servers[peerid] def get_encoding_parameters(self): return {"k": 3, "n": 10} @@ -204,19 +214,6 @@ class FakeClient: res = self.mutable_file_node_class(self).init_from_uri(u) return res - def get_permuted_peers(self, service_name, key): - """ - @return: list of (peerid, connection,) - """ - results = [] - for (peerid, connection) in self._connections.items(): - assert isinstance(peerid, str) - permuted = sha.new(key + peerid).digest() - results.append((permuted, peerid, connection)) - results.sort() - results = [ (r[1],r[2]) for r in results] - return results - def upload(self, uploadable): assert IUploadable.providedBy(uploadable) d = uploadable.get_size() @@ -276,7 +273,7 @@ class Filenode(unittest.TestCase, testutil.ShouldFailMixin): def _created(n): self.failUnless(isinstance(n, FastMutableFileNode)) self.failUnlessEqual(n.get_storage_index(), n._storage_index) - peer0 = self.client._peerids[0] + peer0 = sorted(self.client.get_all_serverids())[0] shnums = self.client._storage._peers[peer0].keys() self.failUnlessEqual(len(shnums), 1) d.addCallback(_created) @@ -1575,7 +1572,7 @@ class MultipleEncodings(unittest.TestCase): sharemap = {} - for i,peerid in enumerate(self._client._peerids): + for i,peerid in enumerate(self._client.get_all_serverids()): peerid_s = shortnodeid_b2a(peerid) for shnum in self._shares1.get(peerid, {}): if shnum < len(places): @@ -1798,15 +1795,15 @@ class LessFakeClient(FakeClient): def __init__(self, basedir, num_peers=10): self._num_peers = num_peers - self._peerids = [tagged_hash("peerid", "%d" % i)[:20] - for i in range(self._num_peers)] - self._connections = {} - for peerid in self._peerids: + peerids = [tagged_hash("peerid", "%d" % i)[:20] + for i in range(self._num_peers)] + self.storage_broker = StorageFarmBroker() + for peerid in peerids: peerdir = os.path.join(basedir, idlib.shortnodeid_b2a(peerid)) make_dirs(peerdir) ss = StorageServer(peerdir, peerid) lw = LocalWrapper(ss) - self._connections[peerid] = lw + self.storage_broker.add_server(peerid, lw) self.nodeid = "fakenodeid" @@ -1886,7 +1883,7 @@ class Problems(unittest.TestCase, testutil.ShouldFailMixin): self.old_map = smap # now shut down one of the servers peer0 = list(smap.make_sharemap()[0])[0] - self.client._connections.pop(peer0) + self.client.debug_remove_connection(peer0) # then modify the file, leaving the old map untouched log.msg("starting winning write") return n.overwrite("contents 2") @@ -1920,7 +1917,7 @@ class Problems(unittest.TestCase, testutil.ShouldFailMixin): d.addCallback(n._generated) def _break_peer0(res): si = n.get_storage_index() - peerlist = self.client.get_permuted_peers("storage", si) + peerlist = list(self.client.storage_broker.get_servers(si)) peerid0, connection0 = peerlist[0] peerid1, connection1 = peerlist[1] connection0.broken = True @@ -1939,6 +1936,12 @@ class Problems(unittest.TestCase, testutil.ShouldFailMixin): # that ought to work too d.addCallback(lambda res: n.download_best_version()) d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2")) + def _explain_error(f): + print f + if f.check(NotEnoughServersError): + print "first_error:", f.value.first_error + return f + d.addErrback(_explain_error) return d def test_bad_server_overlap(self): @@ -1954,8 +1957,8 @@ class Problems(unittest.TestCase, testutil.ShouldFailMixin): basedir = os.path.join("mutable/CollidingWrites/test_bad_server") self.client = LessFakeClient(basedir, 10) - peerids = sorted(self.client._connections.keys()) - self.client._connections[peerids[0]].broken = True + peerids = list(self.client.get_all_serverids()) + self.client.debug_break_connection(peerids[0]) d = self.client.create_mutable_file("contents 1") def _created(n): @@ -1963,7 +1966,7 @@ class Problems(unittest.TestCase, testutil.ShouldFailMixin): d.addCallback(lambda res: self.failUnlessEqual(res, "contents 1")) # now break one of the remaining servers def _break_second_server(res): - self.client._connections[peerids[1]].broken = True + self.client.debug_break_connection(peerids[1]) d.addCallback(_break_second_server) d.addCallback(lambda res: n.overwrite("contents 2")) # that ought to work too @@ -1977,8 +1980,8 @@ class Problems(unittest.TestCase, testutil.ShouldFailMixin): # Break all servers: the publish should fail basedir = os.path.join("mutable/CollidingWrites/publish_all_servers_bad") self.client = LessFakeClient(basedir, 20) - for connection in self.client._connections.values(): - connection.broken = True + for peerid in self.client.get_all_serverids(): + self.client.debug_break_connection(peerid) d = self.shouldFail(NotEnoughServersError, "test_publish_all_servers_bad", "Ran out of non-bad servers", diff --git a/src/allmydata/test/test_system.py b/src/allmydata/test/test_system.py index 964af2e3..7ace3d1c 100644 --- a/src/allmydata/test/test_system.py +++ b/src/allmydata/test/test_system.py @@ -72,9 +72,10 @@ class SystemTest(SystemTestMixin, unittest.TestCase): def _check(extra_node): self.extra_node = extra_node for c in self.clients: - all_peerids = list(c.get_all_peerids()) + all_peerids = list(c.get_all_serverids()) self.failUnlessEqual(len(all_peerids), self.numclients+1) - permuted_peers = list(c.get_permuted_peers("storage", "a")) + sb = c.storage_broker + permuted_peers = list(sb.get_servers("a")) self.failUnlessEqual(len(permuted_peers), self.numclients+1) d.addCallback(_check) @@ -109,9 +110,10 @@ class SystemTest(SystemTestMixin, unittest.TestCase): d = self.set_up_nodes() def _check_connections(res): for c in self.clients: - all_peerids = list(c.get_all_peerids()) + all_peerids = list(c.get_all_serverids()) self.failUnlessEqual(len(all_peerids), self.numclients) - permuted_peers = list(c.get_permuted_peers("storage", "a")) + sb = c.storage_broker + permuted_peers = list(sb.get_servers("a")) self.failUnlessEqual(len(permuted_peers), self.numclients) d.addCallback(_check_connections) diff --git a/src/allmydata/test/test_upload.py b/src/allmydata/test/test_upload.py index 845f412d..52d17796 100644 --- a/src/allmydata/test/test_upload.py +++ b/src/allmydata/test/test_upload.py @@ -15,6 +15,7 @@ from allmydata.util.assertutil import precondition from allmydata.util.deferredutil import DeferredListShouldSucceed from no_network import GridTestMixin from common_util import ShouldFailMixin +from allmydata.storage_client import StorageFarmBroker MiB = 1024*1024 @@ -158,22 +159,22 @@ class FakeClient: self.mode = mode self.num_servers = num_servers if mode == "some_big_some_small": - self.peers = [] + peers = [] for fakeid in range(num_servers): if fakeid % 2: - self.peers.append( ("%20d" % fakeid, - FakeStorageServer("good")) ) + peers.append(("%20d" % fakeid, FakeStorageServer("good"))) else: - self.peers.append( ("%20d" % fakeid, - FakeStorageServer("small")) ) + peers.append(("%20d" % fakeid, FakeStorageServer("small"))) else: - self.peers = [ ("%20d"%fakeid, FakeStorageServer(self.mode),) + peers = [ ("%20d"%fakeid, FakeStorageServer(self.mode),) for fakeid in range(self.num_servers) ] + self.storage_broker = StorageFarmBroker() + for (serverid, server) in peers: + self.storage_broker.add_server(serverid, server) + self.last_peers = [p[1] for p in peers] + def log(self, *args, **kwargs): pass - def get_permuted_peers(self, storage_index, include_myself): - self.last_peers = [p[1] for p in self.peers] - return self.peers def get_encoding_parameters(self): return self.DEFAULT_ENCODING_PARAMETERS diff --git a/src/allmydata/test/test_web.py b/src/allmydata/test/test_web.py index 78c3e62d..1eb92bb3 100644 --- a/src/allmydata/test/test_web.py +++ b/src/allmydata/test/test_web.py @@ -9,6 +9,7 @@ from twisted.python import failure, log from nevow import rend from allmydata import interfaces, uri, webish from allmydata.storage.shares import get_share_file +from allmydata.storage_client import StorageFarmBroker from allmydata.immutable import upload, download from allmydata.web import status, common from allmydata.scripts.debug import CorruptShareOptions, corrupt_share @@ -64,11 +65,10 @@ class FakeClient(service.MultiService): def connected_to_introducer(self): return False - def get_nickname_for_peerid(self, peerid): + def get_nickname_for_serverid(self, serverid): return u"John Doe" - def get_permuted_peers(self, service_name, key): - return [] + storage_broker = StorageFarmBroker() def create_node_from_uri(self, auri): precondition(isinstance(auri, str), auri) diff --git a/src/allmydata/web/check_results.py b/src/allmydata/web/check_results.py index 03c689c2..3228f299 100644 --- a/src/allmydata/web/check_results.py +++ b/src/allmydata/web/check_results.py @@ -95,7 +95,7 @@ class ResultsBase: if data["list-corrupt-shares"]: badsharemap = [] for (serverid, si, shnum) in data["list-corrupt-shares"]: - nickname = c.get_nickname_for_peerid(serverid) + nickname = c.get_nickname_for_serverid(serverid) badsharemap.append(T.tr[T.td["sh#%d" % shnum], T.td[T.div(class_="nickname")[nickname], T.div(class_="nodeid")[T.tt[base32.b2a(serverid)]]], @@ -123,7 +123,7 @@ class ResultsBase: shareid_s = "" if i == 0: shareid_s = shareid - nickname = c.get_nickname_for_peerid(serverid) + nickname = c.get_nickname_for_serverid(serverid) sharemap.append(T.tr[T.td[shareid_s], T.td[T.div(class_="nickname")[nickname], T.div(class_="nodeid")[T.tt[base32.b2a(serverid)]]] @@ -137,15 +137,15 @@ class ResultsBase: add("Unrecoverable Versions", data["count-unrecoverable-versions"]) # this table is sorted by permuted order + sb = c.storage_broker permuted_peer_ids = [peerid for (peerid, rref) - in c.get_permuted_peers("storage", - cr.get_storage_index())] + in sb.get_servers(cr.get_storage_index())] num_shares_left = sum([len(shares) for shares in servers.values()]) servermap = [] for serverid in permuted_peer_ids: - nickname = c.get_nickname_for_peerid(serverid) + nickname = c.get_nickname_for_serverid(serverid) shareids = servers.get(serverid, []) shareids.reverse() shareids_s = [ T.tt[shareid, " "] for shareid in sorted(shareids) ] @@ -419,7 +419,7 @@ class DeepCheckResults(rend.Page, ResultsBase, ReloadMixin): def render_server_problem(self, ctx, data): serverid = data data = [idlib.shortnodeid_b2a(serverid)] - nickname = self.client.get_nickname_for_peerid(serverid) + nickname = self.client.get_nickname_for_serverid(serverid) if nickname: data.append(" (%s)" % self._html(nickname)) return ctx.tag[data] @@ -433,7 +433,7 @@ class DeepCheckResults(rend.Page, ResultsBase, ReloadMixin): return self.monitor.get_status().get_corrupt_shares() def render_share_problem(self, ctx, data): serverid, storage_index, sharenum = data - nickname = self.client.get_nickname_for_peerid(serverid) + nickname = self.client.get_nickname_for_serverid(serverid) ctx.fillSlots("serverid", idlib.shortnodeid_b2a(serverid)) if nickname: ctx.fillSlots("nickname", self._html(nickname))