import allmydata
from allmydata.storage.server import StorageServer
+from allmydata import storage_client
from allmydata.immutable.upload import Uploader
from allmydata.immutable.download import Downloader
from allmydata.immutable.filenode import FileNode, LiteralFileNode
convergence_s = self.get_or_create_private_config('convergence', _make_secret)
self.convergence = base32.a2b(convergence_s)
self._node_cache = weakref.WeakValueDictionary() # uri -> node
+
+ self.init_client_storage_broker()
self.add_service(History(self.stats_provider))
self.add_service(Uploader(helper_furl, self.stats_provider))
download_cachedir = os.path.join(self.basedir,
self.add_service(Downloader(self.stats_provider))
self.init_stub_client()
+ def init_client_storage_broker(self):
+ """Create self.storage_broker, the StorageFarmBroker this client
+ uses to find storage servers. Uploader/Downloader (and anything
+ else that needs storage) consult it. If the
+ [client-server-selection]use_introducer option is True (the
+ default), the broker is also wired to the introducer client so
+ announced servers become visible."""
+ # create a StorageFarmBroker object, for use by Uploader/Downloader
+ # (and everybody else who wants to use storage servers)
+ self.storage_broker = sb = storage_client.StorageFarmBroker()
+
+ # load static server specifications from tahoe.cfg, if any
+ # NOTE(review): the static-server parser below is deliberately
+ # disabled; it is kept as a sketch for the planned tahoe.cfg scanner
+ # (see the roadmap in storage_client.py).
+ #if self.config.has_section("client-server-selection"):
+ # server_params = {} # maps serverid to dict of parameters
+ # for (name, value) in self.config.items("client-server-selection"):
+ # pieces = name.split(".")
+ # if pieces[0] == "server":
+ # serverid = pieces[1]
+ # if serverid not in server_params:
+ # server_params[serverid] = {}
+ # server_params[serverid][pieces[2]] = value
+ # for serverid, params in server_params.items():
+ # server_type = params.pop("type")
+ # if server_type == "tahoe-foolscap":
+ # s = storage_client.NativeStorageClient(*params)
+ # else:
+ # msg = ("unrecognized server type '%s' in "
+ # "tahoe.cfg [client-server-selection]server.%s.type"
+ # % (server_type, serverid))
+ # raise storage_client.UnknownServerTypeError(msg)
+ # sb.add_server(s.serverid, s)
+
+ # check to see if we're supposed to use the introducer too
+ if self.get_config("client-server-selection", "use_introducer",
+ default=True, boolean=True):
+ sb.use_introducer(self.introducer_client)
+
def init_stub_client(self):
def _publish(res):
# we publish an empty object so that the introducer can count how
self.log("hotline file missing, shutting down")
reactor.stop()
- def get_all_peerids(self):
- return self.introducer_client.get_all_peerids()
- def get_nickname_for_peerid(self, peerid):
- return self.introducer_client.get_nickname_for_peerid(peerid)
-
- def get_permuted_peers(self, service_name, key):
- """
- @return: list of (peerid, connection,)
- """
- assert isinstance(service_name, str)
- assert isinstance(key, str)
- return self.introducer_client.get_permuted_peers(service_name, key)
+ def get_all_serverids(self):
+ # convenience passthrough: delegate to the storage broker
+ return self.storage_broker.get_all_serverids()
+ def get_nickname_for_serverid(self, serverid):
+ # convenience passthrough: delegate to the storage broker
+ return self.storage_broker.get_nickname_for_serverid(serverid)
def get_encoding_parameters(self):
return self.DEFAULT_ENCODING_PARAMETERS
temporary test network and need to know when it is safe to proceed
with an upload or download."""
def _check():
- current_clients = list(self.get_all_peerids())
+ current_clients = list(self.get_all_serverids())
return len(current_clients) >= num_clients
d = self.poll(_check, 0.5)
d.addCallback(lambda res: None)
def _get_all_shareholders(self):
dl = []
- for (peerid,ss) in self._client.get_permuted_peers("storage",
- self._storage_index):
+ sb = self._client.storage_broker
+ for (peerid,ss) in sb.get_servers(self._storage_index):
d = ss.callRemote("get_buckets", self._storage_index)
d.addCallbacks(self._got_response, self._got_error,
callbackArgs=(peerid,))
def _get_all_shareholders(self, storage_index):
dl = []
- for (peerid, ss) in self._peer_getter("storage", storage_index):
+ for (peerid, ss) in self._peer_getter(storage_index):
d = ss.callRemote("get_buckets", storage_index)
d.addCallbacks(self._got_response, self._got_error,
callbackArgs=(peerid,))
# see if this file is already in the grid
lp2 = self.log("doing a quick check+UEBfetch",
parent=lp, level=log.NOISY)
- c = CHKCheckerAndUEBFetcher(self.parent.get_permuted_peers,
- storage_index, lp2)
+ sb = self.parent.storage_broker
+ c = CHKCheckerAndUEBFetcher(sb.get_servers, storage_index, lp2)
d = c.check()
def _checked(res):
if res:
self.use_peers = set() # PeerTrackers that have shares assigned to them
self.preexisting_shares = {} # sharenum -> peerid holding the share
- peers = client.get_permuted_peers("storage", storage_index)
+ sb = client.storage_broker
+ peers = list(sb.get_servers(storage_index))
if not peers:
raise NoServersError("client gave us zero peers")
assert self._privkey
self._encprivkey = self._node.get_encprivkey()
- client = self._node._client
- full_peerlist = client.get_permuted_peers("storage",
- self._storage_index)
+ sb = self._node._client.storage_broker
+ full_peerlist = sb.get_servers(self._storage_index)
self.full_peerlist = full_peerlist # for use later, immutable
self.bad_peers = set() # peerids who have errbacked/refused requests
self._queries_completed = 0
- client = self._node._client
- full_peerlist = client.get_permuted_peers("storage",
- self._node._storage_index)
+ sb = self._node._client.storage_broker
+ full_peerlist = list(sb.get_servers(self._node._storage_index))
self.full_peerlist = full_peerlist # for use later, immutable
self.extra_peers = full_peerlist[:] # peers are removed as we use them
self._good_peers = set() # peers who had some shares
--- /dev/null
+
+"""
+I contain the client-side code which speaks to storage servers, in particular
+the foolscap-based server implemented in src/allmydata/storage/*.py .
+"""
+
+# roadmap:
+#
+# implement ServerFarm, change Client to create it, change
+# uploader/servermap to get rrefs from it. ServerFarm calls
+# IntroducerClient.subscribe_to .
+#
+# implement NativeStorageClient, change Tahoe2PeerSelector to use it. All
+# NativeStorageClients come from the introducer
+#
+# change web/check_results.py to get NativeStorageClients from check results,
+# ask it for a nickname (instead of using client.get_nickname_for_serverid)
+#
+# implement tahoe.cfg scanner, create static NativeStorageClients
+
+import sha
+
+class StorageFarmBroker:
+ """I live on the client, and know about storage servers. For each server
+ that is participating in a grid, I either maintain a connection to it or
+ remember enough information to establish a connection to it on demand.
+ I'm also responsible for subscribing to the IntroducerClient to find out
+ about new servers as they are announced by the Introducer.
+ """
+ def __init__(self, permute_peers=True):
+ # permute_peers=False (linear selection) is reserved for later
+ assert permute_peers # False not implemented yet
+ self.servers = {} # serverid -> StorageClient instance
+ self.permute_peers = permute_peers
+ self.introducer_client = None # set by use_introducer()
+ def add_server(self, serverid, s):
+ # register a statically-configured server (bypasses the introducer)
+ self.servers[serverid] = s
+ def use_introducer(self, introducer_client):
+ # subscribe to "storage" announcements so newly-announced servers
+ # become visible through get_servers()/get_all_serverids()
+ self.introducer_client = ic = introducer_client
+ ic.subscribe_to("storage")
+
+ def get_servers(self, peer_selection_index):
+ # Merge statically-added servers with introducer-announced ones
+ # (introducer entries overwrite static ones on serverid collision),
+ # then return them permuted by sha(peer_selection_index + serverid).
+ # first cut: return an iterator of (peerid, versioned-rref) tuples
+ assert self.permute_peers == True
+ servers = {}
+ for serverid,server in self.servers.items():
+ servers[serverid] = server
+ if self.introducer_client:
+ ic = self.introducer_client
+ for serverid,server in ic.get_permuted_peers("storage",
+ peer_selection_index):
+ servers[serverid] = server
+ servers = servers.items()
+ key = peer_selection_index
+ return sorted(servers, key=lambda x: sha.new(key+x[0]).digest())
+
+ def get_all_serverids(self):
+ # yield every serverid we know about, static first, then announced.
+ # NOTE(review): may yield a serverid twice if a server is both
+ # statically configured and introducer-announced -- confirm callers
+ # tolerate duplicates (most wrap this in list()/len()).
+ for serverid in self.servers:
+ yield serverid
+ if self.introducer_client:
+ # NOTE(review): assumes IntroducerClient provides get_peers();
+ # get_servers() above uses get_permuted_peers -- verify both
+ # methods exist on the introducer client API
+ for serverid,server in self.introducer_client.get_peers("storage"):
+ yield serverid
+
+ def get_nickname_for_serverid(self, serverid):
+ # statically-configured servers take precedence; fall back to the
+ # introducer's announcement data, then None if unknown
+ if serverid in self.servers:
+ return self.servers[serverid].nickname
+ if self.introducer_client:
+ return self.introducer_client.get_nickname_for_peerid(serverid)
+ return None
+
+class NativeStorageClient:
+ """Plain record describing one native (tahoe-foolscap) storage server:
+ its serverid, the FURL used to reach it, a display nickname, and
+ min_shares. No connection logic lives here yet (see module roadmap)."""
+ def __init__(self, serverid, furl, nickname, min_shares=1):
+ self.serverid = serverid
+ self.furl = furl # FURL for establishing the foolscap connection
+ self.nickname = nickname # human-readable name, used by web/check_results
+ self.min_shares = min_shares
+
+class UnknownServerTypeError(Exception):
+ # raised when a tahoe.cfg [client-server-selection] entry names a
+ # server.*.type value we do not recognize
+ pass
wrapper.version = version
return wrapper
+class NoNetworkStorageBroker:
+ # test double for StorageFarmBroker: permutes the NoNetworkClient's
+ # in-process _servers list without any real networking. self.client
+ # is attached by NoNetworkClient.init_client_storage_broker().
+ def get_servers(self, key):
+ # same sha-based permutation as StorageFarmBroker.get_servers
+ return sorted(self.client._servers,
+ key=lambda x: sha.new(key+x[0]).digest())
+
class NoNetworkClient(Client):
def create_tub(self):
pass
def init_storage(self):
pass
+ def init_client_storage_broker(self):
+ # override: install the no-network test double instead of a real
+ # StorageFarmBroker; the broker reads servers from client._servers
+ self.storage_broker = NoNetworkStorageBroker()
+ self.storage_broker.client = self
def init_stub_client(self):
pass
def get_servers(self, service_name):
return self._servers
- def get_permuted_peers(self, service_name, key):
- return sorted(self._servers, key=lambda x: sha.new(key+x[0]).digest())
- def get_nickname_for_peerid(self, peerid):
+ def get_nickname_for_serverid(self, serverid):
return None
class SimpleStats:
from twisted.trial import unittest
from allmydata import check_results, uri
from allmydata.web import check_results as web_check_results
+from allmydata.storage_client import StorageFarmBroker, NativeStorageClient
from common_web import WebRenderingMixin
class FakeClient:
- def get_nickname_for_peerid(self, peerid):
- if peerid == "\x00"*20:
- return "peer-0"
- if peerid == "\xff"*20:
- return "peer-f"
- if peerid == "\x11"*20:
- return "peer-11"
- return "peer-unknown"
-
- def get_permuted_peers(self, service, key):
- return [("\x00"*20, None),
- ("\x11"*20, None),
- ("\xff"*20, None),
- ]
+ def get_nickname_for_serverid(self, serverid):
+ return self.storage_broker.get_nickname_for_serverid(serverid)
class WebResultsRendering(unittest.TestCase, WebRenderingMixin):
+ def create_fake_client(self):
+ # Build a FakeClient whose StorageFarmBroker knows three static
+ # servers with fixed 20-byte peerids and known nicknames, so the
+ # check-results rendering tests get deterministic nickname lookups.
+ sb = StorageFarmBroker()
+ for (peerid, nickname) in [("\x00"*20, "peer-0"),
+ ("\xff"*20, "peer-f"),
+ ("\x11"*20, "peer-11")] :
+ n = NativeStorageClient(peerid, None, nickname)
+ sb.add_server(peerid, n)
+ c = FakeClient()
+ c.storage_broker = sb
+ return c
+
def render_json(self, page):
d = self.render1(page, args={"output": ["json"]})
return d
def test_literal(self):
- c = FakeClient()
+ c = self.create_fake_client()
lcr = web_check_results.LiteralCheckResults(c)
d = self.render1(lcr)
return d
def test_check(self):
- c = FakeClient()
+ c = self.create_fake_client()
serverid_1 = "\x00"*20
serverid_f = "\xff"*20
u = uri.CHKFileURI("\x00"*16, "\x00"*32, 3, 10, 1234)
def test_check_and_repair(self):
- c = FakeClient()
+ c = self.create_fake_client()
serverid_1 = "\x00"*20
serverid_f = "\xff"*20
u = uri.CHKFileURI("\x00"*16, "\x00"*32, 3, 10, 1234)
import allmydata
from allmydata import client
+from allmydata.storage_client import StorageFarmBroker
from allmydata.introducer.client import IntroducerClient
from allmydata.util import base32
from foolscap.api import flushEventualQueue
c = client.Client(basedir)
self.failUnlessEqual(c.getServiceNamed("storage").reserved_space, 0)
- def _permute(self, c, key):
+ def _permute(self, sb, key):
return [ peerid
- for (peerid,rref) in c.get_permuted_peers("storage", key) ]
+ for (peerid,rref) in sb.get_servers(key) ]
def test_permute(self):
- basedir = "test_client.Basic.test_permute"
- os.mkdir(basedir)
- open(os.path.join(basedir, "introducer.furl"), "w").write("")
- open(os.path.join(basedir, "vdrive.furl"), "w").write("")
- c = client.Client(basedir)
- c.introducer_client = FakeIntroducerClient()
+ sb = StorageFarmBroker()
for k in ["%d" % i for i in range(5)]:
- c.introducer_client.add_peer(k)
-
- self.failUnlessEqual(self._permute(c, "one"), ['3','1','0','4','2'])
- self.failUnlessEqual(self._permute(c, "two"), ['0','4','2','1','3'])
- c.introducer_client.remove_all_peers()
- self.failUnlessEqual(self._permute(c, "one"), [])
+ sb.add_server(k, None)
- c2 = client.Client(basedir)
- c2.introducer_client = FakeIntroducerClient()
- for k in ["%d" % i for i in range(5)]:
- c2.introducer_client.add_peer(k)
- self.failUnlessEqual(self._permute(c2, "one"), ['3','1','0','4','2'])
+ self.failUnlessEqual(self._permute(sb, "one"), ['3','1','0','4','2'])
+ self.failUnlessEqual(self._permute(sb, "two"), ['0','4','2','1','3'])
+ sb.servers = {}
+ self.failUnlessEqual(self._permute(sb, "one"), [])
def test_versions(self):
basedir = "test_client.Basic.test_versions"
from foolscap.logging import log
from allmydata.storage.server import si_b2a
+from allmydata.storage_client import StorageFarmBroker
from allmydata.immutable import offloaded, upload
from allmydata import uri
from allmydata.util import hashutil, fileutil, mathutil
"max_segment_size": 1*MiB,
}
stats_provider = None
+ storage_broker = StorageFarmBroker()
def log(self, *args, **kwargs):
return log.msg(*args, **kwargs)
def get_encoding_parameters(self):
return self.DEFAULT_ENCODING_PARAMETERS
- def get_permuted_peers(self, service_name, storage_index):
- return []
def flush_but_dont_ignore(res):
d = flushEventualQueue()
from allmydata.test.common import ShouldFailMixin
from foolscap.api import eventually, fireEventually
from foolscap.logging import log
-import sha
+from allmydata.storage_client import StorageFarmBroker
from allmydata.mutable.filenode import MutableFileNode, BackoffAgent
from allmydata.mutable.common import ResponseCache, \
def __init__(self, num_peers=10):
self._storage = FakeStorage()
self._num_peers = num_peers
- self._peerids = [tagged_hash("peerid", "%d" % i)[:20]
- for i in range(self._num_peers)]
- self._connections = dict([(peerid, FakeStorageServer(peerid,
- self._storage))
- for peerid in self._peerids])
+ peerids = [tagged_hash("peerid", "%d" % i)[:20]
+ for i in range(self._num_peers)]
self.nodeid = "fakenodeid"
+ self.storage_broker = StorageFarmBroker()
+ for peerid in peerids:
+ fss = FakeStorageServer(peerid, self._storage)
+ self.storage_broker.add_server(peerid, fss)
+
+ def get_all_serverids(self):
+ # mirror the real Client API: delegate to the broker
+ return self.storage_broker.get_all_serverids()
+ def debug_break_connection(self, peerid):
+ # mark one fake server as broken so remote calls to it will fail
+ self.storage_broker.servers[peerid].broken = True
+ def debug_remove_connection(self, peerid):
+ # drop a server entirely, simulating a disconnect
+ self.storage_broker.servers.pop(peerid)
+ def debug_get_connection(self, peerid):
+ # direct access to the fake server object, for test assertions
+ return self.storage_broker.servers[peerid]
def get_encoding_parameters(self):
return {"k": 3, "n": 10}
res = self.mutable_file_node_class(self).init_from_uri(u)
return res
- def get_permuted_peers(self, service_name, key):
- """
- @return: list of (peerid, connection,)
- """
- results = []
- for (peerid, connection) in self._connections.items():
- assert isinstance(peerid, str)
- permuted = sha.new(key + peerid).digest()
- results.append((permuted, peerid, connection))
- results.sort()
- results = [ (r[1],r[2]) for r in results]
- return results
-
def upload(self, uploadable):
assert IUploadable.providedBy(uploadable)
d = uploadable.get_size()
def _created(n):
self.failUnless(isinstance(n, FastMutableFileNode))
self.failUnlessEqual(n.get_storage_index(), n._storage_index)
- peer0 = self.client._peerids[0]
+ peer0 = sorted(self.client.get_all_serverids())[0]
shnums = self.client._storage._peers[peer0].keys()
self.failUnlessEqual(len(shnums), 1)
d.addCallback(_created)
sharemap = {}
- for i,peerid in enumerate(self._client._peerids):
+ for i,peerid in enumerate(self._client.get_all_serverids()):
peerid_s = shortnodeid_b2a(peerid)
for shnum in self._shares1.get(peerid, {}):
if shnum < len(places):
def __init__(self, basedir, num_peers=10):
self._num_peers = num_peers
- self._peerids = [tagged_hash("peerid", "%d" % i)[:20]
- for i in range(self._num_peers)]
- self._connections = {}
- for peerid in self._peerids:
+ peerids = [tagged_hash("peerid", "%d" % i)[:20]
+ for i in range(self._num_peers)]
+ self.storage_broker = StorageFarmBroker()
+ for peerid in peerids:
peerdir = os.path.join(basedir, idlib.shortnodeid_b2a(peerid))
make_dirs(peerdir)
ss = StorageServer(peerdir, peerid)
lw = LocalWrapper(ss)
- self._connections[peerid] = lw
+ self.storage_broker.add_server(peerid, lw)
self.nodeid = "fakenodeid"
self.old_map = smap
# now shut down one of the servers
peer0 = list(smap.make_sharemap()[0])[0]
- self.client._connections.pop(peer0)
+ self.client.debug_remove_connection(peer0)
# then modify the file, leaving the old map untouched
log.msg("starting winning write")
return n.overwrite("contents 2")
d.addCallback(n._generated)
def _break_peer0(res):
si = n.get_storage_index()
- peerlist = self.client.get_permuted_peers("storage", si)
+ peerlist = list(self.client.storage_broker.get_servers(si))
peerid0, connection0 = peerlist[0]
peerid1, connection1 = peerlist[1]
connection0.broken = True
# that ought to work too
d.addCallback(lambda res: n.download_best_version())
d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2"))
+ def _explain_error(f):
+ print f
+ if f.check(NotEnoughServersError):
+ print "first_error:", f.value.first_error
+ return f
+ d.addErrback(_explain_error)
return d
def test_bad_server_overlap(self):
basedir = os.path.join("mutable/CollidingWrites/test_bad_server")
self.client = LessFakeClient(basedir, 10)
- peerids = sorted(self.client._connections.keys())
- self.client._connections[peerids[0]].broken = True
+ peerids = list(self.client.get_all_serverids())
+ self.client.debug_break_connection(peerids[0])
d = self.client.create_mutable_file("contents 1")
def _created(n):
d.addCallback(lambda res: self.failUnlessEqual(res, "contents 1"))
# now break one of the remaining servers
def _break_second_server(res):
- self.client._connections[peerids[1]].broken = True
+ self.client.debug_break_connection(peerids[1])
d.addCallback(_break_second_server)
d.addCallback(lambda res: n.overwrite("contents 2"))
# that ought to work too
# Break all servers: the publish should fail
basedir = os.path.join("mutable/CollidingWrites/publish_all_servers_bad")
self.client = LessFakeClient(basedir, 20)
- for connection in self.client._connections.values():
- connection.broken = True
+ for peerid in self.client.get_all_serverids():
+ self.client.debug_break_connection(peerid)
d = self.shouldFail(NotEnoughServersError,
"test_publish_all_servers_bad",
"Ran out of non-bad servers",
def _check(extra_node):
self.extra_node = extra_node
for c in self.clients:
- all_peerids = list(c.get_all_peerids())
+ all_peerids = list(c.get_all_serverids())
self.failUnlessEqual(len(all_peerids), self.numclients+1)
- permuted_peers = list(c.get_permuted_peers("storage", "a"))
+ sb = c.storage_broker
+ permuted_peers = list(sb.get_servers("a"))
self.failUnlessEqual(len(permuted_peers), self.numclients+1)
d.addCallback(_check)
d = self.set_up_nodes()
def _check_connections(res):
for c in self.clients:
- all_peerids = list(c.get_all_peerids())
+ all_peerids = list(c.get_all_serverids())
self.failUnlessEqual(len(all_peerids), self.numclients)
- permuted_peers = list(c.get_permuted_peers("storage", "a"))
+ sb = c.storage_broker
+ permuted_peers = list(sb.get_servers("a"))
self.failUnlessEqual(len(permuted_peers), self.numclients)
d.addCallback(_check_connections)
from allmydata.util.deferredutil import DeferredListShouldSucceed
from no_network import GridTestMixin
from common_util import ShouldFailMixin
+from allmydata.storage_client import StorageFarmBroker
MiB = 1024*1024
self.mode = mode
self.num_servers = num_servers
if mode == "some_big_some_small":
- self.peers = []
+ peers = []
for fakeid in range(num_servers):
if fakeid % 2:
- self.peers.append( ("%20d" % fakeid,
- FakeStorageServer("good")) )
+ peers.append(("%20d" % fakeid, FakeStorageServer("good")))
else:
- self.peers.append( ("%20d" % fakeid,
- FakeStorageServer("small")) )
+ peers.append(("%20d" % fakeid, FakeStorageServer("small")))
else:
- self.peers = [ ("%20d"%fakeid, FakeStorageServer(self.mode),)
+ peers = [ ("%20d"%fakeid, FakeStorageServer(self.mode),)
for fakeid in range(self.num_servers) ]
+ self.storage_broker = StorageFarmBroker()
+ for (serverid, server) in peers:
+ self.storage_broker.add_server(serverid, server)
+ self.last_peers = [p[1] for p in peers]
+
def log(self, *args, **kwargs):
pass
- def get_permuted_peers(self, storage_index, include_myself):
- self.last_peers = [p[1] for p in self.peers]
- return self.peers
def get_encoding_parameters(self):
return self.DEFAULT_ENCODING_PARAMETERS
from nevow import rend
from allmydata import interfaces, uri, webish
from allmydata.storage.shares import get_share_file
+from allmydata.storage_client import StorageFarmBroker
from allmydata.immutable import upload, download
from allmydata.web import status, common
from allmydata.scripts.debug import CorruptShareOptions, corrupt_share
def connected_to_introducer(self):
return False
- def get_nickname_for_peerid(self, peerid):
+ def get_nickname_for_serverid(self, serverid):
return u"John Doe"
- def get_permuted_peers(self, service_name, key):
- return []
+ storage_broker = StorageFarmBroker()
def create_node_from_uri(self, auri):
precondition(isinstance(auri, str), auri)
if data["list-corrupt-shares"]:
badsharemap = []
for (serverid, si, shnum) in data["list-corrupt-shares"]:
- nickname = c.get_nickname_for_peerid(serverid)
+ nickname = c.get_nickname_for_serverid(serverid)
badsharemap.append(T.tr[T.td["sh#%d" % shnum],
T.td[T.div(class_="nickname")[nickname],
T.div(class_="nodeid")[T.tt[base32.b2a(serverid)]]],
shareid_s = ""
if i == 0:
shareid_s = shareid
- nickname = c.get_nickname_for_peerid(serverid)
+ nickname = c.get_nickname_for_serverid(serverid)
sharemap.append(T.tr[T.td[shareid_s],
T.td[T.div(class_="nickname")[nickname],
T.div(class_="nodeid")[T.tt[base32.b2a(serverid)]]]
add("Unrecoverable Versions", data["count-unrecoverable-versions"])
# this table is sorted by permuted order
+ sb = c.storage_broker
permuted_peer_ids = [peerid
for (peerid, rref)
- in c.get_permuted_peers("storage",
- cr.get_storage_index())]
+ in sb.get_servers(cr.get_storage_index())]
num_shares_left = sum([len(shares) for shares in servers.values()])
servermap = []
for serverid in permuted_peer_ids:
- nickname = c.get_nickname_for_peerid(serverid)
+ nickname = c.get_nickname_for_serverid(serverid)
shareids = servers.get(serverid, [])
shareids.reverse()
shareids_s = [ T.tt[shareid, " "] for shareid in sorted(shareids) ]
def render_server_problem(self, ctx, data):
serverid = data
data = [idlib.shortnodeid_b2a(serverid)]
- nickname = self.client.get_nickname_for_peerid(serverid)
+ nickname = self.client.get_nickname_for_serverid(serverid)
if nickname:
data.append(" (%s)" % self._html(nickname))
return ctx.tag[data]
return self.monitor.get_status().get_corrupt_shares()
def render_share_problem(self, ctx, data):
serverid, storage_index, sharenum = data
- nickname = self.client.get_nickname_for_peerid(serverid)
+ nickname = self.client.get_nickname_for_serverid(serverid)
ctx.fillSlots("serverid", idlib.shortnodeid_b2a(serverid))
if nickname:
ctx.fillSlots("nickname", self._html(nickname))