def _lost_key_generator(self):
self._key_generator = None
- def get_servers(self, service_name):
- """ Return frozenset of (peerid, versioned-rref) """
- assert isinstance(service_name, str)
- return self.introducer_client.get_peers(service_name)
-
def init_web(self, webport):
self.log("init_web(webport=%s)", args=(webport,))
from allmydata.util.assertutil import _assert, precondition
from allmydata import codec, hashtree, uri
from allmydata.interfaces import IDownloadTarget, IDownloader, IFileURI, IVerifierURI, \
- IDownloadStatus, IDownloadResults, IValidatedThingProxy, NotEnoughSharesError, \
+ IDownloadStatus, IDownloadResults, IValidatedThingProxy, \
+ IStorageBroker, NotEnoughSharesError, \
UnableToFetchCriticalDownloadDataError
from allmydata.immutable import layout
from allmydata.monitor import Monitor
def __init__(self, storage_broker, v, target, monitor):
+ precondition(IStorageBroker.providedBy(storage_broker), storage_broker)
precondition(IVerifierURI.providedBy(v), v)
precondition(IDownloadTarget.providedBy(target), target)
def _get_all_shareholders(self):
dl = []
sb = self._storage_broker
- for (peerid,ss) in sb.get_servers(self._storage_index):
+ for (peerid,ss) in sb.get_servers_for_index(self._storage_index):
self.log(format="sending DYHB to [%(peerid)s]",
peerid=idlib.shortnodeid_b2a(peerid),
level=log.NOISY, umid="rT03hg")
def check_and_repair(self, monitor, verify=False, add_lease=False):
verifycap = self.get_verify_cap()
- servers = self._client.get_servers("storage")
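+ # check/repair probes every known server, not just the permuted
+ # list for this index: shares may have migrated anywhere in the grid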
+ sb = self._client.get_storage_broker()
+ servers = sb.get_all_servers()
c = Checker(client=self._client, verifycap=verifycap, servers=servers,
verify=verify, add_lease=add_lease, monitor=monitor)
return d
def check(self, monitor, verify=False, add_lease=False):
- v = Checker(client=self._client, verifycap=self.get_verify_cap(),
- servers=self._client.get_servers("storage"),
+ verifycap = self.get_verify_cap()
+ sb = self._client.get_storage_broker()
+ servers = sb.get_all_servers()
+
+ v = Checker(client=self._client, verifycap=verifycap, servers=servers,
verify=verify, add_lease=add_lease, monitor=monitor)
return v.start()
lp2 = self.log("doing a quick check+UEBfetch",
parent=lp, level=log.NOISY)
sb = self.parent.get_storage_broker()
- c = CHKCheckerAndUEBFetcher(sb.get_servers, storage_index, lp2)
+ c = CHKCheckerAndUEBFetcher(sb.get_servers_for_index, storage_index, lp2)
d = c.check()
def _checked(res):
if res:
def start(self):
self.log("starting repair")
duc = DownUpConnector()
- dl = download.CiphertextDownloader(self._client, self._verifycap, target=duc, monitor=self._monitor)
+ sb = self._client.get_storage_broker()
+ dl = download.CiphertextDownloader(sb, self._verifycap, target=duc, monitor=self._monitor)
ul = upload.CHKUploader(self._client)
d = defer.Deferred()
self.preexisting_shares = {} # sharenum -> peerid holding the share
sb = client.get_storage_broker()
- peers = list(sb.get_servers(storage_index))
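+ # the serverlist comes back permuted for this index; the peer
+ # selector will offer shares to these servers in order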
+ peers = sb.get_servers_for_index(storage_index)
if not peers:
raise NoServersError("client gave us zero peers")
@return: URIExtensionData
"""
+class IStorageBroker(Interface):
+ def get_servers_for_index(peer_selection_index):
+ """
+ @return: list of (peerid, versioned-rref) tuples, permuted by
+ peer_selection_index
+ """
+ def get_all_servers():
+ """
+ @return: frozenset of (peerid, versioned-rref) tuples
+ """
+ def get_all_serverids():
+ """
+ @return: iterator of serverid strings
+ """
+ def get_nickname_for_serverid(serverid):
+ """
+ @return: unicode nickname, or None
+ """
# hm, we need a solution for forward references in schemas
self._encprivkey = self._node.get_encprivkey()
sb = self._node._client.get_storage_broker()
- full_peerlist = sb.get_servers(self._storage_index)
+ full_peerlist = sb.get_servers_for_index(self._storage_index)
self.full_peerlist = full_peerlist # for use later, immutable
self.bad_peers = set() # peerids who have errbacked/refused requests
self._queries_completed = 0
sb = self._node._client.get_storage_broker()
- full_peerlist = list(sb.get_servers(self._node._storage_index))
+ full_peerlist = sb.get_servers_for_index(self._node._storage_index)
self.full_peerlist = full_peerlist # for use later, immutable
self.extra_peers = full_peerlist[:] # peers are removed as we use them
self._good_peers = set() # peers who had some shares
# implement tahoe.cfg scanner, create static NativeStorageClients
import sha
+from zope.interface import implements
+from allmydata.interfaces import IStorageBroker
class StorageFarmBroker:
+ implements(IStorageBroker)
"""I live on the client, and know about storage servers. For each server
that is participating in a grid, I either maintain a connection to it or
remember enough information to establish a connection to it on demand.
self.introducer_client = ic = introducer_client
ic.subscribe_to("storage")
- def get_servers(self, peer_selection_index):
- # first cut: return an iterator of (peerid, versioned-rref) tuples
+ def get_servers_for_index(self, peer_selection_index):
+ # first cut: return a list of (peerid, versioned-rref) tuples
assert self.permute_peers == True
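+ # permute by sorting the full serverlist on sha(index + peerid):
+ # stable for a given index, but different across indexes, so load
+ # spreads across the grid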
+ servers = self.get_all_servers()
+ key = peer_selection_index
+ return sorted(servers, key=lambda x: sha.new(key+x[0]).digest())
+
+ def get_all_servers(self):
+ # return a frozenset of (peerid, versioned-rref) tuples
servers = {}
for serverid,server in self.servers.items():
servers[serverid] = server
if self.introducer_client:
ic = self.introducer_client
- for serverid,server in ic.get_permuted_peers("storage",
- peer_selection_index):
+ for serverid,server in ic.get_peers("storage"):
servers[serverid] = server
- servers = servers.items()
- key = peer_selection_index
- return sorted(servers, key=lambda x: sha.new(key+x[0]).digest())
+ return frozenset(servers.items())
def get_all_serverids(self):
for serverid in self.servers:
import os.path
import sha
+from zope.interface import implements
from twisted.application import service
from twisted.internet import reactor
from twisted.python.failure import Failure
from allmydata.util import fileutil, idlib, hashutil
from allmydata.introducer.client import RemoteServiceConnector
from allmydata.test.common_web import HTTPClientGETFactory
+from allmydata.interfaces import IStorageBroker
class IntentionalError(Exception):
pass
return wrapper
class NoNetworkStorageBroker:
- def get_servers(self, key):
+ implements(IStorageBroker)
+ def get_servers_for_index(self, key):
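+ # same sha-based permutation as StorageFarmBroker, over the static
+ # server set supplied by the NoNetworkGrid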
return sorted(self.client._servers,
key=lambda x: sha.new(key+x[0]).digest())
+ def get_all_servers(self):
+ return frozenset(self.client._servers)
def get_nickname_for_serverid(self, serverid):
return None
self.storage_broker.client = self
def init_stub_client(self):
pass
-
- def get_servers(self, service_name):
- return self._servers
+ # ._servers will be set by the NoNetworkGrid which creates us
class SimpleStats:
def __init__(self):
def _permute(self, sb, key):
return [ peerid
- for (peerid,rref) in sb.get_servers(key) ]
+ for (peerid,rref) in sb.get_servers_for_index(key) ]
def test_permute(self):
sb = StorageFarmBroker()
from allmydata.immutable import encode, upload, download
from allmydata.util import hashutil
from allmydata.util.assertutil import _assert
-from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader, NotEnoughSharesError
+from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader, \
+ NotEnoughSharesError, IStorageBroker
from allmydata.monitor import Monitor
import common_util as testutil
def flip_bit(good): # flips the last bit
return good[:-1] + chr(ord(good[-1]) ^ 0x01)
-class FakeClient:
- def log(self, *args, **kwargs):
- pass
+class FakeStorageBroker:
+ implements(IStorageBroker)
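+ # stub broker: these tests drive the CiphertextDownloader through
+ # its steps by hand, so no server-selection methods are needed here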
class FakeBucketReaderWriterProxy:
implements(IStorageBucketWriter, IStorageBucketReader)
total_shares=verifycap.total_shares,
size=verifycap.size)
- client = FakeClient()
+ sb = FakeStorageBroker()
if not target:
target = download.Data()
target = download.DecryptingTarget(target, u.key)
- fd = download.CiphertextDownloader(client, u.get_verify_cap(), target, monitor=Monitor())
+ fd = download.CiphertextDownloader(sb, u.get_verify_cap(), target, monitor=Monitor())
# we manually cycle the CiphertextDownloader through a number of steps that
# would normally be sequenced by a Deferred chain in
d.addCallback(n._generated)
def _break_peer0(res):
si = n.get_storage_index()
- peerlist = list(self.client.storage_broker.get_servers(si))
+ peerlist = self.client.storage_broker.get_servers_for_index(si)
peerid0, connection0 = peerlist[0]
peerid1, connection1 = peerlist[1]
connection0.broken = True
all_peerids = list(c.get_storage_broker().get_all_serverids())
self.failUnlessEqual(len(all_peerids), self.numclients+1)
sb = c.storage_broker
- permuted_peers = list(sb.get_servers("a"))
+ permuted_peers = list(sb.get_servers_for_index("a"))
self.failUnlessEqual(len(permuted_peers), self.numclients+1)
d.addCallback(_check)
all_peerids = list(c.get_storage_broker().get_all_serverids())
self.failUnlessEqual(len(all_peerids), self.numclients)
sb = c.storage_broker
- permuted_peers = list(sb.get_servers("a"))
+ permuted_peers = list(sb.get_servers_for_index("a"))
self.failUnlessEqual(len(permuted_peers), self.numclients)
d.addCallback(_check_connections)
sb = c.get_storage_broker()
permuted_peer_ids = [peerid
for (peerid, rref)
- in sb.get_servers(cr.get_storage_index())]
+ in sb.get_servers_for_index(cr.get_storage_index())]
num_shares_left = sum([len(shares) for shares in servers.values()])
servermap = []