from twisted.internet import reactor
from twisted.application.internet import TimerService
from foolscap.api import Referenceable
-from foolscap.logging import log
from pycryptopp.publickey import rsa
import allmydata
from allmydata.immutable.offloaded import Helper
from allmydata.control import ControlServer
from allmydata.introducer.client import IntroducerClient
-from allmydata.util import hashutil, base32, pollmixin, cachedir
+from allmydata.util import hashutil, base32, pollmixin, cachedir, log
from allmydata.util.abbreviate import parse_abbreviated_size
from allmydata.util.time_format import parse_duration, parse_date
from allmydata.uri import LiteralFileURI
d = self.when_tub_ready()
def _start_introducer_client(res):
ic.setServiceParent(self)
- # nodes that want to upload and download will need storage servers
- ic.subscribe_to("storage")
d.addCallback(_start_introducer_client)
d.addErrback(log.err, facility="tahoe.init",
level=log.BAD, umid="URyI5w")
def init_client_storage_broker(self):
# create a StorageFarmBroker object, for use by Uploader/Downloader
# (and everybody else who wants to use storage servers)
- self.storage_broker = sb = storage_client.StorageFarmBroker()
+ sb = storage_client.StorageFarmBroker(self.tub, permute_peers=True)
+ self.storage_broker = sb
- # load static server specifications from tahoe.cfg, if any
+ # load static server specifications from tahoe.cfg, if any.
+ # Not quite ready yet.
#if self.config.has_section("client-server-selection"):
# server_params = {} # maps serverid to dict of parameters
# for (name, value) in self.config.items("client-server-selection"):
temporary test network and need to know when it is safe to proceed
with an upload or download."""
def _check():
- current_clients = list(self.storage_broker.get_all_serverids())
- return len(current_clients) >= num_clients
+ return len(self.storage_broker.get_all_servers()) >= num_clients
d = self.poll(_check, 0.5)
d.addCallback(lambda res: None)
return d
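    # PollMixin.poll(check_f, pollinterval) is assumed here to re-run _check
    # every 0.5 seconds and to fire the returned Deferred once _check comes
    # back true; the addCallback above just discards the final poll result.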
# phase to take more than 10 seconds. Expect worst-case latency to be
# 300ms.
results = {}
- conns = self.parent.introducer_client.get_all_connections_for("storage")
- everyone = [(peerid,rref) for (peerid, service_name, rref) in conns]
+ sb = self.parent.get_storage_broker()
+ everyone = sb.get_all_servers()
num_pings = int(mathutil.div_ceil(10, (len(everyone) * 0.3)))
- everyone = everyone * num_pings
+ everyone = list(everyone) * num_pings
d = self._do_one_ping(None, everyone, results)
return d
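    # Worked example of the sizing above: with 10 servers and the assumed
    # worst-case latency of 300ms per ping, one pass over 'everyone' costs
    # about 10 * 0.3 = 3s, so int(div_ceil(10, 3.0)) = 4 passes fit in the
    # 10-second budget, and the schedule holds 40 (peerid, rref) entries.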
def _do_one_ping(self, res, everyone_left, results):
from allmydata.util import base32, deferredutil, hashutil, log, mathutil, idlib
from allmydata.util.assertutil import _assert, precondition
from allmydata import codec, hashtree, uri
-from allmydata.interfaces import IDownloadTarget, IDownloader, IFileURI, IVerifierURI, \
+from allmydata.interfaces import IDownloadTarget, IDownloader, \
+ IFileURI, IVerifierURI, \
IDownloadStatus, IDownloadResults, IValidatedThingProxy, \
- IStorageBroker, NotEnoughSharesError, \
+ IStorageBroker, NotEnoughSharesError, NoServersError, \
UnableToFetchCriticalDownloadDataError
from allmydata.immutable import layout
from allmydata.monitor import Monitor
def _get_all_shareholders(self):
dl = []
sb = self._storage_broker
- for (peerid,ss) in sb.get_servers_for_index(self._storage_index):
+ servers = sb.get_servers_for_index(self._storage_index)
+ if not servers:
+ raise NoServersError("broker gave us no servers!")
+ for (peerid,ss) in servers:
self.log(format="sending DYHB to [%(peerid)s]",
peerid=idlib.shortnodeid_b2a(peerid),
level=log.NOISY, umid="rT03hg")
"""
def get_all_serverids():
"""
- @return: iterator of serverid strings
+ @return: frozenset of serverid strings
"""
def get_nickname_for_serverid(serverid):
"""
@return: unicode nickname, or None
"""
+ # methods moved from IntroducerClient, need review
+ def get_all_connections():
+ """Return a frozenset of (nodeid, service_name, rref) tuples, one for
+ each active connection we've established to a remote service. This is
+ mostly useful for unit tests that need to wait until a certain number
+ of connections have been made."""
+
+ def get_all_connectors():
+ """Return a dict that maps from (nodeid, service_name) to a
+ RemoteServiceConnector instance for all services that we are actively
+ trying to connect to. Each RemoteServiceConnector has the following
+ public attributes::
+
+ service_name: the type of service provided, like 'storage'
+ announcement_time: when we first heard about this service
+ last_connect_time: when we last established a connection
+ last_loss_time: when we last lost a connection
+
+ version: the peer's version, from the most recent connection
+ oldest_supported: the peer's oldest supported version, same
+
+ rref: the RemoteReference, if connected, otherwise None
+ remote_host: the IAddress, if connected, otherwise None
+
+ This method is intended for monitoring interfaces, such as a web page
+ which describes connecting and connected peers.
+ """
+
+ def get_all_peerids():
+ """Return a frozenset of all peerids to whom we have a connection (to
+ one or more services) established. Mostly useful for unit tests."""
+
+ def get_all_connections_for(service_name):
+ """Return a frozenset of (nodeid, service_name, rref) tuples, one
+ for each active connection that provides the given SERVICE_NAME."""
+
+ def get_permuted_peers(service_name, key):
+ """Returns an ordered list of (peerid, rref) tuples, selecting from
+ the connections that provide SERVICE_NAME, using a hash-based
+ permutation keyed by KEY. This randomizes the service list in a
+ repeatable way, to distribute load over many peers.
+ """
+
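+    # A minimal sketch of the permutation, matching the IntroducerClient
+    # implementation that this interface absorbs (see get_permuted_peers
+    # in the removed client code below):
+    #     import sha
+    #     def permute(servers, key):  # servers: [(peerid, rref), ...]
+    #         return sorted(servers, key=lambda s: sha.new(key+s[0]).digest())
+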
# hm, we need a solution for forward references in schemas
FileNode_ = Any() # TODO: foolscap needs constraints on copyables
-import re, time, sha
from base64 import b32decode
from zope.interface import implements
from twisted.application import service
-from foolscap.api import Referenceable
+from foolscap.api import Referenceable, SturdyRef, eventually
from allmydata.interfaces import InsufficientVersionError
from allmydata.introducer.interfaces import RIIntroducerSubscriberClient, \
IIntroducerClient
from allmydata.util import log, idlib
-from allmydata.util.rrefutil import add_version_to_remote_reference
-from allmydata.introducer.common import make_index
-
-
-class RemoteServiceConnector:
- """I hold information about a peer service that we want to connect to. If
- we are connected, I hold the RemoteReference, the peer's address, and the
- peer's version information. I remember information about when we were
- last connected to the peer too, even if we aren't currently connected.
-
- @ivar announcement_time: when we first heard about this service
- @ivar last_connect_time: when we last established a connection
- @ivar last_loss_time: when we last lost a connection
-
- @ivar version: the peer's version, from the most recent announcement
- @ivar oldest_supported: the peer's oldest supported version, same
- @ivar nickname: the peer's self-reported nickname, same
-
- @ivar rref: the RemoteReference, if connected, otherwise None
- @ivar remote_host: the IAddress, if connected, otherwise None
- """
-
- VERSION_DEFAULTS = {
- "storage": { "http://allmydata.org/tahoe/protocols/storage/v1" :
- { "maximum-immutable-share-size": 2**32,
- "tolerates-immutable-read-overrun": False,
- "delete-mutable-shares-with-zero-length-writev": False,
- },
- "application-version": "unknown: no get_version()",
- },
- "stub_client": { },
- }
-
- def __init__(self, announcement, tub, ic):
- self._tub = tub
- self._announcement = announcement
- self._ic = ic
- (furl, service_name, ri_name, nickname, ver, oldest) = announcement
-
- self._furl = furl
- m = re.match(r'pb://(\w+)@', furl)
- assert m
- self._nodeid = b32decode(m.group(1).upper())
- self._nodeid_s = idlib.shortnodeid_b2a(self._nodeid)
-
- self.service_name = service_name
-
- self.log("attempting to connect to %s" % self._nodeid_s)
- self.announcement_time = time.time()
- self.last_loss_time = None
- self.rref = None
- self.remote_host = None
- self.last_connect_time = None
- self.version = ver
- self.oldest_supported = oldest
- self.nickname = nickname
-
- def log(self, *args, **kwargs):
- return self._ic.log(*args, **kwargs)
-
- def startConnecting(self):
- self._reconnector = self._tub.connectTo(self._furl, self._got_service)
-
- def stopConnecting(self):
- self._reconnector.stopConnecting()
-
- def _got_service(self, rref):
- self.log("got connection to %s, getting versions" % self._nodeid_s)
-
- default = self.VERSION_DEFAULTS.get(self.service_name, {})
- d = add_version_to_remote_reference(rref, default)
- d.addCallback(self._got_versioned_service)
-
- def _got_versioned_service(self, rref):
- self.log("connected to %s, version %s" % (self._nodeid_s, rref.version))
-
- self.last_connect_time = time.time()
- self.remote_host = rref.tracker.broker.transport.getPeer()
-
- self.rref = rref
-
- self._ic.add_connection(self._nodeid, self.service_name, rref)
-
- rref.notifyOnDisconnect(self._lost, rref)
-
- def _lost(self, rref):
- self.log("lost connection to %s" % self._nodeid_s)
- self.last_loss_time = time.time()
- self.rref = None
- self.remote_host = None
- self._ic.remove_connection(self._nodeid, self.service_name, rref)
-
-
- def reset(self):
- self._reconnector.reset()
+from allmydata.util.rrefutil import add_version_to_remote_reference, trap_deadref
class IntroducerClient(service.Service, Referenceable):
self._tub = tub
self.introducer_furl = introducer_furl
- self._nickname = nickname.encode("utf-8")
+ assert type(nickname) is unicode
+ self._nickname_utf8 = nickname.encode("utf-8") # we always send UTF-8
self._my_version = my_version
self._oldest_supported = oldest_supported
self._published_announcements = set()
self._publisher = None
- self._connected = False
+ self._local_subscribers = [] # (servicename,cb,args,kwargs) tuples
self._subscribed_service_names = set()
self._subscriptions = set() # requests we've actually sent
- self._received_announcements = set()
- # TODO: this set will grow without bound, until the node is restarted
-
- # we only accept one announcement per (peerid+service_name) pair.
- # This insures that an upgraded host replace their previous
- # announcement. It also means that each peer must have their own Tub
- # (no sharing), which is slightly weird but consistent with the rest
- # of the Tahoe codebase.
- self._connectors = {} # k: (peerid+svcname), v: RemoteServiceConnector
- # self._connections is a set of (peerid, service_name, rref) tuples
- self._connections = set()
-
- self.counter = 0 # incremented each time we change state, for tests
+
+ # _current_announcements remembers one announcement per
+ # (servicename,serverid) pair. Anything that arrives with the same
+ # pair will displace the previous one. This stores unpacked
+ # announcement dictionaries, which can be compared for equality to
+        # distinguish re-announcements from updates. It also provides memory
+ # for clients who subscribe after startup.
+ self._current_announcements = {}
+
self.encoding_parameters = None
+ # hooks for unit tests
+ self._debug_counts = {
+ "inbound_message": 0,
+ "inbound_announcement": 0,
+ "wrong_service": 0,
+ "duplicate_announcement": 0,
+ "update": 0,
+ "new_announcement": 0,
+ "outbound_message": 0,
+ }
+
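+    # Tests can snapshot these counters and assert on deltas, e.g.:
+    #     before = ic._debug_counts["inbound_announcement"]
+    #     ... deliver announcements ...
+    #     assert ic._debug_counts["inbound_announcement"] == before + n
+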
def startService(self):
service.Service.startService(self)
self._introducer_error = None
needed = "http://allmydata.org/tahoe/protocols/introducer/v1"
if needed not in publisher.version:
raise InsufficientVersionError(needed, publisher.version)
- self._connected = True
self._publisher = publisher
publisher.notifyOnDisconnect(self._disconnected)
self._maybe_publish()
def _disconnected(self):
self.log("bummer, we've lost our connection to the introducer")
- self._connected = False
self._publisher = None
self._subscriptions.clear()
- def stopService(self):
- service.Service.stopService(self)
- self._introducer_reconnector.stopConnecting()
- for rsc in self._connectors.itervalues():
- rsc.stopConnecting()
-
def log(self, *args, **kwargs):
if "facility" not in kwargs:
kwargs["facility"] = "tahoe.introducer"
def publish(self, furl, service_name, remoteinterface_name):
+ assert type(self._nickname_utf8) is str # we always send UTF-8
ann = (furl, service_name, remoteinterface_name,
- self._nickname, self._my_version, self._oldest_supported)
+ self._nickname_utf8, self._my_version, self._oldest_supported)
self._published_announcements.add(ann)
self._maybe_publish()
- def subscribe_to(self, service_name):
+ def subscribe_to(self, service_name, cb, *args, **kwargs):
+ self._local_subscribers.append( (service_name,cb,args,kwargs) )
self._subscribed_service_names.add(service_name)
self._maybe_subscribe()
+ for (servicename,nodeid),ann_d in self._current_announcements.items():
+ if servicename == service_name:
+ eventually(cb, nodeid, ann_d)
def _maybe_subscribe(self):
if not self._publisher:
# duplicate requests.
self._subscriptions.add(service_name)
d = self._publisher.callRemote("subscribe", self, service_name)
- d.addErrback(log.err, facility="tahoe.introducer",
+ d.addErrback(trap_deadref)
+ d.addErrback(log.err, format="server errored during subscribe",
+ facility="tahoe.introducer",
level=log.WEIRD, umid="2uMScQ")
def _maybe_publish(self):
return
# this re-publishes everything. The Introducer ignores duplicates
for ann in self._published_announcements:
+ self._debug_counts["outbound_message"] += 1
d = self._publisher.callRemote("publish", ann)
- d.addErrback(log.err, facility="tahoe.introducer",
+ d.addErrback(trap_deadref)
+ d.addErrback(log.err,
+ format="server errored during publish %(ann)s",
+ ann=ann, facility="tahoe.introducer",
level=log.WEIRD, umid="xs9pVQ")
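+    # trap_deadref (from allmydata.util.rrefutil) is assumed to be the usual
+    # foolscap idiom, roughly:
+    #     from foolscap.api import DeadReferenceError
+    #     def trap_deadref(f):
+    #         f.trap(DeadReferenceError)
+    # so a publish or subscribe that races against a lost introducer
+    # connection fails quietly instead of being logged as WEIRD.
+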
def remote_announce(self, announcements):
+ self.log("received %d announcements" % len(announcements))
+ self._debug_counts["inbound_message"] += 1
for ann in announcements:
- self.log("received %d announcements" % len(announcements))
- (furl, service_name, ri_name, nickname, ver, oldest) = ann
- if service_name not in self._subscribed_service_names:
- self.log("announcement for a service we don't care about [%s]"
- % (service_name,), level=log.UNUSUAL, umid="dIpGNA")
- continue
- if ann in self._received_announcements:
- self.log("ignoring old announcement: %s" % (ann,),
- level=log.NOISY)
- continue
- self.log("new announcement[%s]: %s" % (service_name, ann))
- self._received_announcements.add(ann)
- self._new_announcement(ann)
-
- def _new_announcement(self, announcement):
- # this will only be called for new announcements
- index = make_index(announcement)
- if index in self._connectors:
- self.log("replacing earlier announcement", level=log.NOISY)
- self._connectors[index].stopConnecting()
- rsc = RemoteServiceConnector(announcement, self._tub, self)
- self._connectors[index] = rsc
- rsc.startConnecting()
-
- def add_connection(self, nodeid, service_name, rref):
- self._connections.add( (nodeid, service_name, rref) )
- self.counter += 1
- # when one connection is established, reset the timers on all others,
- # to trigger a reconnection attempt in one second. This is intended
- # to accelerate server connections when we've been offline for a
- # while. The goal is to avoid hanging out for a long time with
- # connections to only a subset of the servers, which would increase
- # the chances that we'll put shares in weird places (and not update
- # existing shares of mutable files). See #374 for more details.
- for rsc in self._connectors.values():
- rsc.reset()
-
- def remove_connection(self, nodeid, service_name, rref):
- self._connections.discard( (nodeid, service_name, rref) )
- self.counter += 1
-
-
- def get_all_connections(self):
- return frozenset(self._connections)
-
- def get_all_connectors(self):
- return self._connectors.copy()
-
- def get_all_peerids(self):
- return frozenset([peerid
- for (peerid, service_name, rref)
- in self._connections])
-
- def get_nickname_for_peerid(self, peerid):
- for k in self._connectors:
- (peerid0, svcname0) = k
- if peerid0 == peerid:
- rsc = self._connectors[k]
- return rsc.nickname
- return None
-
- def get_all_connections_for(self, service_name):
- return frozenset([c
- for c in self._connections
- if c[1] == service_name])
-
- def get_peers(self, service_name):
- """Return a set of (peerid, versioned-rref) tuples."""
- return frozenset([(peerid, r) for (peerid, servname, r) in self._connections if servname == service_name])
-
- def get_permuted_peers(self, service_name, key):
- """Return an ordered list of (peerid, versioned-rref) tuples."""
-
- servers = self.get_peers(service_name)
-
- return sorted(servers, key=lambda x: sha.new(key+x[0]).digest())
+ try:
+ self._process_announcement(ann)
+ except:
+ log.err(format="unable to process announcement %(ann)s",
+ ann=ann)
+ # Don't let a corrupt announcement prevent us from processing
+ # the remaining ones. Don't return an error to the server,
+                # since they'd just ignore it anyway.
+
+ def _process_announcement(self, ann):
+ self._debug_counts["inbound_announcement"] += 1
+ (furl, service_name, ri_name, nickname_utf8, ver, oldest) = ann
+ if service_name not in self._subscribed_service_names:
+ self.log("announcement for a service we don't care about [%s]"
+ % (service_name,), level=log.UNUSUAL, umid="dIpGNA")
+ self._debug_counts["wrong_service"] += 1
+ return
+ self.log("announcement for [%s]: %s" % (service_name, ann),
+ umid="BoKEag")
+ assert type(furl) is str
+ assert type(service_name) is str
+ assert type(ri_name) is str
+ assert type(nickname_utf8) is str
+ nickname = nickname_utf8.decode("utf-8")
+ assert type(nickname) is unicode
+ assert type(ver) is str
+ assert type(oldest) is str
+
+ nodeid = b32decode(SturdyRef(furl).tubID.upper())
+ nodeid_s = idlib.shortnodeid_b2a(nodeid)
+
+ ann_d = { "version": 0,
+ "service-name": service_name,
+
+ "FURL": furl,
+ "nickname": nickname,
+ "app-versions": {}, # need #466 and v2 introducer
+ "my-version": ver,
+ "oldest-supported": oldest,
+ }
+
+ index = (service_name, nodeid)
+ if self._current_announcements.get(index, None) == ann_d:
+ self.log("reannouncement for [%(service)s]:%(nodeid)s, ignoring",
+ service=service_name, nodeid=nodeid_s,
+ level=log.UNUSUAL, umid="B1MIdA")
+ self._debug_counts["duplicate_announcement"] += 1
+ return
+ if index in self._current_announcements:
+ self._debug_counts["update"] += 1
+ else:
+ self._debug_counts["new_announcement"] += 1
+
+ self._current_announcements[index] = ann_d
+ # note: we never forget an index, but we might update its value
+
+ for (service_name2,cb,args,kwargs) in self._local_subscribers:
+ if service_name2 == service_name:
+ eventually(cb, nodeid, ann_d, *args, **kwargs)
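+    # Example of the displacement rule above: a verbatim re-announcement
+    # for ("storage", nodeid) only bumps "duplicate_announcement" and
+    # returns early, while an announcement with (say) a new FURL for the
+    # same pair replaces the stored ann_d, bumps "update", and re-fires
+    # every matching subscriber callback via eventually().
+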
def remote_set_encoding_parameters(self, parameters):
self.encoding_parameters = parameters
def connected_to_introducer(self):
- return self._connected
-
- def debug_disconnect_from_peerid(self, victim_nodeid):
- # for unit tests: locate and sever all connections to the given
- # peerid.
- for (nodeid, service_name, rref) in self._connections:
- if nodeid == victim_nodeid:
- rref.tracker.broker.transport.loseConnection()
+ return bool(self._publisher)
+++ /dev/null
-
-import re
-from base64 import b32decode
-
-def make_index(announcement):
- (furl, service_name, ri_name, nickname, ver, oldest) = announcement
- m = re.match(r'pb://(\w+)@', furl)
- assert m
- nodeid = b32decode(m.group(1).upper())
- return (nodeid, service_name)
-
parameter: this is supposed to be a globally-unique string that
identifies the RemoteInterface that is implemented."""
- def subscribe_to(service_name):
+ def subscribe_to(service_name, callback, *args, **kwargs):
"""Call this if you will eventually want to use services with the
given SERVICE_NAME. This will prompt me to subscribe to announcements
- of those services. You can pick up the announcements later by calling
- get_all_connections_for() or get_permuted_peers().
- """
-
- def get_all_connections():
- """Return a frozenset of (nodeid, service_name, rref) tuples, one for
- each active connection we've established to a remote service. This is
- mostly useful for unit tests that need to wait until a certain number
- of connections have been made."""
-
- def get_all_connectors():
- """Return a dict that maps from (nodeid, service_name) to a
- RemoteServiceConnector instance for all services that we are actively
- trying to connect to. Each RemoteServiceConnector has the following
- public attributes::
-
- service_name: the type of service provided, like 'storage'
- announcement_time: when we first heard about this service
- last_connect_time: when we last established a connection
- last_loss_time: when we last lost a connection
-
- version: the peer's version, from the most recent connection
- oldest_supported: the peer's oldest supported version, same
-
- rref: the RemoteReference, if connected, otherwise None
- remote_host: the IAddress, if connected, otherwise None
-
- This method is intended for monitoring interfaces, such as a web page
- which describes connecting and connected peers.
- """
-
- def get_all_peerids():
- """Return a frozenset of all peerids to whom we have a connection (to
- one or more services) established. Mostly useful for unit tests."""
-
- def get_all_connections_for(service_name):
- """Return a frozenset of (nodeid, service_name, rref) tuples, one
- for each active connection that provides the given SERVICE_NAME."""
-
- def get_permuted_peers(service_name, key):
- """Returns an ordered list of (peerid, rref) tuples, selecting from
- the connections that provide SERVICE_NAME, using a hash-based
- permutation keyed by KEY. This randomizes the service list in a
- repeatable way, to distribute load over many peers.
+ of those services. Your callback will be invoked with at least two
+ arguments: a serverid (binary string), and an announcement
+ dictionary, followed by any additional callback args/kwargs you give
+ me. I will run your callback for both new announcements and for
+ announcements that have changed, but you must be prepared to tolerate
+ duplicates.
+
+ The announcement dictionary that I give you will have the following
+ keys:
+
+ version: 0
+ service-name: str('storage')
+
+ FURL: str(furl)
+ nickname: unicode
+ app-versions: {}
+ my-version: str
+ oldest-supported: str
+
+        Note that app-versions will be an empty dictionary until #466 is done
+ and both the introducer and the remote client have been upgraded. For
+ current (native) server types, the serverid will always be equal to
+ the binary form of the FURL's tubid.
"""
def connected_to_introducer():
from allmydata.util import log, idlib
from allmydata.introducer.interfaces import RIIntroducerSubscriberClient, \
IIntroducerClient, RIIntroducerPublisherAndSubscriberService
-from allmydata.introducer.common import make_index
+
+def make_index(announcement):
+ (furl, service_name, ri_name, nickname, ver, oldest) = announcement
+ m = re.match(r'pb://(\w+)@', furl)
+ assert m
+ nodeid = b32decode(m.group(1).upper())
+ return (nodeid, service_name)
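+
+# Worked example, using the announcement checked by the unit tests below:
+#     furl = 'pb://t5g7egomnnktbpydbuijt6zgtmw4oqi5@127.0.0.1:51857/hfzv36i'
+# the regex captures the base32 tubid 't5g7egomnnktbpydbuijt6zgtmw4oqi5',
+# and b32decode() of its uppercased form is the 20-byte binary nodeid.
+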
class RemoteServiceConnector:
"""I hold information about a peer service that we want to connect to. If
import time, os.path
+from base64 import b32decode
from zope.interface import implements
from twisted.application import service
-from foolscap.api import Referenceable
+from foolscap.api import Referenceable, SturdyRef
import allmydata
from allmydata import node
-from allmydata.util import log
+from allmydata.util import log, rrefutil
from allmydata.introducer.interfaces import \
RIIntroducerPublisherAndSubscriberService
-from allmydata.introducer.common import make_index
class IntroducerNode(node.Node):
PORTNUMFILE = "introducer.port"
def __init__(self, basedir="."):
service.MultiService.__init__(self)
self.introducer_url = None
- # 'index' is (tubid, service_name)
+ # 'index' is (service_name, tubid)
self._announcements = {} # dict of index -> (announcement, timestamp)
self._subscribers = {} # dict of (rref->timestamp) dicts
+ self._debug_counts = {"inbound_message": 0,
+ "inbound_duplicate": 0,
+ "inbound_update": 0,
+ "outbound_message": 0,
+ "outbound_announcements": 0,
+ "inbound_subscribe": 0}
def log(self, *args, **kwargs):
if "facility" not in kwargs:
return self.VERSION
def remote_publish(self, announcement):
+ try:
+ self._publish(announcement)
+ except:
+ log.err(format="Introducer.remote_publish failed on %(ann)s",
+ ann=announcement, level=log.UNUSUAL, umid="620rWA")
+ raise
+
+ def _publish(self, announcement):
+ self._debug_counts["inbound_message"] += 1
self.log("introducer: announcement published: %s" % (announcement,) )
- index = make_index(announcement)
+ (furl, service_name, ri_name, nickname_utf8, ver, oldest) = announcement
+
+ nodeid = b32decode(SturdyRef(furl).tubID.upper())
+ index = (service_name, nodeid)
+
if index in self._announcements:
(old_announcement, timestamp) = self._announcements[index]
if old_announcement == announcement:
self.log("but we already knew it, ignoring", level=log.NOISY)
+ self._debug_counts["inbound_duplicate"] += 1
return
else:
self.log("old announcement being updated", level=log.NOISY)
+ self._debug_counts["inbound_update"] += 1
self._announcements[index] = (announcement, time.time())
- (furl, service_name, ri_name, nickname, ver, oldest) = announcement
+
for s in self._subscribers.get(service_name, []):
- s.callRemote("announce", set([announcement]))
+ self._debug_counts["outbound_message"] += 1
+ self._debug_counts["outbound_announcements"] += 1
+ d = s.callRemote("announce", set([announcement]))
+ d.addErrback(rrefutil.trap_deadref)
+ d.addErrback(log.err,
+ format="subscriber errored on announcement %(ann)s",
+ ann=announcement, facility="tahoe.introducer",
+ level=log.UNUSUAL, umid="jfGMXQ")
def remote_subscribe(self, subscriber, service_name):
self.log("introducer: subscription[%s] request at %s" % (service_name,
subscriber))
+ self._debug_counts["inbound_subscribe"] += 1
if service_name not in self._subscribers:
self._subscribers[service_name] = {}
subscribers = self._subscribers[service_name]
subscribers.pop(subscriber, None)
subscriber.notifyOnDisconnect(_remove)
- announcements = set( [ ann
- for idx,(ann,when) in self._announcements.items()
- if idx[1] == service_name] )
- d = subscriber.callRemote("announce", announcements)
- d.addErrback(log.err, facility="tahoe.introducer", level=log.UNUSUAL)
-
-
+ announcements = set(
+ [ ann
+ for (sn2,nodeid),(ann,when) in self._announcements.items()
+ if sn2 == service_name] )
+ self._debug_counts["outbound_message"] += 1
+ self._debug_counts["outbound_announcements"] += len(announcements)
+ d = subscriber.callRemote("announce", announcements)
+ d.addErrback(rrefutil.trap_deadref)
+ d.addErrback(log.err,
+ format="subscriber errored during subscribe %(anns)s",
+ anns=announcements, facility="tahoe.introducer",
+ level=log.UNUSUAL, umid="mtZepQ")
nickname_utf8 = self.get_config("node", "nickname", "<unspecified>")
self.nickname = nickname_utf8.decode("utf-8")
+ assert type(self.nickname) is unicode
self.init_tempdir()
self.create_tub()
# roadmap:
#
-# implement ServerFarm, change Client to create it, change
-# uploader/servermap to get rrefs from it. ServerFarm calls
-# IntroducerClient.subscribe_to .
+# 1: implement StorageFarmBroker (i.e. "storage broker"), change Client to
+# create it, change uploader/servermap to get rrefs from it. ServerFarm calls
+# IntroducerClient.subscribe_to . ServerFarm hides descriptors, passes rrefs
+# to clients. webapi status pages call broker.get_info_about_serverid.
#
-# implement NativeStorageClient, change Tahoe2PeerSelector to use it. All
-# NativeStorageClients come from the introducer
+# 2: move get_info methods to the descriptor, webapi status pages call
+# broker.get_descriptor_for_serverid().get_info
#
-# change web/check_results.py to get NativeStorageClients from check results,
-# ask it for a nickname (instead of using client.get_nickname_for_serverid)
+# 3?later?: store descriptors in UploadResults/etc instead of serverids,
+# webapi status pages call descriptor.get_info and don't use storage_broker
+# or Client
#
-# implement tahoe.cfg scanner, create static NativeStorageClients
+# 4: enable static config: tahoe.cfg can add descriptors. Make the introducer
+# optional. This closes #467
+#
+# 5: implement NativeStorageClient, pass it to Tahoe2PeerSelector and other
+# clients. Clients stop doing callRemote(), use NativeStorageClient methods
+# instead (which might do something else, i.e. http or whatever). The
+# introducer and tahoe.cfg only create NativeStorageClients for now.
+#
+# 6: implement other sorts of IStorageClient classes: S3, etc
-import sha
-from zope.interface import implements
+import sha, time
+from zope.interface import implements, Interface
+from foolscap.api import eventually
from allmydata.interfaces import IStorageBroker
+from allmydata.util import idlib, log
+from allmydata.util.rrefutil import add_version_to_remote_reference
+
+# who is responsible for de-duplication?
+# both?
+# IC remembers the unpacked announcements it receives, to provide for late
+# subscribers and to remove duplicates
+
+# if a client subscribes after startup, will they receive old announcements?
+# yes
+
+# who will be responsible for signature checking?
+# make it be IntroducerClient, so they can push the filter outwards and
+# reduce inbound network traffic
+
+# what should the interface between StorageFarmBroker and IntroducerClient
+# look like?
+# don't pass signatures: only pass validated blessed-objects
class StorageFarmBroker:
implements(IStorageBroker)
I'm also responsible for subscribing to the IntroducerClient to find out
about new servers as they are announced by the Introducer.
"""
- def __init__(self, permute_peers=True):
+ def __init__(self, tub, permute_peers):
+ self.tub = tub
assert permute_peers # False not implemented yet
- self.servers = {} # serverid -> StorageClient instance
self.permute_peers = permute_peers
+ # self.descriptors maps serverid -> IServerDescriptor, and keeps
+ # track of all the storage servers that we've heard about. Each
+ # descriptor manages its own Reconnector, and will give us a
+ # RemoteReference when we ask them for it.
+ self.descriptors = {}
+        # self.test_servers is statically populated by unit tests
+ self.test_servers = {} # serverid -> rref
self.introducer_client = None
- def add_server(self, serverid, s):
- self.servers[serverid] = s
+
+ # these two are used in unit tests
+ def test_add_server(self, serverid, rref):
+ self.test_servers[serverid] = rref
+ def test_add_descriptor(self, serverid, dsc):
+ self.descriptors[serverid] = dsc
+
def use_introducer(self, introducer_client):
self.introducer_client = ic = introducer_client
- ic.subscribe_to("storage")
+ ic.subscribe_to("storage", self._got_announcement)
+
+ def _got_announcement(self, serverid, ann_d):
+ assert ann_d["service-name"] == "storage"
+ old = self.descriptors.get(serverid)
+ if old:
+ if old.get_announcement() == ann_d:
+ return # duplicate
+ # replacement
+ del self.descriptors[serverid]
+ old.stop_connecting()
+            # now we forget about the old announcement and use the new one
+ dsc = NativeStorageClientDescriptor(serverid, ann_d)
+ self.descriptors[serverid] = dsc
+ dsc.start_connecting(self.tub, self._trigger_connections)
+        # each descriptor manages its own Reconnector, and each time we
+        # need servers, we ask each one whether it is connected or not.
+
+ def _trigger_connections(self):
+ # when one connection is established, reset the timers on all others,
+ # to trigger a reconnection attempt in one second. This is intended
+ # to accelerate server connections when we've been offline for a
+ # while. The goal is to avoid hanging out for a long time with
+ # connections to only a subset of the servers, which would increase
+ # the chances that we'll put shares in weird places (and not update
+ # existing shares of mutable files). See #374 for more details.
+ for dsc in self.descriptors.values():
+ dsc.try_to_connect()
+
+
def get_servers_for_index(self, peer_selection_index):
# first cut: return a list of (peerid, versioned-rref) tuples
def get_all_servers(self):
# return a frozenset of (peerid, versioned-rref) tuples
servers = {}
- for serverid,server in self.servers.items():
- servers[serverid] = server
- if self.introducer_client:
- ic = self.introducer_client
- for serverid,server in ic.get_peers("storage"):
- servers[serverid] = server
+ for serverid,rref in self.test_servers.items():
+ servers[serverid] = rref
+ for serverid,dsc in self.descriptors.items():
+ rref = dsc.get_rref()
+ if rref:
+ servers[serverid] = rref
return frozenset(servers.items())
def get_all_serverids(self):
- for serverid in self.servers:
- yield serverid
- if self.introducer_client:
- for serverid,server in self.introducer_client.get_peers("storage"):
- yield serverid
+ serverids = set()
+ serverids.update(self.test_servers.keys())
+ serverids.update(self.descriptors.keys())
+ return frozenset(serverids)
+
+ def get_all_descriptors(self):
+ return sorted(self.descriptors.values(),
+ key=lambda dsc: dsc.get_serverid())
def get_nickname_for_serverid(self, serverid):
- if serverid in self.servers:
- return self.servers[serverid].nickname
- if self.introducer_client:
- return self.introducer_client.get_nickname_for_peerid(serverid)
+ if serverid in self.descriptors:
+ return self.descriptors[serverid].get_nickname()
return None
-class NativeStorageClient:
- def __init__(self, serverid, furl, nickname, min_shares=1):
+
+class IServerDescriptor(Interface):
+    def start_connecting(tub, trigger_cb):
+        """Begin connecting, via a foolscap Reconnector on the given Tub.
+        TRIGGER_CB, if not None, will be invoked (eventually) each time a
+        new connection is established."""
+    def get_nickname():
+        """Return the server's self-reported nickname, as unicode."""
+    def get_rref():
+        """Return the RemoteReference if connected, otherwise None."""
+
+class NativeStorageClientDescriptor:
+ """I hold information about a storage server that we want to connect to.
+ If we are connected, I hold the RemoteReference, their host address, and
+ the their version information. I remember information about when we were
+ last connected too, even if we aren't currently connected.
+
+ @ivar announcement_time: when we first heard about this service
+ @ivar last_connect_time: when we last established a connection
+ @ivar last_loss_time: when we last lost a connection
+
+ @ivar version: the server's versiondict, from the most recent announcement
+ @ivar nickname: the server's self-reported nickname (unicode), same
+
+ @ivar rref: the RemoteReference, if connected, otherwise None
+ @ivar remote_host: the IAddress, if connected, otherwise None
+ """
+ implements(IServerDescriptor)
+
+ VERSION_DEFAULTS = {
+ "http://allmydata.org/tahoe/protocols/storage/v1" :
+ { "maximum-immutable-share-size": 2**32,
+ "tolerates-immutable-read-overrun": False,
+ "delete-mutable-shares-with-zero-length-writev": False,
+ },
+ "application-version": "unknown: no get_version()",
+ }
+
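+    # VERSION_DEFAULTS feeds add_version_to_remote_reference() in
+    # _got_connection() below; that helper is assumed to fall back to this
+    # dict when the server is too old to answer get_version(), roughly:
+    #     d = rref.callRemote("get_version")
+    #     d.addErrback(lambda f: default)  # pre-get_version() server
+    # so later code can read rref.version unconditionally.
+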
+ def __init__(self, serverid, ann_d, min_shares=1):
self.serverid = serverid
- self.furl = furl
- self.nickname = nickname
+ self.announcement = ann_d
self.min_shares = min_shares
+ self.serverid_s = idlib.shortnodeid_b2a(self.serverid)
+ self.announcement_time = time.time()
+ self.last_connect_time = None
+ self.last_loss_time = None
+ self.remote_host = None
+ self.rref = None
+ self._reconnector = None
+ self._trigger_cb = None
+
+ def get_serverid(self):
+ return self.serverid
+
+ def get_nickname(self):
+ return self.announcement["nickname"].decode("utf-8")
+ def get_announcement(self):
+ return self.announcement
+ def get_remote_host(self):
+ return self.remote_host
+ def get_last_connect_time(self):
+ return self.last_connect_time
+ def get_last_loss_time(self):
+ return self.last_loss_time
+ def get_announcement_time(self):
+ return self.announcement_time
+
+ def start_connecting(self, tub, trigger_cb):
+ furl = self.announcement["FURL"]
+ self._trigger_cb = trigger_cb
+ self._reconnector = tub.connectTo(furl, self._got_connection)
+
+ def _got_connection(self, rref):
+ lp = log.msg(format="got connection to %(serverid)s, getting versions",
+ serverid=self.serverid_s,
+ facility="tahoe.storage_broker", umid="coUECQ")
+ if self._trigger_cb:
+ eventually(self._trigger_cb)
+ default = self.VERSION_DEFAULTS
+ d = add_version_to_remote_reference(rref, default)
+ d.addCallback(self._got_versioned_service, lp)
+ d.addErrback(log.err, format="storageclient._got_connection",
+ serverid=self.serverid_s, umid="Sdq3pg")
+
+ def _got_versioned_service(self, rref, lp):
+ log.msg(format="%(serverid)s provided version info %(version)s",
+ serverid=self.serverid_s, version=rref.version,
+ facility="tahoe.storage_broker", umid="SWmJYg",
+ level=log.NOISY, parent=lp)
+
+ self.last_connect_time = time.time()
+ self.remote_host = rref.getPeer()
+ self.rref = rref
+ rref.notifyOnDisconnect(self._lost)
+
+ def get_rref(self):
+ return self.rref
+
+ def _lost(self):
+ log.msg(format="lost connection to %(serverid)s",
+ serverid=self.serverid_s,
+ facility="tahoe.storage_broker", umid="zbRllw")
+ self.last_loss_time = time.time()
+ self.rref = None
+ self.remote_host = None
+
+ def stop_connecting(self):
+        # used when this descriptor has been superseded by another
+ self._reconnector.stopConnecting()
+
+ def try_to_connect(self):
+ # used when the broker wants us to hurry up
+ self._reconnector.reset()
+
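+# Lifecycle sketch, mirroring StorageFarmBroker._got_announcement above
+# (serverid and ann_d are whatever the broker received):
+#     dsc = NativeStorageClientDescriptor(serverid, ann_d)
+#     dsc.start_connecting(tub, trigger_cb)  # starts a foolscap Reconnector
+#     rref = dsc.get_rref()  # None until _got_versioned_service() fires
+#     dsc.stop_connecting()  # when a replacement announcement arrives
+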
class UnknownServerTypeError(Exception):
pass
def _check_connections(self):
for c in self.clients:
- ic = c.introducer_client
- if not ic.connected_to_introducer():
+ if not c.connected_to_introducer():
return False
- if len(ic.get_all_peerids()) != self.numclients:
+ sb = c.get_storage_broker()
+ if len(sb.get_all_servers()) != self.numclients:
return False
return True
from allmydata.client import Client
from allmydata.storage.server import StorageServer, storage_index_to_dir
from allmydata.util import fileutil, idlib, hashutil
-from allmydata.introducer.client import RemoteServiceConnector
from allmydata.test.common_web import HTTPClientGETFactory
from allmydata.interfaces import IStorageBroker
def dontNotifyOnDisconnect(self, marker):
del self.disconnectors[marker]
-def wrap(original, service_name):
+def wrap_storage_server(original):
# Much of the upload/download code uses rref.version (which normally
# comes from rrefutil.add_version_to_remote_reference). To avoid using a
# network, we want a LocalWrapper here. Try to satisfy all these
# constraints at the same time.
wrapper = LocalWrapper(original)
- try:
- version = original.remote_get_version()
- except AttributeError:
- version = RemoteServiceConnector.VERSION_DEFAULTS[service_name]
- wrapper.version = version
+ wrapper.version = original.remote_get_version()
return wrapper
class NoNetworkStorageBroker:
ss.setServiceParent(middleman)
serverid = ss.my_nodeid
self.servers_by_number[i] = ss
- self.servers_by_id[serverid] = wrap(ss, "storage")
+ self.servers_by_id[serverid] = wrap_storage_server(ss)
self.all_servers = frozenset(self.servers_by_id.items())
for c in self.clients:
c._servers = self.all_servers
from twisted.trial import unittest
from allmydata import check_results, uri
from allmydata.web import check_results as web_check_results
-from allmydata.storage_client import StorageFarmBroker, NativeStorageClient
+from allmydata.storage_client import StorageFarmBroker, NativeStorageClientDescriptor
from common_web import WebRenderingMixin
class FakeClient:
class WebResultsRendering(unittest.TestCase, WebRenderingMixin):
def create_fake_client(self):
- sb = StorageFarmBroker()
+ sb = StorageFarmBroker(None, True)
for (peerid, nickname) in [("\x00"*20, "peer-0"),
("\xff"*20, "peer-f"),
("\x11"*20, "peer-11")] :
- n = NativeStorageClient(peerid, None, nickname)
- sb.add_server(peerid, n)
+ ann_d = { "version": 0,
+ "service-name": "storage",
+ "FURL": "fake furl",
+ "nickname": unicode(nickname),
+ "app-versions": {}, # need #466 and v2 introducer
+ "my-version": "ver",
+ "oldest-supported": "oldest",
+ }
+ dsc = NativeStorageClientDescriptor(peerid, ann_d)
+ sb.test_add_descriptor(peerid, dsc)
c = FakeClient()
c.storage_broker = sb
return c
for (peerid,rref) in sb.get_servers_for_index(key) ]
def test_permute(self):
- sb = StorageFarmBroker()
+ sb = StorageFarmBroker(None, True)
for k in ["%d" % i for i in range(5)]:
- sb.add_server(k, None)
+ sb.test_add_server(k, None)
self.failUnlessEqual(self._permute(sb, "one"), ['3','1','0','4','2'])
self.failUnlessEqual(self._permute(sb, "two"), ['0','4','2','1','3'])
- sb.servers = {}
+ sb.test_servers.clear()
self.failUnlessEqual(self._permute(sb, "one"), [])
def test_versions(self):
"max_segment_size": 1*MiB,
}
stats_provider = None
- storage_broker = StorageFarmBroker()
+ storage_broker = StorageFarmBroker(None, True)
def log(self, *args, **kwargs):
return log.msg(*args, **kwargs)
def get_encoding_parameters(self):
from allmydata.interfaces import InsufficientVersionError
from allmydata.introducer.client import IntroducerClient
from allmydata.introducer.server import IntroducerService
-from allmydata.introducer.common import make_index
# test compatibility with old introducer .tac files
from allmydata.introducer import IntroducerNode
from allmydata.introducer import old
-from allmydata.util import idlib, pollmixin
+from allmydata.util import pollmixin
import common_util as testutil
-class FakeNode(Referenceable):
- pass
-
class LoggingMultiService(service.MultiService):
def log(self, msg, **kw):
log.msg(msg, **kw)
class Introducer(ServiceMixin, unittest.TestCase, pollmixin.PollMixin):
def test_create(self):
- ic = IntroducerClient(None, "introducer.furl", "my_nickname",
+ ic = IntroducerClient(None, "introducer.furl", u"my_nickname",
"my_version", "oldest_version")
def test_listen(self):
class SystemTestMixin(ServiceMixin, pollmixin.PollMixin):
- def setUp(self):
- ServiceMixin.setUp(self)
- self.central_tub = tub = Tub()
+ def create_tub(self, portnum=0):
+ tubfile = os.path.join(self.basedir, "tub.pem")
+ self.central_tub = tub = Tub(certFile=tubfile)
#tub.setOption("logLocalFailures", True)
#tub.setOption("logRemoteFailures", True)
tub.setOption("expose-remote-exception-types", False)
tub.setServiceParent(self.parent)
- l = tub.listenOn("tcp:0")
- portnum = l.getPortnum()
- tub.setLocation("localhost:%d" % portnum)
+ l = tub.listenOn("tcp:%d" % portnum)
+ self.central_portnum = l.getPortnum()
+ if portnum != 0:
+ assert self.central_portnum == portnum
+ tub.setLocation("localhost:%d" % self.central_portnum)
class SystemTest(SystemTestMixin, unittest.TestCase):
def test_system(self):
- i = IntroducerService()
- i.setServiceParent(self.parent)
- self.introducer_furl = self.central_tub.registerReference(i)
- return self.do_system_test()
+ self.basedir = "introducer/SystemTest/system"
+ os.makedirs(self.basedir)
+ return self.do_system_test(IntroducerService)
test_system.timeout = 480 # occasionally takes longer than 350s on "draco"
- def test_system_oldserver(self):
- i = old.IntroducerService_V1()
- i.setServiceParent(self.parent)
- self.introducer_furl = self.central_tub.registerReference(i)
- return self.do_system_test()
-
- def do_system_test(self):
+ def do_system_test(self, create_introducer):
+ self.create_tub()
+ introducer = create_introducer()
+ introducer.setServiceParent(self.parent)
+ iff = os.path.join(self.basedir, "introducer.furl")
+ tub = self.central_tub
+ ifurl = self.central_tub.registerReference(introducer, furlFile=iff)
+ self.introducer_furl = ifurl
NUMCLIENTS = 5
# we have 5 clients who publish themselves, and an extra one does
clients = []
tubs = {}
+ received_announcements = {}
+ NUM_SERVERS = NUMCLIENTS
+ subscribing_clients = []
+ publishing_clients = []
+
for i in range(NUMCLIENTS+1):
tub = Tub()
#tub.setOption("logLocalFailures", True)
portnum = l.getPortnum()
tub.setLocation("localhost:%d" % portnum)
- n = FakeNode()
log.msg("creating client %d: %s" % (i, tub.getShortTubID()))
- client_class = IntroducerClient
- if i == 0:
- client_class = old.IntroducerClient_V1
- c = client_class(tub, self.introducer_furl,
- "nickname-%d" % i, "version", "oldest")
+ c = IntroducerClient(tub, self.introducer_furl, u"nickname-%d" % i,
+ "version", "oldest")
+ received_announcements[c] = ra = {}
+ def got(serverid, ann_d, announcements):
+ announcements[serverid] = ann_d
+ c.subscribe_to("storage", got, received_announcements[c])
+ subscribing_clients.append(c)
+
if i < NUMCLIENTS:
- node_furl = tub.registerReference(n)
+ node_furl = tub.registerReference(Referenceable())
c.publish(node_furl, "storage", "ri_name")
+ publishing_clients.append(c)
# the last one does not publish anything
- c.subscribe_to("storage")
-
c.setServiceParent(self.parent)
clients.append(c)
tubs[c] = tub
def _wait_for_all_connections():
- for c in clients:
- if len(c.get_all_connections()) < NUMCLIENTS:
+ for c in subscribing_clients:
+ if len(received_announcements[c]) < NUM_SERVERS:
return False
return True
d = self.poll(_wait_for_all_connections)
def _check1(res):
log.msg("doing _check1")
+ dc = introducer._debug_counts
+ self.failUnlessEqual(dc["inbound_message"], NUM_SERVERS)
+ self.failUnlessEqual(dc["inbound_duplicate"], 0)
+ self.failUnlessEqual(dc["inbound_update"], 0)
+ self.failUnless(dc["outbound_message"])
+
for c in clients:
self.failUnless(c.connected_to_introducer())
- self.failUnlessEqual(len(c.get_all_connections()), NUMCLIENTS)
- self.failUnlessEqual(len(c.get_all_peerids()), NUMCLIENTS)
- self.failUnlessEqual(len(c.get_all_connections_for("storage")),
- NUMCLIENTS)
+ for c in subscribing_clients:
+ cdc = c._debug_counts
+ self.failUnless(cdc["inbound_message"])
+ self.failUnlessEqual(cdc["inbound_announcement"],
+ NUM_SERVERS)
+ self.failUnlessEqual(cdc["wrong_service"], 0)
+ self.failUnlessEqual(cdc["duplicate_announcement"], 0)
+ self.failUnlessEqual(cdc["update"], 0)
+ self.failUnlessEqual(cdc["new_announcement"],
+ NUM_SERVERS)
+ anns = received_announcements[c]
+ self.failUnlessEqual(len(anns), NUM_SERVERS)
+
nodeid0 = b32decode(tubs[clients[0]].tubID.upper())
- self.failUnlessEqual(c.get_nickname_for_peerid(nodeid0),
- "nickname-0")
+ ann_d = anns[nodeid0]
+ nick = ann_d["nickname"]
+ self.failUnlessEqual(type(nick), unicode)
+ self.failUnlessEqual(nick, u"nickname-0")
+ for c in publishing_clients:
+ cdc = c._debug_counts
+ self.failUnlessEqual(cdc["outbound_message"], 1)
d.addCallback(_check1)
- origin_c = clients[0]
- def _disconnect_somebody_else(res):
- # now disconnect somebody's connection to someone else
- current_counter = origin_c.counter
- victim_nodeid = b32decode(tubs[clients[1]].tubID.upper())
- log.msg(" disconnecting %s->%s" %
- (tubs[origin_c].tubID,
- idlib.shortnodeid_b2a(victim_nodeid)))
- origin_c.debug_disconnect_from_peerid(victim_nodeid)
- log.msg(" did disconnect")
-
- # then wait until something changes, which ought to be them
- # noticing the loss
- def _compare():
- return current_counter != origin_c.counter
- return self.poll(_compare)
-
- d.addCallback(_disconnect_somebody_else)
-
- # and wait for them to reconnect
- d.addCallback(lambda res: self.poll(_wait_for_all_connections))
+ # force an introducer reconnect, by shutting down the Tub it's using
+ # and starting a new Tub (with the old introducer). Everybody should
+ # reconnect and republish, but the introducer should ignore the
+ # republishes as duplicates. However, because the server doesn't know
+ # what each client does and does not know, it will send them a copy
+ # of the current announcement table anyway.
+
+ d.addCallback(lambda _ign: log.msg("shutting down introducer's Tub"))
+ d.addCallback(lambda _ign: self.central_tub.disownServiceParent())
+
+ def _wait_for_introducer_loss():
+ for c in clients:
+ if c.connected_to_introducer():
+ return False
+ return True
+ d.addCallback(lambda res: self.poll(_wait_for_introducer_loss))
+
+ def _restart_introducer_tub(_ign):
+ log.msg("restarting introducer's Tub")
+
+ # note: old.Server doesn't have this count
+ dc = introducer._debug_counts
+ self.expected_count = dc["inbound_message"] + NUM_SERVERS
+ self.expected_subscribe_count = dc["inbound_subscribe"] + NUMCLIENTS+1
+ introducer._debug0 = dc["outbound_message"]
+ for c in subscribing_clients:
+ cdc = c._debug_counts
+ c._debug0 = cdc["inbound_message"]
+
+ self.create_tub(self.central_portnum)
+ newfurl = self.central_tub.registerReference(introducer,
+ furlFile=iff)
+ assert newfurl == self.introducer_furl
+ d.addCallback(_restart_introducer_tub)
+
+ def _wait_for_introducer_reconnect():
+ # wait until:
+ # all clients are connected
+ # the introducer has received publish messages from all of them
+ # the introducer has received subscribe messages from all of them
+ # the introducer has sent (duplicate) announcements to all of them
+ # all clients have received (duplicate) announcements
+ dc = introducer._debug_counts
+ for c in clients:
+ if not c.connected_to_introducer():
+ return False
+ if dc["inbound_message"] < self.expected_count:
+ return False
+ if dc["inbound_subscribe"] < self.expected_subscribe_count:
+ return False
+ for c in subscribing_clients:
+ cdc = c._debug_counts
+ if cdc["inbound_message"] < c._debug0+1:
+ return False
+ return True
+ d.addCallback(lambda res: self.poll(_wait_for_introducer_reconnect))
+
def _check2(res):
log.msg("doing _check2")
+ # assert that the introducer sent out new messages, one per
+ # subscriber
+ dc = introducer._debug_counts
+ self.failUnlessEqual(dc["inbound_message"], 2*NUM_SERVERS)
+ self.failUnlessEqual(dc["inbound_duplicate"], NUM_SERVERS)
+ self.failUnlessEqual(dc["inbound_update"], 0)
+ self.failUnlessEqual(dc["outbound_message"],
+ introducer._debug0 + len(subscribing_clients))
for c in clients:
- self.failUnlessEqual(len(c.get_all_connections()), NUMCLIENTS)
+ self.failUnless(c.connected_to_introducer())
+ for c in subscribing_clients:
+ cdc = c._debug_counts
+ self.failUnlessEqual(cdc["duplicate_announcement"], NUM_SERVERS)
d.addCallback(_check2)
- def _disconnect_yourself(res):
- # now disconnect somebody's connection to themselves.
- current_counter = origin_c.counter
- victim_nodeid = b32decode(tubs[clients[0]].tubID.upper())
- log.msg(" disconnecting %s->%s" %
- (tubs[origin_c].tubID,
- idlib.shortnodeid_b2a(victim_nodeid)))
- origin_c.debug_disconnect_from_peerid(victim_nodeid)
- log.msg(" did disconnect from self")
-
- def _compare():
- return current_counter != origin_c.counter
- return self.poll(_compare)
- d.addCallback(_disconnect_yourself)
-
- d.addCallback(lambda res: self.poll(_wait_for_all_connections))
- def _check3(res):
- log.msg("doing _check3")
- for c in clients:
- self.failUnlessEqual(len(c.get_all_connections_for("storage")),
- NUMCLIENTS)
- d.addCallback(_check3)
- def _shutdown_introducer(res):
- # now shut down the introducer. We do this by shutting down the
- # tub it's using. Nobody's connections (to each other) should go
- # down. All clients should notice the loss, and no other errors
- # should occur.
- log.msg("shutting down the introducer")
- return self.central_tub.disownServiceParent()
- d.addCallback(_shutdown_introducer)
- def _wait_for_introducer_loss():
+ # Then force an introducer restart, by shutting down the Tub,
+ # destroying the old introducer, and starting a new Tub+Introducer.
+ # Everybody should reconnect and republish, and the (new) introducer
+ # will distribute the new announcements, but the clients should
+ # ignore the republishes as duplicates.
+
+ d.addCallback(lambda _ign: log.msg("shutting down introducer"))
+ d.addCallback(lambda _ign: self.central_tub.disownServiceParent())
+ d.addCallback(lambda res: self.poll(_wait_for_introducer_loss))
+
+ def _restart_introducer(_ign):
+ log.msg("restarting introducer")
+ self.create_tub(self.central_portnum)
+
+ for c in subscribing_clients:
+ # record some counters for later comparison. Stash the values
+ # on the client itself, because I'm lazy.
+ cdc = c._debug_counts
+ c._debug1 = cdc["inbound_announcement"]
+ c._debug2 = cdc["inbound_message"]
+ c._debug3 = cdc["new_announcement"]
+ newintroducer = create_introducer()
+ self.expected_message_count = NUM_SERVERS
+ self.expected_announcement_count = NUM_SERVERS*len(subscribing_clients)
+ self.expected_subscribe_count = len(subscribing_clients)
+ newfurl = self.central_tub.registerReference(newintroducer,
+ furlFile=iff)
+ assert newfurl == self.introducer_furl
+ d.addCallback(_restart_introducer)
+ def _wait_for_introducer_reconnect2():
+ # wait until:
+ # all clients are connected
+ # the introducer has received publish messages from all of them
+ # the introducer has received subscribe messages from all of them
+ # the introducer has sent announcements for everybody to everybody
+ # all clients have received all the (duplicate) announcements
+ # at that point, the system should be quiescent
+ dc = introducer._debug_counts
for c in clients:
- if c.connected_to_introducer():
+ if not c.connected_to_introducer():
+ return False
+ if dc["inbound_message"] < self.expected_message_count:
+ return False
+ if dc["outbound_announcements"] < self.expected_announcement_count:
+ return False
+ if dc["inbound_subscribe"] < self.expected_subscribe_count:
+ return False
+ for c in subscribing_clients:
+ cdc = c._debug_counts
+ if cdc["inbound_announcement"] < c._debug1+NUM_SERVERS:
return False
return True
- d.addCallback(lambda res: self.poll(_wait_for_introducer_loss))
+ d.addCallback(lambda res: self.poll(_wait_for_introducer_reconnect2))
- def _check4(res):
- log.msg("doing _check4")
+ def _check3(res):
+ log.msg("doing _check3")
for c in clients:
- self.failUnlessEqual(len(c.get_all_connections_for("storage")),
- NUMCLIENTS)
- self.failIf(c.connected_to_introducer())
- d.addCallback(_check4)
+ self.failUnless(c.connected_to_introducer())
+ for c in subscribing_clients:
+ cdc = c._debug_counts
+ self.failUnless(cdc["inbound_announcement"] > c._debug1)
+ self.failUnless(cdc["inbound_message"] > c._debug2)
+ # there should have been no new announcements
+ self.failUnlessEqual(cdc["new_announcement"], c._debug3)
+ # and the right number of duplicate ones. There were
+                # NUM_SERVERS from the introducer Tub restart, and there
+                # should be another NUM_SERVERS now
+ self.failUnlessEqual(cdc["duplicate_announcement"],
+ 2*NUM_SERVERS)
+
+ d.addCallback(_check3)
return d
class TooNewServer(IntroducerService):
# exception.
def test_failure(self):
+ self.basedir = "introducer/NonV1Server/failure"
+ os.makedirs(self.basedir)
+ self.create_tub()
i = TooNewServer()
i.setServiceParent(self.parent)
self.introducer_furl = self.central_tub.registerReference(i)
portnum = l.getPortnum()
tub.setLocation("localhost:%d" % portnum)
- n = FakeNode()
c = IntroducerClient(tub, self.introducer_furl,
- "nickname-client", "version", "oldest")
- c.subscribe_to("storage")
+ u"nickname-client", "version", "oldest")
+ announcements = {}
+ def got(serverid, ann_d):
+ announcements[serverid] = ann_d
+ c.subscribe_to("storage", got)
c.setServiceParent(self.parent)
ann = ('pb://t5g7egomnnktbpydbuijt6zgtmw4oqi5@127.0.0.1:51857/hfzv36i',
'storage', 'RIStorageServer.tahoe.allmydata.com',
'plancha', 'allmydata-tahoe/1.4.1', '1.0.0')
- (nodeid, service_name) = make_index(ann)
+ (nodeid, service_name) = old.make_index(ann)
self.failUnlessEqual(nodeid, "\x9fM\xf2\x19\xcckU0\xbf\x03\r\x10\x99\xfb&\x9b-\xc7A\x1d")
self.failUnlessEqual(service_name, "storage")
peerids = [tagged_hash("peerid", "%d" % i)[:20]
for i in range(self._num_peers)]
self.nodeid = "fakenodeid"
- self.storage_broker = StorageFarmBroker()
+ self.storage_broker = StorageFarmBroker(None, True)
for peerid in peerids:
fss = FakeStorageServer(peerid, self._storage)
- self.storage_broker.add_server(peerid, fss)
+ self.storage_broker.test_add_server(peerid, fss)
def get_storage_broker(self):
return self.storage_broker
def debug_break_connection(self, peerid):
- self.storage_broker.servers[peerid].broken = True
+ self.storage_broker.test_servers[peerid].broken = True
def debug_remove_connection(self, peerid):
- self.storage_broker.servers.pop(peerid)
+ self.storage_broker.test_servers.pop(peerid)
def debug_get_connection(self, peerid):
- return self.storage_broker.servers[peerid]
+ return self.storage_broker.test_servers[peerid]
def get_encoding_parameters(self):
return {"k": 3, "n": 10}
sharemap = {}
sb = self._client.get_storage_broker()
- for i,peerid in enumerate(sb.get_all_serverids()):
+ for peerid in sorted(sb.get_all_serverids()):
peerid_s = shortnodeid_b2a(peerid)
for shnum in self._shares1.get(peerid, {}):
if shnum < len(places):
self._num_peers = num_peers
peerids = [tagged_hash("peerid", "%d" % i)[:20]
for i in range(self._num_peers)]
- self.storage_broker = StorageFarmBroker()
+ self.storage_broker = StorageFarmBroker(None, True)
for peerid in peerids:
peerdir = os.path.join(basedir, idlib.shortnodeid_b2a(peerid))
make_dirs(peerdir)
ss = StorageServer(peerdir, peerid)
lw = LocalWrapper(ss)
- self.storage_broker.add_server(peerid, lw)
+ self.storage_broker.test_add_server(peerid, lw)
self.nodeid = "fakenodeid"
def _check(extra_node):
self.extra_node = extra_node
for c in self.clients:
- all_peerids = list(c.get_storage_broker().get_all_serverids())
+ all_peerids = c.get_storage_broker().get_all_serverids()
self.failUnlessEqual(len(all_peerids), self.numclients+1)
sb = c.storage_broker
- permuted_peers = list(sb.get_servers_for_index("a"))
+ permuted_peers = sb.get_servers_for_index("a")
self.failUnlessEqual(len(permuted_peers), self.numclients+1)
d.addCallback(_check)
d = self.set_up_nodes()
def _check_connections(res):
for c in self.clients:
- all_peerids = list(c.get_storage_broker().get_all_serverids())
+ all_peerids = c.get_storage_broker().get_all_serverids()
self.failUnlessEqual(len(all_peerids), self.numclients)
sb = c.storage_broker
- permuted_peers = list(sb.get_servers_for_index("a"))
+ permuted_peers = sb.get_servers_for_index("a")
self.failUnlessEqual(len(permuted_peers), self.numclients)
d.addCallback(_check_connections)
else:
peers = [ ("%20d"%fakeid, FakeStorageServer(self.mode),)
for fakeid in range(self.num_servers) ]
- self.storage_broker = StorageFarmBroker()
+ self.storage_broker = StorageFarmBroker(None, permute_peers=True)
for (serverid, server) in peers:
- self.storage_broker.add_server(serverid, server)
+ self.storage_broker.test_add_server(serverid, server)
self.last_peers = [p[1] for p in peers]
def log(self, *args, **kwargs):
timeout = 480 # Most of these take longer than 240 seconds on Francois's arm box.
-class FakeIntroducerClient:
- def get_all_connectors(self):
- return {}
- def get_all_connections_for(self, service_name):
- return frozenset()
- def get_all_peerids(self):
- return frozenset()
-
class FakeStatsProvider:
def get_stats(self):
stats = {'stats': {}, 'counters': {}}
'zfec': "fake",
}
introducer_furl = "None"
- introducer_client = FakeIntroducerClient()
+
_all_upload_status = [upload.UploadStatus()]
_all_download_status = [download.DownloadStatus()]
_all_mapupdate_statuses = [servermap.UpdateStatus()]
def connected_to_introducer(self):
return False
- storage_broker = StorageFarmBroker()
+ storage_broker = StorageFarmBroker(None, permute_peers=True)
def get_storage_broker(self):
return self.storage_broker
return "no"
def data_known_storage_servers(self, ctx, data):
- ic = self.client.introducer_client
- servers = [c
- for c in ic.get_all_connectors().values()
- if c.service_name == "storage"]
- return len(servers)
+ sb = self.client.get_storage_broker()
+ return len(sb.get_all_serverids())
def data_connected_storage_servers(self, ctx, data):
- ic = self.client.introducer_client
- return len(ic.get_all_connections_for("storage"))
+ sb = self.client.get_storage_broker()
+ return len(sb.get_all_servers())
def data_services(self, ctx, data):
- ic = self.client.introducer_client
- c = [ (service_name, nodeid, rsc)
- for (nodeid, service_name), rsc
- in ic.get_all_connectors().items() ]
- c.sort()
- return c
-
- def render_service_row(self, ctx, data):
- (service_name, nodeid, rsc) = data
+ sb = self.client.get_storage_broker()
+ return sb.get_all_descriptors()
+
+ def render_service_row(self, ctx, descriptor):
+ nodeid = descriptor.get_serverid()
+
ctx.fillSlots("peerid", idlib.nodeid_b2a(nodeid))
- ctx.fillSlots("nickname", rsc.nickname)
- if rsc.rref:
- rhost = rsc.remote_host
+ ctx.fillSlots("nickname", descriptor.get_nickname())
+ rhost = descriptor.get_remote_host()
+ if rhost:
if nodeid == self.client.nodeid:
rhost_s = "(loopback)"
elif isinstance(rhost, address.IPv4Address):
else:
rhost_s = str(rhost)
connected = "Yes: to " + rhost_s
- since = rsc.last_connect_time
+ since = descriptor.get_last_connect_time()
else:
connected = "No"
- since = rsc.last_loss_time
+ since = descriptor.get_last_loss_time()
+ announced = descriptor.get_announcement_time()
+ announcement = descriptor.get_announcement()
+ version = announcement["version"]
+ service_name = announcement["service-name"]
TIME_FORMAT = "%H:%M:%S %d-%b-%Y"
ctx.fillSlots("connected", connected)
- ctx.fillSlots("connected-bool", not not rsc.rref)
- ctx.fillSlots("since", time.strftime(TIME_FORMAT, time.localtime(since)))
+ ctx.fillSlots("connected-bool", bool(rhost))
+ ctx.fillSlots("since", time.strftime(TIME_FORMAT,
+ time.localtime(since)))
ctx.fillSlots("announced", time.strftime(TIME_FORMAT,
- time.localtime(rsc.announcement_time)))
- ctx.fillSlots("version", rsc.version)
- ctx.fillSlots("service_name", rsc.service_name)
+ time.localtime(announced)))
+ ctx.fillSlots("version", version)
+ ctx.fillSlots("service_name", service_name)
return ctx.tag