From: Brian Warner Date: Tue, 23 Jun 2009 02:10:47 +0000 (-0700) Subject: big rework of introducer client: change local API, split division of responsibilites... X-Git-Url: https://git.rkrishnan.org/%5B/%5D%20/class-simplejson.JSONEncoder.html?a=commitdiff_plain;h=8df15e9f30a3bda7055cc6ab829c8a19a0606c22;p=tahoe-lafs%2Ftahoe-lafs.git big rework of introducer client: change local API, split division of responsibilites better, remove old-code testing, improve error logging --- diff --git a/src/allmydata/client.py b/src/allmydata/client.py index 037489ad..444a817e 100644 --- a/src/allmydata/client.py +++ b/src/allmydata/client.py @@ -6,7 +6,6 @@ from zope.interface import implements from twisted.internet import reactor from twisted.application.internet import TimerService from foolscap.api import Referenceable -from foolscap.logging import log from pycryptopp.publickey import rsa import allmydata @@ -18,7 +17,7 @@ from allmydata.immutable.filenode import FileNode, LiteralFileNode from allmydata.immutable.offloaded import Helper from allmydata.control import ControlServer from allmydata.introducer.client import IntroducerClient -from allmydata.util import hashutil, base32, pollmixin, cachedir +from allmydata.util import hashutil, base32, pollmixin, cachedir, log from allmydata.util.abbreviate import parse_abbreviated_size from allmydata.util.time_format import parse_duration, parse_date from allmydata.uri import LiteralFileURI @@ -128,8 +127,6 @@ class Client(node.Node, pollmixin.PollMixin): d = self.when_tub_ready() def _start_introducer_client(res): ic.setServiceParent(self) - # nodes that want to upload and download will need storage servers - ic.subscribe_to("storage") d.addCallback(_start_introducer_client) d.addErrback(log.err, facility="tahoe.init", level=log.BAD, umid="URyI5w") @@ -235,9 +232,11 @@ class Client(node.Node, pollmixin.PollMixin): def init_client_storage_broker(self): # create a StorageFarmBroker object, for use by Uploader/Downloader # (and everybody else who wants to use storage servers) - self.storage_broker = sb = storage_client.StorageFarmBroker() + sb = storage_client.StorageFarmBroker(self.tub, permute_peers=True) + self.storage_broker = sb - # load static server specifications from tahoe.cfg, if any + # load static server specifications from tahoe.cfg, if any. + # Not quite ready yet. #if self.config.has_section("client-server-selection"): # server_params = {} # maps serverid to dict of parameters # for (name, value) in self.config.items("client-server-selection"): @@ -390,8 +389,7 @@ class Client(node.Node, pollmixin.PollMixin): temporary test network and need to know when it is safe to proceed with an upload or download.""" def _check(): - current_clients = list(self.storage_broker.get_all_serverids()) - return len(current_clients) >= num_clients + return len(self.storage_broker.get_all_servers()) >= num_clients d = self.poll(_check, 0.5) d.addCallback(lambda res: None) return d diff --git a/src/allmydata/control.py b/src/allmydata/control.py index 01eb7c37..060608b4 100644 --- a/src/allmydata/control.py +++ b/src/allmydata/control.py @@ -70,10 +70,10 @@ class ControlServer(Referenceable, service.Service): # phase to take more than 10 seconds. Expect worst-case latency to be # 300ms. results = {} - conns = self.parent.introducer_client.get_all_connections_for("storage") - everyone = [(peerid,rref) for (peerid, service_name, rref) in conns] + sb = self.parent.get_storage_broker() + everyone = sb.get_all_servers() num_pings = int(mathutil.div_ceil(10, (len(everyone) * 0.3))) - everyone = everyone * num_pings + everyone = list(everyone) * num_pings d = self._do_one_ping(None, everyone, results) return d def _do_one_ping(self, res, everyone_left, results): diff --git a/src/allmydata/immutable/download.py b/src/allmydata/immutable/download.py index acc03add..9dfc0bb8 100644 --- a/src/allmydata/immutable/download.py +++ b/src/allmydata/immutable/download.py @@ -8,9 +8,10 @@ from foolscap.api import DeadReferenceError, RemoteException, eventually from allmydata.util import base32, deferredutil, hashutil, log, mathutil, idlib from allmydata.util.assertutil import _assert, precondition from allmydata import codec, hashtree, uri -from allmydata.interfaces import IDownloadTarget, IDownloader, IFileURI, IVerifierURI, \ +from allmydata.interfaces import IDownloadTarget, IDownloader, \ + IFileURI, IVerifierURI, \ IDownloadStatus, IDownloadResults, IValidatedThingProxy, \ - IStorageBroker, NotEnoughSharesError, \ + IStorageBroker, NotEnoughSharesError, NoServersError, \ UnableToFetchCriticalDownloadDataError from allmydata.immutable import layout from allmydata.monitor import Monitor @@ -747,7 +748,10 @@ class CiphertextDownloader(log.PrefixingLogMixin): def _get_all_shareholders(self): dl = [] sb = self._storage_broker - for (peerid,ss) in sb.get_servers_for_index(self._storage_index): + servers = sb.get_servers_for_index(self._storage_index) + if not servers: + raise NoServersError("broker gave us no servers!") + for (peerid,ss) in servers: self.log(format="sending DYHB to [%(peerid)s]", peerid=idlib.shortnodeid_b2a(peerid), level=log.NOISY, umid="rT03hg") diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py index 24d67a50..e130fa0f 100644 --- a/src/allmydata/interfaces.py +++ b/src/allmydata/interfaces.py @@ -360,13 +360,56 @@ class IStorageBroker(Interface): """ def get_all_serverids(): """ - @return: iterator of serverid strings + @return: frozenset of serverid strings """ def get_nickname_for_serverid(serverid): """ @return: unicode nickname, or None """ + # methods moved from IntroducerClient, need review + def get_all_connections(): + """Return a frozenset of (nodeid, service_name, rref) tuples, one for + each active connection we've established to a remote service. This is + mostly useful for unit tests that need to wait until a certain number + of connections have been made.""" + + def get_all_connectors(): + """Return a dict that maps from (nodeid, service_name) to a + RemoteServiceConnector instance for all services that we are actively + trying to connect to. Each RemoteServiceConnector has the following + public attributes:: + + service_name: the type of service provided, like 'storage' + announcement_time: when we first heard about this service + last_connect_time: when we last established a connection + last_loss_time: when we last lost a connection + + version: the peer's version, from the most recent connection + oldest_supported: the peer's oldest supported version, same + + rref: the RemoteReference, if connected, otherwise None + remote_host: the IAddress, if connected, otherwise None + + This method is intended for monitoring interfaces, such as a web page + which describes connecting and connected peers. + """ + + def get_all_peerids(): + """Return a frozenset of all peerids to whom we have a connection (to + one or more services) established. Mostly useful for unit tests.""" + + def get_all_connections_for(service_name): + """Return a frozenset of (nodeid, service_name, rref) tuples, one + for each active connection that provides the given SERVICE_NAME.""" + + def get_permuted_peers(service_name, key): + """Returns an ordered list of (peerid, rref) tuples, selecting from + the connections that provide SERVICE_NAME, using a hash-based + permutation keyed by KEY. This randomizes the service list in a + repeatable way, to distribute load over many peers. + """ + # hm, we need a solution for forward references in schemas FileNode_ = Any() # TODO: foolscap needs constraints on copyables diff --git a/src/allmydata/introducer/client.py b/src/allmydata/introducer/client.py index db09c7eb..31fbb5c2 100644 --- a/src/allmydata/introducer/client.py +++ b/src/allmydata/introducer/client.py @@ -1,108 +1,13 @@ -import re, time, sha from base64 import b32decode from zope.interface import implements from twisted.application import service -from foolscap.api import Referenceable +from foolscap.api import Referenceable, SturdyRef, eventually from allmydata.interfaces import InsufficientVersionError from allmydata.introducer.interfaces import RIIntroducerSubscriberClient, \ IIntroducerClient from allmydata.util import log, idlib -from allmydata.util.rrefutil import add_version_to_remote_reference -from allmydata.introducer.common import make_index - - -class RemoteServiceConnector: - """I hold information about a peer service that we want to connect to. If - we are connected, I hold the RemoteReference, the peer's address, and the - peer's version information. I remember information about when we were - last connected to the peer too, even if we aren't currently connected. - - @ivar announcement_time: when we first heard about this service - @ivar last_connect_time: when we last established a connection - @ivar last_loss_time: when we last lost a connection - - @ivar version: the peer's version, from the most recent announcement - @ivar oldest_supported: the peer's oldest supported version, same - @ivar nickname: the peer's self-reported nickname, same - - @ivar rref: the RemoteReference, if connected, otherwise None - @ivar remote_host: the IAddress, if connected, otherwise None - """ - - VERSION_DEFAULTS = { - "storage": { "http://allmydata.org/tahoe/protocols/storage/v1" : - { "maximum-immutable-share-size": 2**32, - "tolerates-immutable-read-overrun": False, - "delete-mutable-shares-with-zero-length-writev": False, - }, - "application-version": "unknown: no get_version()", - }, - "stub_client": { }, - } - - def __init__(self, announcement, tub, ic): - self._tub = tub - self._announcement = announcement - self._ic = ic - (furl, service_name, ri_name, nickname, ver, oldest) = announcement - - self._furl = furl - m = re.match(r'pb://(\w+)@', furl) - assert m - self._nodeid = b32decode(m.group(1).upper()) - self._nodeid_s = idlib.shortnodeid_b2a(self._nodeid) - - self.service_name = service_name - - self.log("attempting to connect to %s" % self._nodeid_s) - self.announcement_time = time.time() - self.last_loss_time = None - self.rref = None - self.remote_host = None - self.last_connect_time = None - self.version = ver - self.oldest_supported = oldest - self.nickname = nickname - - def log(self, *args, **kwargs): - return self._ic.log(*args, **kwargs) - - def startConnecting(self): - self._reconnector = self._tub.connectTo(self._furl, self._got_service) - - def stopConnecting(self): - self._reconnector.stopConnecting() - - def _got_service(self, rref): - self.log("got connection to %s, getting versions" % self._nodeid_s) - - default = self.VERSION_DEFAULTS.get(self.service_name, {}) - d = add_version_to_remote_reference(rref, default) - d.addCallback(self._got_versioned_service) - - def _got_versioned_service(self, rref): - self.log("connected to %s, version %s" % (self._nodeid_s, rref.version)) - - self.last_connect_time = time.time() - self.remote_host = rref.tracker.broker.transport.getPeer() - - self.rref = rref - - self._ic.add_connection(self._nodeid, self.service_name, rref) - - rref.notifyOnDisconnect(self._lost, rref) - - def _lost(self, rref): - self.log("lost connection to %s" % self._nodeid_s) - self.last_loss_time = time.time() - self.rref = None - self.remote_host = None - self._ic.remove_connection(self._nodeid, self.service_name, rref) - - - def reset(self): - self._reconnector.reset() +from allmydata.util.rrefutil import add_version_to_remote_reference, trap_deadref class IntroducerClient(service.Service, Referenceable): @@ -113,32 +18,40 @@ class IntroducerClient(service.Service, Referenceable): self._tub = tub self.introducer_furl = introducer_furl - self._nickname = nickname.encode("utf-8") + assert type(nickname) is unicode + self._nickname_utf8 = nickname.encode("utf-8") # we always send UTF-8 self._my_version = my_version self._oldest_supported = oldest_supported self._published_announcements = set() self._publisher = None - self._connected = False + self._local_subscribers = [] # (servicename,cb,args,kwargs) tuples self._subscribed_service_names = set() self._subscriptions = set() # requests we've actually sent - self._received_announcements = set() - # TODO: this set will grow without bound, until the node is restarted - - # we only accept one announcement per (peerid+service_name) pair. - # This insures that an upgraded host replace their previous - # announcement. It also means that each peer must have their own Tub - # (no sharing), which is slightly weird but consistent with the rest - # of the Tahoe codebase. - self._connectors = {} # k: (peerid+svcname), v: RemoteServiceConnector - # self._connections is a set of (peerid, service_name, rref) tuples - self._connections = set() - - self.counter = 0 # incremented each time we change state, for tests + + # _current_announcements remembers one announcement per + # (servicename,serverid) pair. Anything that arrives with the same + # pair will displace the previous one. This stores unpacked + # announcement dictionaries, which can be compared for equality to + # distinguish re-announcement from updates. It also provides memory + # for clients who subscribe after startup. + self._current_announcements = {} + self.encoding_parameters = None + # hooks for unit tests + self._debug_counts = { + "inbound_message": 0, + "inbound_announcement": 0, + "wrong_service": 0, + "duplicate_announcement": 0, + "update": 0, + "new_announcement": 0, + "outbound_message": 0, + } + def startService(self): service.Service.startService(self) self._introducer_error = None @@ -170,7 +83,6 @@ class IntroducerClient(service.Service, Referenceable): needed = "http://allmydata.org/tahoe/protocols/introducer/v1" if needed not in publisher.version: raise InsufficientVersionError(needed, publisher.version) - self._connected = True self._publisher = publisher publisher.notifyOnDisconnect(self._disconnected) self._maybe_publish() @@ -178,16 +90,9 @@ class IntroducerClient(service.Service, Referenceable): def _disconnected(self): self.log("bummer, we've lost our connection to the introducer") - self._connected = False self._publisher = None self._subscriptions.clear() - def stopService(self): - service.Service.stopService(self) - self._introducer_reconnector.stopConnecting() - for rsc in self._connectors.itervalues(): - rsc.stopConnecting() - def log(self, *args, **kwargs): if "facility" not in kwargs: kwargs["facility"] = "tahoe.introducer" @@ -195,14 +100,19 @@ class IntroducerClient(service.Service, Referenceable): def publish(self, furl, service_name, remoteinterface_name): + assert type(self._nickname_utf8) is str # we always send UTF-8 ann = (furl, service_name, remoteinterface_name, - self._nickname, self._my_version, self._oldest_supported) + self._nickname_utf8, self._my_version, self._oldest_supported) self._published_announcements.add(ann) self._maybe_publish() - def subscribe_to(self, service_name): + def subscribe_to(self, service_name, cb, *args, **kwargs): + self._local_subscribers.append( (service_name,cb,args,kwargs) ) self._subscribed_service_names.add(service_name) self._maybe_subscribe() + for (servicename,nodeid),ann_d in self._current_announcements.items(): + if servicename == service_name: + eventually(cb, nodeid, ann_d) def _maybe_subscribe(self): if not self._publisher: @@ -215,7 +125,9 @@ class IntroducerClient(service.Service, Referenceable): # duplicate requests. self._subscriptions.add(service_name) d = self._publisher.callRemote("subscribe", self, service_name) - d.addErrback(log.err, facility="tahoe.introducer", + d.addErrback(trap_deadref) + d.addErrback(log.err, format="server errored during subscribe", + facility="tahoe.introducer", level=log.WEIRD, umid="2uMScQ") def _maybe_publish(self): @@ -224,100 +136,83 @@ class IntroducerClient(service.Service, Referenceable): return # this re-publishes everything. The Introducer ignores duplicates for ann in self._published_announcements: + self._debug_counts["outbound_message"] += 1 d = self._publisher.callRemote("publish", ann) - d.addErrback(log.err, facility="tahoe.introducer", + d.addErrback(trap_deadref) + d.addErrback(log.err, + format="server errored during publish %(ann)s", + ann=ann, facility="tahoe.introducer", level=log.WEIRD, umid="xs9pVQ") def remote_announce(self, announcements): + self.log("received %d announcements" % len(announcements)) + self._debug_counts["inbound_message"] += 1 for ann in announcements: - self.log("received %d announcements" % len(announcements)) - (furl, service_name, ri_name, nickname, ver, oldest) = ann - if service_name not in self._subscribed_service_names: - self.log("announcement for a service we don't care about [%s]" - % (service_name,), level=log.UNUSUAL, umid="dIpGNA") - continue - if ann in self._received_announcements: - self.log("ignoring old announcement: %s" % (ann,), - level=log.NOISY) - continue - self.log("new announcement[%s]: %s" % (service_name, ann)) - self._received_announcements.add(ann) - self._new_announcement(ann) - - def _new_announcement(self, announcement): - # this will only be called for new announcements - index = make_index(announcement) - if index in self._connectors: - self.log("replacing earlier announcement", level=log.NOISY) - self._connectors[index].stopConnecting() - rsc = RemoteServiceConnector(announcement, self._tub, self) - self._connectors[index] = rsc - rsc.startConnecting() - - def add_connection(self, nodeid, service_name, rref): - self._connections.add( (nodeid, service_name, rref) ) - self.counter += 1 - # when one connection is established, reset the timers on all others, - # to trigger a reconnection attempt in one second. This is intended - # to accelerate server connections when we've been offline for a - # while. The goal is to avoid hanging out for a long time with - # connections to only a subset of the servers, which would increase - # the chances that we'll put shares in weird places (and not update - # existing shares of mutable files). See #374 for more details. - for rsc in self._connectors.values(): - rsc.reset() - - def remove_connection(self, nodeid, service_name, rref): - self._connections.discard( (nodeid, service_name, rref) ) - self.counter += 1 - - - def get_all_connections(self): - return frozenset(self._connections) - - def get_all_connectors(self): - return self._connectors.copy() - - def get_all_peerids(self): - return frozenset([peerid - for (peerid, service_name, rref) - in self._connections]) - - def get_nickname_for_peerid(self, peerid): - for k in self._connectors: - (peerid0, svcname0) = k - if peerid0 == peerid: - rsc = self._connectors[k] - return rsc.nickname - return None - - def get_all_connections_for(self, service_name): - return frozenset([c - for c in self._connections - if c[1] == service_name]) - - def get_peers(self, service_name): - """Return a set of (peerid, versioned-rref) tuples.""" - return frozenset([(peerid, r) for (peerid, servname, r) in self._connections if servname == service_name]) - - def get_permuted_peers(self, service_name, key): - """Return an ordered list of (peerid, versioned-rref) tuples.""" - - servers = self.get_peers(service_name) - - return sorted(servers, key=lambda x: sha.new(key+x[0]).digest()) + try: + self._process_announcement(ann) + except: + log.err(format="unable to process announcement %(ann)s", + ann=ann) + # Don't let a corrupt announcement prevent us from processing + # the remaining ones. Don't return an error to the server, + # since they'd just ignore it anyways. + pass + + def _process_announcement(self, ann): + self._debug_counts["inbound_announcement"] += 1 + (furl, service_name, ri_name, nickname_utf8, ver, oldest) = ann + if service_name not in self._subscribed_service_names: + self.log("announcement for a service we don't care about [%s]" + % (service_name,), level=log.UNUSUAL, umid="dIpGNA") + self._debug_counts["wrong_service"] += 1 + return + self.log("announcement for [%s]: %s" % (service_name, ann), + umid="BoKEag") + assert type(furl) is str + assert type(service_name) is str + assert type(ri_name) is str + assert type(nickname_utf8) is str + nickname = nickname_utf8.decode("utf-8") + assert type(nickname) is unicode + assert type(ver) is str + assert type(oldest) is str + + nodeid = b32decode(SturdyRef(furl).tubID.upper()) + nodeid_s = idlib.shortnodeid_b2a(nodeid) + + ann_d = { "version": 0, + "service-name": service_name, + + "FURL": furl, + "nickname": nickname, + "app-versions": {}, # need #466 and v2 introducer + "my-version": ver, + "oldest-supported": oldest, + } + + index = (service_name, nodeid) + if self._current_announcements.get(index, None) == ann_d: + self.log("reannouncement for [%(service)s]:%(nodeid)s, ignoring", + service=service_name, nodeid=nodeid_s, + level=log.UNUSUAL, umid="B1MIdA") + self._debug_counts["duplicate_announcement"] += 1 + return + if index in self._current_announcements: + self._debug_counts["update"] += 1 + else: + self._debug_counts["new_announcement"] += 1 + + self._current_announcements[index] = ann_d + # note: we never forget an index, but we might update its value + + for (service_name2,cb,args,kwargs) in self._local_subscribers: + if service_name2 == service_name: + eventually(cb, nodeid, ann_d, *args, **kwargs) def remote_set_encoding_parameters(self, parameters): self.encoding_parameters = parameters def connected_to_introducer(self): - return self._connected - - def debug_disconnect_from_peerid(self, victim_nodeid): - # for unit tests: locate and sever all connections to the given - # peerid. - for (nodeid, service_name, rref) in self._connections: - if nodeid == victim_nodeid: - rref.tracker.broker.transport.loseConnection() + return bool(self._publisher) diff --git a/src/allmydata/introducer/common.py b/src/allmydata/introducer/common.py deleted file mode 100644 index 54f611a5..00000000 --- a/src/allmydata/introducer/common.py +++ /dev/null @@ -1,11 +0,0 @@ - -import re -from base64 import b32decode - -def make_index(announcement): - (furl, service_name, ri_name, nickname, ver, oldest) = announcement - m = re.match(r'pb://(\w+)@', furl) - assert m - nodeid = b32decode(m.group(1).upper()) - return (nodeid, service_name) - diff --git a/src/allmydata/introducer/interfaces.py b/src/allmydata/introducer/interfaces.py index b02bb355..54f1701f 100644 --- a/src/allmydata/introducer/interfaces.py +++ b/src/allmydata/introducer/interfaces.py @@ -88,53 +88,33 @@ class IIntroducerClient(Interface): parameter: this is supposed to be a globally-unique string that identifies the RemoteInterface that is implemented.""" - def subscribe_to(service_name): + def subscribe_to(service_name, callback, *args, **kwargs): """Call this if you will eventually want to use services with the given SERVICE_NAME. This will prompt me to subscribe to announcements - of those services. You can pick up the announcements later by calling - get_all_connections_for() or get_permuted_peers(). - """ - - def get_all_connections(): - """Return a frozenset of (nodeid, service_name, rref) tuples, one for - each active connection we've established to a remote service. This is - mostly useful for unit tests that need to wait until a certain number - of connections have been made.""" - - def get_all_connectors(): - """Return a dict that maps from (nodeid, service_name) to a - RemoteServiceConnector instance for all services that we are actively - trying to connect to. Each RemoteServiceConnector has the following - public attributes:: - - service_name: the type of service provided, like 'storage' - announcement_time: when we first heard about this service - last_connect_time: when we last established a connection - last_loss_time: when we last lost a connection - - version: the peer's version, from the most recent connection - oldest_supported: the peer's oldest supported version, same - - rref: the RemoteReference, if connected, otherwise None - remote_host: the IAddress, if connected, otherwise None - - This method is intended for monitoring interfaces, such as a web page - which describes connecting and connected peers. - """ - - def get_all_peerids(): - """Return a frozenset of all peerids to whom we have a connection (to - one or more services) established. Mostly useful for unit tests.""" - - def get_all_connections_for(service_name): - """Return a frozenset of (nodeid, service_name, rref) tuples, one - for each active connection that provides the given SERVICE_NAME.""" - - def get_permuted_peers(service_name, key): - """Returns an ordered list of (peerid, rref) tuples, selecting from - the connections that provide SERVICE_NAME, using a hash-based - permutation keyed by KEY. This randomizes the service list in a - repeatable way, to distribute load over many peers. + of those services. Your callback will be invoked with at least two + arguments: a serverid (binary string), and an announcement + dictionary, followed by any additional callback args/kwargs you give + me. I will run your callback for both new announcements and for + announcements that have changed, but you must be prepared to tolerate + duplicates. + + The announcement dictionary that I give you will have the following + keys: + + version: 0 + service-name: str('storage') + + FURL: str(furl) + remoteinterface-name: str(ri_name) + nickname: unicode + app-versions: {} + my-version: str + oldest-supported: str + + Note that app-version will be an empty dictionary until #466 is done + and both the introducer and the remote client have been upgraded. For + current (native) server types, the serverid will always be equal to + the binary form of the FURL's tubid. """ def connected_to_introducer(): diff --git a/src/allmydata/introducer/old.py b/src/allmydata/introducer/old.py index 2f6fa18a..831ddc6c 100644 --- a/src/allmydata/introducer/old.py +++ b/src/allmydata/introducer/old.py @@ -11,7 +11,13 @@ from foolscap.api import Referenceable from allmydata.util import log, idlib from allmydata.introducer.interfaces import RIIntroducerSubscriberClient, \ IIntroducerClient, RIIntroducerPublisherAndSubscriberService -from allmydata.introducer.common import make_index + +def make_index(announcement): + (furl, service_name, ri_name, nickname, ver, oldest) = announcement + m = re.match(r'pb://(\w+)@', furl) + assert m + nodeid = b32decode(m.group(1).upper()) + return (nodeid, service_name) class RemoteServiceConnector: """I hold information about a peer service that we want to connect to. If diff --git a/src/allmydata/introducer/server.py b/src/allmydata/introducer/server.py index de511e74..117fcb55 100644 --- a/src/allmydata/introducer/server.py +++ b/src/allmydata/introducer/server.py @@ -1,14 +1,14 @@ import time, os.path +from base64 import b32decode from zope.interface import implements from twisted.application import service -from foolscap.api import Referenceable +from foolscap.api import Referenceable, SturdyRef import allmydata from allmydata import node -from allmydata.util import log +from allmydata.util import log, rrefutil from allmydata.introducer.interfaces import \ RIIntroducerPublisherAndSubscriberService -from allmydata.introducer.common import make_index class IntroducerNode(node.Node): PORTNUMFILE = "introducer.port" @@ -55,9 +55,15 @@ class IntroducerService(service.MultiService, Referenceable): def __init__(self, basedir="."): service.MultiService.__init__(self) self.introducer_url = None - # 'index' is (tubid, service_name) + # 'index' is (service_name, tubid) self._announcements = {} # dict of index -> (announcement, timestamp) self._subscribers = {} # dict of (rref->timestamp) dicts + self._debug_counts = {"inbound_message": 0, + "inbound_duplicate": 0, + "inbound_update": 0, + "outbound_message": 0, + "outbound_announcements": 0, + "inbound_subscribe": 0} def log(self, *args, **kwargs): if "facility" not in kwargs: @@ -73,23 +79,46 @@ class IntroducerService(service.MultiService, Referenceable): return self.VERSION def remote_publish(self, announcement): + try: + self._publish(announcement) + except: + log.err(format="Introducer.remote_publish failed on %(ann)s", + ann=announcement, level=log.UNUSUAL, umid="620rWA") + raise + + def _publish(self, announcement): + self._debug_counts["inbound_message"] += 1 self.log("introducer: announcement published: %s" % (announcement,) ) - index = make_index(announcement) + (furl, service_name, ri_name, nickname_utf8, ver, oldest) = announcement + + nodeid = b32decode(SturdyRef(furl).tubID.upper()) + index = (service_name, nodeid) + if index in self._announcements: (old_announcement, timestamp) = self._announcements[index] if old_announcement == announcement: self.log("but we already knew it, ignoring", level=log.NOISY) + self._debug_counts["inbound_duplicate"] += 1 return else: self.log("old announcement being updated", level=log.NOISY) + self._debug_counts["inbound_update"] += 1 self._announcements[index] = (announcement, time.time()) - (furl, service_name, ri_name, nickname, ver, oldest) = announcement + for s in self._subscribers.get(service_name, []): - s.callRemote("announce", set([announcement])) + self._debug_counts["outbound_message"] += 1 + self._debug_counts["outbound_announcements"] += 1 + d = s.callRemote("announce", set([announcement])) + d.addErrback(rrefutil.trap_deadref) + d.addErrback(log.err, + format="subscriber errored on announcement %(ann)s", + ann=announcement, facility="tahoe.introducer", + level=log.UNUSUAL, umid="jfGMXQ") def remote_subscribe(self, subscriber, service_name): self.log("introducer: subscription[%s] request at %s" % (service_name, subscriber)) + self._debug_counts["inbound_subscribe"] += 1 if service_name not in self._subscribers: self._subscribers[service_name] = {} subscribers = self._subscribers[service_name] @@ -104,11 +133,16 @@ class IntroducerService(service.MultiService, Referenceable): subscribers.pop(subscriber, None) subscriber.notifyOnDisconnect(_remove) - announcements = set( [ ann - for idx,(ann,when) in self._announcements.items() - if idx[1] == service_name] ) - d = subscriber.callRemote("announce", announcements) - d.addErrback(log.err, facility="tahoe.introducer", level=log.UNUSUAL) - - + announcements = set( + [ ann + for (sn2,nodeid),(ann,when) in self._announcements.items() + if sn2 == service_name] ) + self._debug_counts["outbound_message"] += 1 + self._debug_counts["outbound_announcements"] += len(announcements) + d = subscriber.callRemote("announce", announcements) + d.addErrback(rrefutil.trap_deadref) + d.addErrback(log.err, + format="subscriber errored during subscribe %(anns)s", + anns=announcements, facility="tahoe.introducer", + level=log.UNUSUAL, umid="mtZepQ") diff --git a/src/allmydata/node.py b/src/allmydata/node.py index 582c590f..c695bd0d 100644 --- a/src/allmydata/node.py +++ b/src/allmydata/node.py @@ -62,6 +62,7 @@ class Node(service.MultiService): nickname_utf8 = self.get_config("node", "nickname", "") self.nickname = nickname_utf8.decode("utf-8") + assert type(self.nickname) is unicode self.init_tempdir() self.create_tub() diff --git a/src/allmydata/storage_client.py b/src/allmydata/storage_client.py index eb4a3733..1dfefdd4 100644 --- a/src/allmydata/storage_client.py +++ b/src/allmydata/storage_client.py @@ -6,21 +6,50 @@ the foolscap-based server implemented in src/allmydata/storage/*.py . # roadmap: # -# implement ServerFarm, change Client to create it, change -# uploader/servermap to get rrefs from it. ServerFarm calls -# IntroducerClient.subscribe_to . +# 1: implement StorageFarmBroker (i.e. "storage broker"), change Client to +# create it, change uploader/servermap to get rrefs from it. ServerFarm calls +# IntroducerClient.subscribe_to . ServerFarm hides descriptors, passes rrefs +# to clients. webapi status pages call broker.get_info_about_serverid. # -# implement NativeStorageClient, change Tahoe2PeerSelector to use it. All -# NativeStorageClients come from the introducer +# 2: move get_info methods to the descriptor, webapi status pages call +# broker.get_descriptor_for_serverid().get_info # -# change web/check_results.py to get NativeStorageClients from check results, -# ask it for a nickname (instead of using client.get_nickname_for_serverid) +# 3?later?: store descriptors in UploadResults/etc instead of serverids, +# webapi status pages call descriptor.get_info and don't use storage_broker +# or Client # -# implement tahoe.cfg scanner, create static NativeStorageClients +# 4: enable static config: tahoe.cfg can add descriptors. Make the introducer +# optional. This closes #467 +# +# 5: implement NativeStorageClient, pass it to Tahoe2PeerSelector and other +# clients. Clients stop doing callRemote(), use NativeStorageClient methods +# instead (which might do something else, i.e. http or whatever). The +# introducer and tahoe.cfg only create NativeStorageClients for now. +# +# 6: implement other sorts of IStorageClient classes: S3, etc -import sha -from zope.interface import implements +import sha, time +from zope.interface import implements, Interface +from foolscap.api import eventually from allmydata.interfaces import IStorageBroker +from allmydata.util import idlib, log +from allmydata.util.rrefutil import add_version_to_remote_reference + +# who is responsible for de-duplication? +# both? +# IC remembers the unpacked announcements it receives, to provide for late +# subscribers and to remove duplicates + +# if a client subscribes after startup, will they receive old announcements? +# yes + +# who will be responsible for signature checking? +# make it be IntroducerClient, so they can push the filter outwards and +# reduce inbound network traffic + +# what should the interface between StorageFarmBroker and IntroducerClient +# look like? +# don't pass signatures: only pass validated blessed-objects class StorageFarmBroker: implements(IStorageBroker) @@ -30,16 +59,57 @@ class StorageFarmBroker: I'm also responsible for subscribing to the IntroducerClient to find out about new servers as they are announced by the Introducer. """ - def __init__(self, permute_peers=True): + def __init__(self, tub, permute_peers): + self.tub = tub assert permute_peers # False not implemented yet - self.servers = {} # serverid -> StorageClient instance self.permute_peers = permute_peers + # self.descriptors maps serverid -> IServerDescriptor, and keeps + # track of all the storage servers that we've heard about. Each + # descriptor manages its own Reconnector, and will give us a + # RemoteReference when we ask them for it. + self.descriptors = {} + # self.servers are statically configured from unit tests + self.test_servers = {} # serverid -> rref self.introducer_client = None - def add_server(self, serverid, s): - self.servers[serverid] = s + + # these two are used in unit tests + def test_add_server(self, serverid, rref): + self.test_servers[serverid] = rref + def test_add_descriptor(self, serverid, dsc): + self.descriptors[serverid] = dsc + def use_introducer(self, introducer_client): self.introducer_client = ic = introducer_client - ic.subscribe_to("storage") + ic.subscribe_to("storage", self._got_announcement) + + def _got_announcement(self, serverid, ann_d): + assert ann_d["service-name"] == "storage" + old = self.descriptors.get(serverid) + if old: + if old.get_announcement() == ann_d: + return # duplicate + # replacement + del self.descriptors[serverid] + old.stop_connecting() + # now we forget about them and start using the new one + dsc = NativeStorageClientDescriptor(serverid, ann_d) + self.descriptors[serverid] = dsc + dsc.start_connecting(self.tub, self._trigger_connections) + # the descriptor will manage their own Reconnector, and each time we + # need servers, we'll ask them if they're connected or not. + + def _trigger_connections(self): + # when one connection is established, reset the timers on all others, + # to trigger a reconnection attempt in one second. This is intended + # to accelerate server connections when we've been offline for a + # while. The goal is to avoid hanging out for a long time with + # connections to only a subset of the servers, which would increase + # the chances that we'll put shares in weird places (and not update + # existing shares of mutable files). See #374 for more details. + for dsc in self.descriptors.values(): + dsc.try_to_connect() + + def get_servers_for_index(self, peer_selection_index): # first cut: return a list of (peerid, versioned-rref) tuples @@ -51,34 +121,141 @@ class StorageFarmBroker: def get_all_servers(self): # return a frozenset of (peerid, versioned-rref) tuples servers = {} - for serverid,server in self.servers.items(): - servers[serverid] = server - if self.introducer_client: - ic = self.introducer_client - for serverid,server in ic.get_peers("storage"): - servers[serverid] = server + for serverid,rref in self.test_servers.items(): + servers[serverid] = rref + for serverid,dsc in self.descriptors.items(): + rref = dsc.get_rref() + if rref: + servers[serverid] = rref return frozenset(servers.items()) def get_all_serverids(self): - for serverid in self.servers: - yield serverid - if self.introducer_client: - for serverid,server in self.introducer_client.get_peers("storage"): - yield serverid + serverids = set() + serverids.update(self.test_servers.keys()) + serverids.update(self.descriptors.keys()) + return frozenset(serverids) + + def get_all_descriptors(self): + return sorted(self.descriptors.values(), + key=lambda dsc: dsc.get_serverid()) def get_nickname_for_serverid(self, serverid): - if serverid in self.servers: - return self.servers[serverid].nickname - if self.introducer_client: - return self.introducer_client.get_nickname_for_peerid(serverid) + if serverid in self.descriptors: + return self.descriptors[serverid].get_nickname() return None -class NativeStorageClient: - def __init__(self, serverid, furl, nickname, min_shares=1): + +class IServerDescriptor(Interface): + def start_connecting(tub, trigger_cb): + pass + def get_nickname(): + pass + def get_rref(): + pass + +class NativeStorageClientDescriptor: + """I hold information about a storage server that we want to connect to. + If we are connected, I hold the RemoteReference, their host address, and + the their version information. I remember information about when we were + last connected too, even if we aren't currently connected. + + @ivar announcement_time: when we first heard about this service + @ivar last_connect_time: when we last established a connection + @ivar last_loss_time: when we last lost a connection + + @ivar version: the server's versiondict, from the most recent announcement + @ivar nickname: the server's self-reported nickname (unicode), same + + @ivar rref: the RemoteReference, if connected, otherwise None + @ivar remote_host: the IAddress, if connected, otherwise None + """ + implements(IServerDescriptor) + + VERSION_DEFAULTS = { + "http://allmydata.org/tahoe/protocols/storage/v1" : + { "maximum-immutable-share-size": 2**32, + "tolerates-immutable-read-overrun": False, + "delete-mutable-shares-with-zero-length-writev": False, + }, + "application-version": "unknown: no get_version()", + } + + def __init__(self, serverid, ann_d, min_shares=1): self.serverid = serverid - self.furl = furl - self.nickname = nickname + self.announcement = ann_d self.min_shares = min_shares + self.serverid_s = idlib.shortnodeid_b2a(self.serverid) + self.announcement_time = time.time() + self.last_connect_time = None + self.last_loss_time = None + self.remote_host = None + self.rref = None + self._reconnector = None + self._trigger_cb = None + + def get_serverid(self): + return self.serverid + + def get_nickname(self): + return self.announcement["nickname"].decode("utf-8") + def get_announcement(self): + return self.announcement + def get_remote_host(self): + return self.remote_host + def get_last_connect_time(self): + return self.last_connect_time + def get_last_loss_time(self): + return self.last_loss_time + def get_announcement_time(self): + return self.announcement_time + + def start_connecting(self, tub, trigger_cb): + furl = self.announcement["FURL"] + self._trigger_cb = trigger_cb + self._reconnector = tub.connectTo(furl, self._got_connection) + + def _got_connection(self, rref): + lp = log.msg(format="got connection to %(serverid)s, getting versions", + serverid=self.serverid_s, + facility="tahoe.storage_broker", umid="coUECQ") + if self._trigger_cb: + eventually(self._trigger_cb) + default = self.VERSION_DEFAULTS + d = add_version_to_remote_reference(rref, default) + d.addCallback(self._got_versioned_service, lp) + d.addErrback(log.err, format="storageclient._got_connection", + serverid=self.serverid_s, umid="Sdq3pg") + + def _got_versioned_service(self, rref, lp): + log.msg(format="%(serverid)s provided version info %(version)s", + serverid=self.serverid_s, version=rref.version, + facility="tahoe.storage_broker", umid="SWmJYg", + level=log.NOISY, parent=lp) + + self.last_connect_time = time.time() + self.remote_host = rref.getPeer() + self.rref = rref + rref.notifyOnDisconnect(self._lost) + + def get_rref(self): + return self.rref + + def _lost(self): + log.msg(format="lost connection to %(serverid)s", + serverid=self.serverid_s, + facility="tahoe.storage_broker", umid="zbRllw") + self.last_loss_time = time.time() + self.rref = None + self.remote_host = None + + def stop_connecting(self): + # used when this descriptor has been superceded by another + self._reconnector.stopConnecting() + + def try_to_connect(self): + # used when the broker wants us to hurry up + self._reconnector.reset() + class UnknownServerTypeError(Exception): pass diff --git a/src/allmydata/test/common.py b/src/allmydata/test/common.py index ee5c2650..6140e841 100644 --- a/src/allmydata/test/common.py +++ b/src/allmydata/test/common.py @@ -533,10 +533,10 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin): def _check_connections(self): for c in self.clients: - ic = c.introducer_client - if not ic.connected_to_introducer(): + if not c.connected_to_introducer(): return False - if len(ic.get_all_peerids()) != self.numclients: + sb = c.get_storage_broker() + if len(sb.get_all_servers()) != self.numclients: return False return True diff --git a/src/allmydata/test/no_network.py b/src/allmydata/test/no_network.py index 8af6d1b8..eeb14fa3 100644 --- a/src/allmydata/test/no_network.py +++ b/src/allmydata/test/no_network.py @@ -25,7 +25,6 @@ from allmydata import uri as tahoe_uri from allmydata.client import Client from allmydata.storage.server import StorageServer, storage_index_to_dir from allmydata.util import fileutil, idlib, hashutil -from allmydata.introducer.client import RemoteServiceConnector from allmydata.test.common_web import HTTPClientGETFactory from allmydata.interfaces import IStorageBroker @@ -93,17 +92,13 @@ class LocalWrapper: def dontNotifyOnDisconnect(self, marker): del self.disconnectors[marker] -def wrap(original, service_name): +def wrap_storage_server(original): # Much of the upload/download code uses rref.version (which normally # comes from rrefutil.add_version_to_remote_reference). To avoid using a # network, we want a LocalWrapper here. Try to satisfy all these # constraints at the same time. wrapper = LocalWrapper(original) - try: - version = original.remote_get_version() - except AttributeError: - version = RemoteServiceConnector.VERSION_DEFAULTS[service_name] - wrapper.version = version + wrapper.version = original.remote_get_version() return wrapper class NoNetworkStorageBroker: @@ -220,7 +215,7 @@ class NoNetworkGrid(service.MultiService): ss.setServiceParent(middleman) serverid = ss.my_nodeid self.servers_by_number[i] = ss - self.servers_by_id[serverid] = wrap(ss, "storage") + self.servers_by_id[serverid] = wrap_storage_server(ss) self.all_servers = frozenset(self.servers_by_id.items()) for c in self.clients: c._servers = self.all_servers diff --git a/src/allmydata/test/test_checker.py b/src/allmydata/test/test_checker.py index 88f74951..e52e59b2 100644 --- a/src/allmydata/test/test_checker.py +++ b/src/allmydata/test/test_checker.py @@ -3,7 +3,7 @@ import simplejson from twisted.trial import unittest from allmydata import check_results, uri from allmydata.web import check_results as web_check_results -from allmydata.storage_client import StorageFarmBroker, NativeStorageClient +from allmydata.storage_client import StorageFarmBroker, NativeStorageClientDescriptor from common_web import WebRenderingMixin class FakeClient: @@ -13,12 +13,20 @@ class FakeClient: class WebResultsRendering(unittest.TestCase, WebRenderingMixin): def create_fake_client(self): - sb = StorageFarmBroker() + sb = StorageFarmBroker(None, True) for (peerid, nickname) in [("\x00"*20, "peer-0"), ("\xff"*20, "peer-f"), ("\x11"*20, "peer-11")] : - n = NativeStorageClient(peerid, None, nickname) - sb.add_server(peerid, n) + ann_d = { "version": 0, + "service-name": "storage", + "FURL": "fake furl", + "nickname": unicode(nickname), + "app-versions": {}, # need #466 and v2 introducer + "my-version": "ver", + "oldest-supported": "oldest", + } + dsc = NativeStorageClientDescriptor(peerid, ann_d) + sb.test_add_descriptor(peerid, dsc) c = FakeClient() c.storage_broker = sb return c diff --git a/src/allmydata/test/test_client.py b/src/allmydata/test/test_client.py index 63f4962f..6040e250 100644 --- a/src/allmydata/test/test_client.py +++ b/src/allmydata/test/test_client.py @@ -146,13 +146,13 @@ class Basic(unittest.TestCase): for (peerid,rref) in sb.get_servers_for_index(key) ] def test_permute(self): - sb = StorageFarmBroker() + sb = StorageFarmBroker(None, True) for k in ["%d" % i for i in range(5)]: - sb.add_server(k, None) + sb.test_add_server(k, None) self.failUnlessEqual(self._permute(sb, "one"), ['3','1','0','4','2']) self.failUnlessEqual(self._permute(sb, "two"), ['0','4','2','1','3']) - sb.servers = {} + sb.test_servers.clear() self.failUnlessEqual(self._permute(sb, "one"), []) def test_versions(self): diff --git a/src/allmydata/test/test_helper.py b/src/allmydata/test/test_helper.py index d2e689c0..113893b8 100644 --- a/src/allmydata/test/test_helper.py +++ b/src/allmydata/test/test_helper.py @@ -63,7 +63,7 @@ class FakeClient(service.MultiService): "max_segment_size": 1*MiB, } stats_provider = None - storage_broker = StorageFarmBroker() + storage_broker = StorageFarmBroker(None, True) def log(self, *args, **kwargs): return log.msg(*args, **kwargs) def get_encoding_parameters(self): diff --git a/src/allmydata/test/test_introducer.py b/src/allmydata/test/test_introducer.py index a42e0c1a..d5c59e09 100644 --- a/src/allmydata/test/test_introducer.py +++ b/src/allmydata/test/test_introducer.py @@ -11,16 +11,12 @@ from twisted.application import service from allmydata.interfaces import InsufficientVersionError from allmydata.introducer.client import IntroducerClient from allmydata.introducer.server import IntroducerService -from allmydata.introducer.common import make_index # test compatibility with old introducer .tac files from allmydata.introducer import IntroducerNode from allmydata.introducer import old -from allmydata.util import idlib, pollmixin +from allmydata.util import pollmixin import common_util as testutil -class FakeNode(Referenceable): - pass - class LoggingMultiService(service.MultiService): def log(self, msg, **kw): log.msg(msg, **kw) @@ -51,7 +47,7 @@ class ServiceMixin: class Introducer(ServiceMixin, unittest.TestCase, pollmixin.PollMixin): def test_create(self): - ic = IntroducerClient(None, "introducer.furl", "my_nickname", + ic = IntroducerClient(None, "introducer.furl", u"my_nickname", "my_version", "oldest_version") def test_listen(self): @@ -79,33 +75,35 @@ class Introducer(ServiceMixin, unittest.TestCase, pollmixin.PollMixin): class SystemTestMixin(ServiceMixin, pollmixin.PollMixin): - def setUp(self): - ServiceMixin.setUp(self) - self.central_tub = tub = Tub() + def create_tub(self, portnum=0): + tubfile = os.path.join(self.basedir, "tub.pem") + self.central_tub = tub = Tub(certFile=tubfile) #tub.setOption("logLocalFailures", True) #tub.setOption("logRemoteFailures", True) tub.setOption("expose-remote-exception-types", False) tub.setServiceParent(self.parent) - l = tub.listenOn("tcp:0") - portnum = l.getPortnum() - tub.setLocation("localhost:%d" % portnum) + l = tub.listenOn("tcp:%d" % portnum) + self.central_portnum = l.getPortnum() + if portnum != 0: + assert self.central_portnum == portnum + tub.setLocation("localhost:%d" % self.central_portnum) class SystemTest(SystemTestMixin, unittest.TestCase): def test_system(self): - i = IntroducerService() - i.setServiceParent(self.parent) - self.introducer_furl = self.central_tub.registerReference(i) - return self.do_system_test() + self.basedir = "introducer/SystemTest/system" + os.makedirs(self.basedir) + return self.do_system_test(IntroducerService) test_system.timeout = 480 # occasionally takes longer than 350s on "draco" - def test_system_oldserver(self): - i = old.IntroducerService_V1() - i.setServiceParent(self.parent) - self.introducer_furl = self.central_tub.registerReference(i) - return self.do_system_test() - - def do_system_test(self): + def do_system_test(self, create_introducer): + self.create_tub() + introducer = create_introducer() + introducer.setServiceParent(self.parent) + iff = os.path.join(self.basedir, "introducer.furl") + tub = self.central_tub + ifurl = self.central_tub.registerReference(introducer, furlFile=iff) + self.introducer_furl = ifurl NUMCLIENTS = 5 # we have 5 clients who publish themselves, and an extra one does @@ -114,6 +112,11 @@ class SystemTest(SystemTestMixin, unittest.TestCase): clients = [] tubs = {} + received_announcements = {} + NUM_SERVERS = NUMCLIENTS + subscribing_clients = [] + publishing_clients = [] + for i in range(NUMCLIENTS+1): tub = Tub() #tub.setOption("logLocalFailures", True) @@ -124,115 +127,210 @@ class SystemTest(SystemTestMixin, unittest.TestCase): portnum = l.getPortnum() tub.setLocation("localhost:%d" % portnum) - n = FakeNode() log.msg("creating client %d: %s" % (i, tub.getShortTubID())) - client_class = IntroducerClient - if i == 0: - client_class = old.IntroducerClient_V1 - c = client_class(tub, self.introducer_furl, - "nickname-%d" % i, "version", "oldest") + c = IntroducerClient(tub, self.introducer_furl, u"nickname-%d" % i, + "version", "oldest") + received_announcements[c] = ra = {} + def got(serverid, ann_d, announcements): + announcements[serverid] = ann_d + c.subscribe_to("storage", got, received_announcements[c]) + subscribing_clients.append(c) + if i < NUMCLIENTS: - node_furl = tub.registerReference(n) + node_furl = tub.registerReference(Referenceable()) c.publish(node_furl, "storage", "ri_name") + publishing_clients.append(c) # the last one does not publish anything - c.subscribe_to("storage") - c.setServiceParent(self.parent) clients.append(c) tubs[c] = tub def _wait_for_all_connections(): - for c in clients: - if len(c.get_all_connections()) < NUMCLIENTS: + for c in subscribing_clients: + if len(received_announcements[c]) < NUM_SERVERS: return False return True d = self.poll(_wait_for_all_connections) def _check1(res): log.msg("doing _check1") + dc = introducer._debug_counts + self.failUnlessEqual(dc["inbound_message"], NUM_SERVERS) + self.failUnlessEqual(dc["inbound_duplicate"], 0) + self.failUnlessEqual(dc["inbound_update"], 0) + self.failUnless(dc["outbound_message"]) + for c in clients: self.failUnless(c.connected_to_introducer()) - self.failUnlessEqual(len(c.get_all_connections()), NUMCLIENTS) - self.failUnlessEqual(len(c.get_all_peerids()), NUMCLIENTS) - self.failUnlessEqual(len(c.get_all_connections_for("storage")), - NUMCLIENTS) + for c in subscribing_clients: + cdc = c._debug_counts + self.failUnless(cdc["inbound_message"]) + self.failUnlessEqual(cdc["inbound_announcement"], + NUM_SERVERS) + self.failUnlessEqual(cdc["wrong_service"], 0) + self.failUnlessEqual(cdc["duplicate_announcement"], 0) + self.failUnlessEqual(cdc["update"], 0) + self.failUnlessEqual(cdc["new_announcement"], + NUM_SERVERS) + anns = received_announcements[c] + self.failUnlessEqual(len(anns), NUM_SERVERS) + nodeid0 = b32decode(tubs[clients[0]].tubID.upper()) - self.failUnlessEqual(c.get_nickname_for_peerid(nodeid0), - "nickname-0") + ann_d = anns[nodeid0] + nick = ann_d["nickname"] + self.failUnlessEqual(type(nick), unicode) + self.failUnlessEqual(nick, u"nickname-0") + for c in publishing_clients: + cdc = c._debug_counts + self.failUnlessEqual(cdc["outbound_message"], 1) d.addCallback(_check1) - origin_c = clients[0] - def _disconnect_somebody_else(res): - # now disconnect somebody's connection to someone else - current_counter = origin_c.counter - victim_nodeid = b32decode(tubs[clients[1]].tubID.upper()) - log.msg(" disconnecting %s->%s" % - (tubs[origin_c].tubID, - idlib.shortnodeid_b2a(victim_nodeid))) - origin_c.debug_disconnect_from_peerid(victim_nodeid) - log.msg(" did disconnect") - - # then wait until something changes, which ought to be them - # noticing the loss - def _compare(): - return current_counter != origin_c.counter - return self.poll(_compare) - - d.addCallback(_disconnect_somebody_else) - - # and wait for them to reconnect - d.addCallback(lambda res: self.poll(_wait_for_all_connections)) + # force an introducer reconnect, by shutting down the Tub it's using + # and starting a new Tub (with the old introducer). Everybody should + # reconnect and republish, but the introducer should ignore the + # republishes as duplicates. However, because the server doesn't know + # what each client does and does not know, it will send them a copy + # of the current announcement table anyway. + + d.addCallback(lambda _ign: log.msg("shutting down introducer's Tub")) + d.addCallback(lambda _ign: self.central_tub.disownServiceParent()) + + def _wait_for_introducer_loss(): + for c in clients: + if c.connected_to_introducer(): + return False + return True + d.addCallback(lambda res: self.poll(_wait_for_introducer_loss)) + + def _restart_introducer_tub(_ign): + log.msg("restarting introducer's Tub") + + # note: old.Server doesn't have this count + dc = introducer._debug_counts + self.expected_count = dc["inbound_message"] + NUM_SERVERS + self.expected_subscribe_count = dc["inbound_subscribe"] + NUMCLIENTS+1 + introducer._debug0 = dc["outbound_message"] + for c in subscribing_clients: + cdc = c._debug_counts + c._debug0 = cdc["inbound_message"] + + self.create_tub(self.central_portnum) + newfurl = self.central_tub.registerReference(introducer, + furlFile=iff) + assert newfurl == self.introducer_furl + d.addCallback(_restart_introducer_tub) + + def _wait_for_introducer_reconnect(): + # wait until: + # all clients are connected + # the introducer has received publish messages from all of them + # the introducer has received subscribe messages from all of them + # the introducer has sent (duplicate) announcements to all of them + # all clients have received (duplicate) announcements + dc = introducer._debug_counts + for c in clients: + if not c.connected_to_introducer(): + return False + if dc["inbound_message"] < self.expected_count: + return False + if dc["inbound_subscribe"] < self.expected_subscribe_count: + return False + for c in subscribing_clients: + cdc = c._debug_counts + if cdc["inbound_message"] < c._debug0+1: + return False + return True + d.addCallback(lambda res: self.poll(_wait_for_introducer_reconnect)) + def _check2(res): log.msg("doing _check2") + # assert that the introducer sent out new messages, one per + # subscriber + dc = introducer._debug_counts + self.failUnlessEqual(dc["inbound_message"], 2*NUM_SERVERS) + self.failUnlessEqual(dc["inbound_duplicate"], NUM_SERVERS) + self.failUnlessEqual(dc["inbound_update"], 0) + self.failUnlessEqual(dc["outbound_message"], + introducer._debug0 + len(subscribing_clients)) for c in clients: - self.failUnlessEqual(len(c.get_all_connections()), NUMCLIENTS) + self.failUnless(c.connected_to_introducer()) + for c in subscribing_clients: + cdc = c._debug_counts + self.failUnlessEqual(cdc["duplicate_announcement"], NUM_SERVERS) d.addCallback(_check2) - def _disconnect_yourself(res): - # now disconnect somebody's connection to themselves. - current_counter = origin_c.counter - victim_nodeid = b32decode(tubs[clients[0]].tubID.upper()) - log.msg(" disconnecting %s->%s" % - (tubs[origin_c].tubID, - idlib.shortnodeid_b2a(victim_nodeid))) - origin_c.debug_disconnect_from_peerid(victim_nodeid) - log.msg(" did disconnect from self") - - def _compare(): - return current_counter != origin_c.counter - return self.poll(_compare) - d.addCallback(_disconnect_yourself) - - d.addCallback(lambda res: self.poll(_wait_for_all_connections)) - def _check3(res): - log.msg("doing _check3") - for c in clients: - self.failUnlessEqual(len(c.get_all_connections_for("storage")), - NUMCLIENTS) - d.addCallback(_check3) - def _shutdown_introducer(res): - # now shut down the introducer. We do this by shutting down the - # tub it's using. Nobody's connections (to each other) should go - # down. All clients should notice the loss, and no other errors - # should occur. - log.msg("shutting down the introducer") - return self.central_tub.disownServiceParent() - d.addCallback(_shutdown_introducer) - def _wait_for_introducer_loss(): + # Then force an introducer restart, by shutting down the Tub, + # destroying the old introducer, and starting a new Tub+Introducer. + # Everybody should reconnect and republish, and the (new) introducer + # will distribute the new announcements, but the clients should + # ignore the republishes as duplicates. + + d.addCallback(lambda _ign: log.msg("shutting down introducer")) + d.addCallback(lambda _ign: self.central_tub.disownServiceParent()) + d.addCallback(lambda res: self.poll(_wait_for_introducer_loss)) + + def _restart_introducer(_ign): + log.msg("restarting introducer") + self.create_tub(self.central_portnum) + + for c in subscribing_clients: + # record some counters for later comparison. Stash the values + # on the client itself, because I'm lazy. + cdc = c._debug_counts + c._debug1 = cdc["inbound_announcement"] + c._debug2 = cdc["inbound_message"] + c._debug3 = cdc["new_announcement"] + newintroducer = create_introducer() + self.expected_message_count = NUM_SERVERS + self.expected_announcement_count = NUM_SERVERS*len(subscribing_clients) + self.expected_subscribe_count = len(subscribing_clients) + newfurl = self.central_tub.registerReference(newintroducer, + furlFile=iff) + assert newfurl == self.introducer_furl + d.addCallback(_restart_introducer) + def _wait_for_introducer_reconnect2(): + # wait until: + # all clients are connected + # the introducer has received publish messages from all of them + # the introducer has received subscribe messages from all of them + # the introducer has sent announcements for everybody to everybody + # all clients have received all the (duplicate) announcements + # at that point, the system should be quiescent + dc = introducer._debug_counts for c in clients: - if c.connected_to_introducer(): + if not c.connected_to_introducer(): + return False + if dc["inbound_message"] < self.expected_message_count: + return False + if dc["outbound_announcements"] < self.expected_announcement_count: + return False + if dc["inbound_subscribe"] < self.expected_subscribe_count: + return False + for c in subscribing_clients: + cdc = c._debug_counts + if cdc["inbound_announcement"] < c._debug1+NUM_SERVERS: return False return True - d.addCallback(lambda res: self.poll(_wait_for_introducer_loss)) + d.addCallback(lambda res: self.poll(_wait_for_introducer_reconnect2)) - def _check4(res): - log.msg("doing _check4") + def _check3(res): + log.msg("doing _check3") for c in clients: - self.failUnlessEqual(len(c.get_all_connections_for("storage")), - NUMCLIENTS) - self.failIf(c.connected_to_introducer()) - d.addCallback(_check4) + self.failUnless(c.connected_to_introducer()) + for c in subscribing_clients: + cdc = c._debug_counts + self.failUnless(cdc["inbound_announcement"] > c._debug1) + self.failUnless(cdc["inbound_message"] > c._debug2) + # there should have been no new announcements + self.failUnlessEqual(cdc["new_announcement"], c._debug3) + # and the right number of duplicate ones. There were + # NUM_SERVERS from the servertub restart, and there should be + # another NUM_SERVERS now + self.failUnlessEqual(cdc["duplicate_announcement"], + 2*NUM_SERVERS) + + d.addCallback(_check3) return d class TooNewServer(IntroducerService): @@ -247,6 +345,9 @@ class NonV1Server(SystemTestMixin, unittest.TestCase): # exception. def test_failure(self): + self.basedir = "introducer/NonV1Server/failure" + os.makedirs(self.basedir) + self.create_tub() i = TooNewServer() i.setServiceParent(self.parent) self.introducer_furl = self.central_tub.registerReference(i) @@ -258,10 +359,12 @@ class NonV1Server(SystemTestMixin, unittest.TestCase): portnum = l.getPortnum() tub.setLocation("localhost:%d" % portnum) - n = FakeNode() c = IntroducerClient(tub, self.introducer_furl, - "nickname-client", "version", "oldest") - c.subscribe_to("storage") + u"nickname-client", "version", "oldest") + announcements = {} + def got(serverid, ann_d): + announcements[serverid] = ann_d + c.subscribe_to("storage", got) c.setServiceParent(self.parent) @@ -283,7 +386,7 @@ class Index(unittest.TestCase): ann = ('pb://t5g7egomnnktbpydbuijt6zgtmw4oqi5@127.0.0.1:51857/hfzv36i', 'storage', 'RIStorageServer.tahoe.allmydata.com', 'plancha', 'allmydata-tahoe/1.4.1', '1.0.0') - (nodeid, service_name) = make_index(ann) + (nodeid, service_name) = old.make_index(ann) self.failUnlessEqual(nodeid, "\x9fM\xf2\x19\xcckU0\xbf\x03\r\x10\x99\xfb&\x9b-\xc7A\x1d") self.failUnlessEqual(service_name, "storage") diff --git a/src/allmydata/test/test_mutable.py b/src/allmydata/test/test_mutable.py index 23bc6761..36064b2f 100644 --- a/src/allmydata/test/test_mutable.py +++ b/src/allmydata/test/test_mutable.py @@ -174,19 +174,19 @@ class FakeClient: peerids = [tagged_hash("peerid", "%d" % i)[:20] for i in range(self._num_peers)] self.nodeid = "fakenodeid" - self.storage_broker = StorageFarmBroker() + self.storage_broker = StorageFarmBroker(None, True) for peerid in peerids: fss = FakeStorageServer(peerid, self._storage) - self.storage_broker.add_server(peerid, fss) + self.storage_broker.test_add_server(peerid, fss) def get_storage_broker(self): return self.storage_broker def debug_break_connection(self, peerid): - self.storage_broker.servers[peerid].broken = True + self.storage_broker.test_servers[peerid].broken = True def debug_remove_connection(self, peerid): - self.storage_broker.servers.pop(peerid) + self.storage_broker.test_servers.pop(peerid) def debug_get_connection(self, peerid): - return self.storage_broker.servers[peerid] + return self.storage_broker.test_servers[peerid] def get_encoding_parameters(self): return {"k": 3, "n": 10} @@ -1569,7 +1569,7 @@ class MultipleEncodings(unittest.TestCase): sharemap = {} sb = self._client.get_storage_broker() - for i,peerid in enumerate(sb.get_all_serverids()): + for peerid in sorted(sb.get_all_serverids()): peerid_s = shortnodeid_b2a(peerid) for shnum in self._shares1.get(peerid, {}): if shnum < len(places): @@ -1794,13 +1794,13 @@ class LessFakeClient(FakeClient): self._num_peers = num_peers peerids = [tagged_hash("peerid", "%d" % i)[:20] for i in range(self._num_peers)] - self.storage_broker = StorageFarmBroker() + self.storage_broker = StorageFarmBroker(None, True) for peerid in peerids: peerdir = os.path.join(basedir, idlib.shortnodeid_b2a(peerid)) make_dirs(peerdir) ss = StorageServer(peerdir, peerid) lw = LocalWrapper(ss) - self.storage_broker.add_server(peerid, lw) + self.storage_broker.test_add_server(peerid, lw) self.nodeid = "fakenodeid" diff --git a/src/allmydata/test/test_system.py b/src/allmydata/test/test_system.py index 861a2007..9a7f4698 100644 --- a/src/allmydata/test/test_system.py +++ b/src/allmydata/test/test_system.py @@ -73,10 +73,10 @@ class SystemTest(SystemTestMixin, unittest.TestCase): def _check(extra_node): self.extra_node = extra_node for c in self.clients: - all_peerids = list(c.get_storage_broker().get_all_serverids()) + all_peerids = c.get_storage_broker().get_all_serverids() self.failUnlessEqual(len(all_peerids), self.numclients+1) sb = c.storage_broker - permuted_peers = list(sb.get_servers_for_index("a")) + permuted_peers = sb.get_servers_for_index("a") self.failUnlessEqual(len(permuted_peers), self.numclients+1) d.addCallback(_check) @@ -108,10 +108,10 @@ class SystemTest(SystemTestMixin, unittest.TestCase): d = self.set_up_nodes() def _check_connections(res): for c in self.clients: - all_peerids = list(c.get_storage_broker().get_all_serverids()) + all_peerids = c.get_storage_broker().get_all_serverids() self.failUnlessEqual(len(all_peerids), self.numclients) sb = c.storage_broker - permuted_peers = list(sb.get_servers_for_index("a")) + permuted_peers = sb.get_servers_for_index("a") self.failUnlessEqual(len(permuted_peers), self.numclients) d.addCallback(_check_connections) diff --git a/src/allmydata/test/test_upload.py b/src/allmydata/test/test_upload.py index 487ba0a1..ffeb4ee7 100644 --- a/src/allmydata/test/test_upload.py +++ b/src/allmydata/test/test_upload.py @@ -173,9 +173,9 @@ class FakeClient: else: peers = [ ("%20d"%fakeid, FakeStorageServer(self.mode),) for fakeid in range(self.num_servers) ] - self.storage_broker = StorageFarmBroker() + self.storage_broker = StorageFarmBroker(None, permute_peers=True) for (serverid, server) in peers: - self.storage_broker.add_server(serverid, server) + self.storage_broker.test_add_server(serverid, server) self.last_peers = [p[1] for p in peers] def log(self, *args, **kwargs): diff --git a/src/allmydata/test/test_web.py b/src/allmydata/test/test_web.py index 846c29b4..49c4e29f 100644 --- a/src/allmydata/test/test_web.py +++ b/src/allmydata/test/test_web.py @@ -31,14 +31,6 @@ from allmydata.test.common_web import HTTPClientGETFactory, \ timeout = 480 # Most of these take longer than 240 seconds on Francois's arm box. -class FakeIntroducerClient: - def get_all_connectors(self): - return {} - def get_all_connections_for(self, service_name): - return frozenset() - def get_all_peerids(self): - return frozenset() - class FakeStatsProvider: def get_stats(self): stats = {'stats': {}, 'counters': {}} @@ -55,7 +47,7 @@ class FakeClient(service.MultiService): 'zfec': "fake", } introducer_furl = "None" - introducer_client = FakeIntroducerClient() + _all_upload_status = [upload.UploadStatus()] _all_download_status = [download.DownloadStatus()] _all_mapupdate_statuses = [servermap.UpdateStatus()] @@ -67,7 +59,7 @@ class FakeClient(service.MultiService): def connected_to_introducer(self): return False - storage_broker = StorageFarmBroker() + storage_broker = StorageFarmBroker(None, permute_peers=True) def get_storage_broker(self): return self.storage_broker diff --git a/src/allmydata/web/root.py b/src/allmydata/web/root.py index 910847ca..6fa84b6a 100644 --- a/src/allmydata/web/root.py +++ b/src/allmydata/web/root.py @@ -238,30 +238,24 @@ class Root(rend.Page): return "no" def data_known_storage_servers(self, ctx, data): - ic = self.client.introducer_client - servers = [c - for c in ic.get_all_connectors().values() - if c.service_name == "storage"] - return len(servers) + sb = self.client.get_storage_broker() + return len(sb.get_all_serverids()) def data_connected_storage_servers(self, ctx, data): - ic = self.client.introducer_client - return len(ic.get_all_connections_for("storage")) + sb = self.client.get_storage_broker() + return len(sb.get_all_servers()) def data_services(self, ctx, data): - ic = self.client.introducer_client - c = [ (service_name, nodeid, rsc) - for (nodeid, service_name), rsc - in ic.get_all_connectors().items() ] - c.sort() - return c - - def render_service_row(self, ctx, data): - (service_name, nodeid, rsc) = data + sb = self.client.get_storage_broker() + return sb.get_all_descriptors() + + def render_service_row(self, ctx, descriptor): + nodeid = descriptor.get_serverid() + ctx.fillSlots("peerid", idlib.nodeid_b2a(nodeid)) - ctx.fillSlots("nickname", rsc.nickname) - if rsc.rref: - rhost = rsc.remote_host + ctx.fillSlots("nickname", descriptor.get_nickname()) + rhost = descriptor.get_remote_host() + if rhost: if nodeid == self.client.nodeid: rhost_s = "(loopback)" elif isinstance(rhost, address.IPv4Address): @@ -269,19 +263,24 @@ class Root(rend.Page): else: rhost_s = str(rhost) connected = "Yes: to " + rhost_s - since = rsc.last_connect_time + since = descriptor.get_last_connect_time() else: connected = "No" - since = rsc.last_loss_time + since = descriptor.get_last_loss_time() + announced = descriptor.get_announcement_time() + announcement = descriptor.get_announcement() + version = announcement["version"] + service_name = announcement["service-name"] TIME_FORMAT = "%H:%M:%S %d-%b-%Y" ctx.fillSlots("connected", connected) - ctx.fillSlots("connected-bool", not not rsc.rref) - ctx.fillSlots("since", time.strftime(TIME_FORMAT, time.localtime(since))) + ctx.fillSlots("connected-bool", bool(rhost)) + ctx.fillSlots("since", time.strftime(TIME_FORMAT, + time.localtime(since))) ctx.fillSlots("announced", time.strftime(TIME_FORMAT, - time.localtime(rsc.announcement_time))) - ctx.fillSlots("version", rsc.version) - ctx.fillSlots("service_name", rsc.service_name) + time.localtime(announced))) + ctx.fillSlots("version", version) + ctx.fillSlots("service_name", service_name) return ctx.tag