big rework of introducer client: change local API, split division of responsibilites...
authorBrian Warner <warner@lothar.com>
Tue, 23 Jun 2009 02:10:47 +0000 (19:10 -0700)
committerBrian Warner <warner@lothar.com>
Tue, 23 Jun 2009 02:10:47 +0000 (19:10 -0700)
22 files changed:
src/allmydata/client.py
src/allmydata/control.py
src/allmydata/immutable/download.py
src/allmydata/interfaces.py
src/allmydata/introducer/client.py
src/allmydata/introducer/common.py [deleted file]
src/allmydata/introducer/interfaces.py
src/allmydata/introducer/old.py
src/allmydata/introducer/server.py
src/allmydata/node.py
src/allmydata/storage_client.py
src/allmydata/test/common.py
src/allmydata/test/no_network.py
src/allmydata/test/test_checker.py
src/allmydata/test/test_client.py
src/allmydata/test/test_helper.py
src/allmydata/test/test_introducer.py
src/allmydata/test/test_mutable.py
src/allmydata/test/test_system.py
src/allmydata/test/test_upload.py
src/allmydata/test/test_web.py
src/allmydata/web/root.py

index 037489ad427a17b5471171ebc5ddb976b8d3fe6e..444a817ef1defaf60d825cfbcc9b4114c5f83446 100644 (file)
@@ -6,7 +6,6 @@ from zope.interface import implements
 from twisted.internet import reactor
 from twisted.application.internet import TimerService
 from foolscap.api import Referenceable
-from foolscap.logging import log
 from pycryptopp.publickey import rsa
 
 import allmydata
@@ -18,7 +17,7 @@ from allmydata.immutable.filenode import FileNode, LiteralFileNode
 from allmydata.immutable.offloaded import Helper
 from allmydata.control import ControlServer
 from allmydata.introducer.client import IntroducerClient
-from allmydata.util import hashutil, base32, pollmixin, cachedir
+from allmydata.util import hashutil, base32, pollmixin, cachedir, log
 from allmydata.util.abbreviate import parse_abbreviated_size
 from allmydata.util.time_format import parse_duration, parse_date
 from allmydata.uri import LiteralFileURI
@@ -128,8 +127,6 @@ class Client(node.Node, pollmixin.PollMixin):
         d = self.when_tub_ready()
         def _start_introducer_client(res):
             ic.setServiceParent(self)
-            # nodes that want to upload and download will need storage servers
-            ic.subscribe_to("storage")
         d.addCallback(_start_introducer_client)
         d.addErrback(log.err, facility="tahoe.init",
                      level=log.BAD, umid="URyI5w")
@@ -235,9 +232,11 @@ class Client(node.Node, pollmixin.PollMixin):
     def init_client_storage_broker(self):
         # create a StorageFarmBroker object, for use by Uploader/Downloader
         # (and everybody else who wants to use storage servers)
-        self.storage_broker = sb = storage_client.StorageFarmBroker()
+        sb = storage_client.StorageFarmBroker(self.tub, permute_peers=True)
+        self.storage_broker = sb
 
-        # load static server specifications from tahoe.cfg, if any
+        # load static server specifications from tahoe.cfg, if any.
+        # Not quite ready yet.
         #if self.config.has_section("client-server-selection"):
         #    server_params = {} # maps serverid to dict of parameters
         #    for (name, value) in self.config.items("client-server-selection"):
@@ -390,8 +389,7 @@ class Client(node.Node, pollmixin.PollMixin):
         temporary test network and need to know when it is safe to proceed
         with an upload or download."""
         def _check():
-            current_clients = list(self.storage_broker.get_all_serverids())
-            return len(current_clients) >= num_clients
+            return len(self.storage_broker.get_all_servers()) >= num_clients
         d = self.poll(_check, 0.5)
         d.addCallback(lambda res: None)
         return d
index 01eb7c3726afcfadb34310ac4c94aee0258ce808..060608b44ce25f86a655a8c87327fe4b7cef9626 100644 (file)
@@ -70,10 +70,10 @@ class ControlServer(Referenceable, service.Service):
         # phase to take more than 10 seconds. Expect worst-case latency to be
         # 300ms.
         results = {}
-        conns = self.parent.introducer_client.get_all_connections_for("storage")
-        everyone = [(peerid,rref) for (peerid, service_name, rref) in conns]
+        sb = self.parent.get_storage_broker()
+        everyone = sb.get_all_servers()
         num_pings = int(mathutil.div_ceil(10, (len(everyone) * 0.3)))
-        everyone = everyone * num_pings
+        everyone = list(everyone) * num_pings
         d = self._do_one_ping(None, everyone, results)
         return d
     def _do_one_ping(self, res, everyone_left, results):
index acc03add0710fe7191f7f805876e5b659bdc9db5..9dfc0bb80d546f4645cf85bb6ab7e8e293470419 100644 (file)
@@ -8,9 +8,10 @@ from foolscap.api import DeadReferenceError, RemoteException, eventually
 from allmydata.util import base32, deferredutil, hashutil, log, mathutil, idlib
 from allmydata.util.assertutil import _assert, precondition
 from allmydata import codec, hashtree, uri
-from allmydata.interfaces import IDownloadTarget, IDownloader, IFileURI, IVerifierURI, \
+from allmydata.interfaces import IDownloadTarget, IDownloader, \
+     IFileURI, IVerifierURI, \
      IDownloadStatus, IDownloadResults, IValidatedThingProxy, \
-     IStorageBroker, NotEnoughSharesError, \
+     IStorageBroker, NotEnoughSharesError, NoServersError, \
      UnableToFetchCriticalDownloadDataError
 from allmydata.immutable import layout
 from allmydata.monitor import Monitor
@@ -747,7 +748,10 @@ class CiphertextDownloader(log.PrefixingLogMixin):
     def _get_all_shareholders(self):
         dl = []
         sb = self._storage_broker
-        for (peerid,ss) in sb.get_servers_for_index(self._storage_index):
+        servers = sb.get_servers_for_index(self._storage_index)
+        if not servers:
+            raise NoServersError("broker gave us no servers!")
+        for (peerid,ss) in servers:
             self.log(format="sending DYHB to [%(peerid)s]",
                      peerid=idlib.shortnodeid_b2a(peerid),
                      level=log.NOISY, umid="rT03hg")
index 24d67a5015cd8f5d4e9a05dfea3df27fd30c7b12..e130fa0f340adb2fb3ab9f8f7f198448d47593af 100644 (file)
@@ -360,13 +360,56 @@ class IStorageBroker(Interface):
         """
     def get_all_serverids():
         """
-        @return: iterator of serverid strings
+        @return: frozenset of serverid strings
         """
     def get_nickname_for_serverid(serverid):
         """
         @return: unicode nickname, or None
         """
 
+    # methods moved from IntroducerClient, need review
+    def get_all_connections():
+        """Return a frozenset of (nodeid, service_name, rref) tuples, one for
+        each active connection we've established to a remote service. This is
+        mostly useful for unit tests that need to wait until a certain number
+        of connections have been made."""
+
+    def get_all_connectors():
+        """Return a dict that maps from (nodeid, service_name) to a
+        RemoteServiceConnector instance for all services that we are actively
+        trying to connect to. Each RemoteServiceConnector has the following
+        public attributes::
+
+          service_name: the type of service provided, like 'storage'
+          announcement_time: when we first heard about this service
+          last_connect_time: when we last established a connection
+          last_loss_time: when we last lost a connection
+
+          version: the peer's version, from the most recent connection
+          oldest_supported: the peer's oldest supported version, same
+
+          rref: the RemoteReference, if connected, otherwise None
+          remote_host: the IAddress, if connected, otherwise None
+
+        This method is intended for monitoring interfaces, such as a web page
+        which describes connecting and connected peers.
+        """
+
+    def get_all_peerids():
+        """Return a frozenset of all peerids to whom we have a connection (to
+        one or more services) established. Mostly useful for unit tests."""
+
+    def get_all_connections_for(service_name):
+        """Return a frozenset of (nodeid, service_name, rref) tuples, one
+        for each active connection that provides the given SERVICE_NAME."""
+
+    def get_permuted_peers(service_name, key):
+        """Returns an ordered list of (peerid, rref) tuples, selecting from
+        the connections that provide SERVICE_NAME, using a hash-based
+        permutation keyed by KEY. This randomizes the service list in a
+        repeatable way, to distribute load over many peers.
+        """
+
 
 # hm, we need a solution for forward references in schemas
 FileNode_ = Any() # TODO: foolscap needs constraints on copyables
index db09c7eb9c46fe027aec466ca01f9610cf5cc2d9..31fbb5c2aeb075e1494ae97aeb77655ffd99b112 100644 (file)
 
-import re, time, sha
 from base64 import b32decode
 from zope.interface import implements
 from twisted.application import service
-from foolscap.api import Referenceable
+from foolscap.api import Referenceable, SturdyRef, eventually
 from allmydata.interfaces import InsufficientVersionError
 from allmydata.introducer.interfaces import RIIntroducerSubscriberClient, \
      IIntroducerClient
 from allmydata.util import log, idlib
-from allmydata.util.rrefutil import add_version_to_remote_reference
-from allmydata.introducer.common import make_index
-
-
-class RemoteServiceConnector:
-    """I hold information about a peer service that we want to connect to. If
-    we are connected, I hold the RemoteReference, the peer's address, and the
-    peer's version information. I remember information about when we were
-    last connected to the peer too, even if we aren't currently connected.
-
-    @ivar announcement_time: when we first heard about this service
-    @ivar last_connect_time: when we last established a connection
-    @ivar last_loss_time: when we last lost a connection
-
-    @ivar version: the peer's version, from the most recent announcement
-    @ivar oldest_supported: the peer's oldest supported version, same
-    @ivar nickname: the peer's self-reported nickname, same
-
-    @ivar rref: the RemoteReference, if connected, otherwise None
-    @ivar remote_host: the IAddress, if connected, otherwise None
-    """
-
-    VERSION_DEFAULTS = {
-        "storage": { "http://allmydata.org/tahoe/protocols/storage/v1" :
-                     { "maximum-immutable-share-size": 2**32,
-                       "tolerates-immutable-read-overrun": False,
-                       "delete-mutable-shares-with-zero-length-writev": False,
-                       },
-                     "application-version": "unknown: no get_version()",
-                     },
-        "stub_client": { },
-        }
-
-    def __init__(self, announcement, tub, ic):
-        self._tub = tub
-        self._announcement = announcement
-        self._ic = ic
-        (furl, service_name, ri_name, nickname, ver, oldest) = announcement
-
-        self._furl = furl
-        m = re.match(r'pb://(\w+)@', furl)
-        assert m
-        self._nodeid = b32decode(m.group(1).upper())
-        self._nodeid_s = idlib.shortnodeid_b2a(self._nodeid)
-
-        self.service_name = service_name
-
-        self.log("attempting to connect to %s" % self._nodeid_s)
-        self.announcement_time = time.time()
-        self.last_loss_time = None
-        self.rref = None
-        self.remote_host = None
-        self.last_connect_time = None
-        self.version = ver
-        self.oldest_supported = oldest
-        self.nickname = nickname
-
-    def log(self, *args, **kwargs):
-        return self._ic.log(*args, **kwargs)
-
-    def startConnecting(self):
-        self._reconnector = self._tub.connectTo(self._furl, self._got_service)
-
-    def stopConnecting(self):
-        self._reconnector.stopConnecting()
-
-    def _got_service(self, rref):
-        self.log("got connection to %s, getting versions" % self._nodeid_s)
-
-        default = self.VERSION_DEFAULTS.get(self.service_name, {})
-        d = add_version_to_remote_reference(rref, default)
-        d.addCallback(self._got_versioned_service)
-
-    def _got_versioned_service(self, rref):
-        self.log("connected to %s, version %s" % (self._nodeid_s, rref.version))
-
-        self.last_connect_time = time.time()
-        self.remote_host = rref.tracker.broker.transport.getPeer()
-
-        self.rref = rref
-
-        self._ic.add_connection(self._nodeid, self.service_name, rref)
-
-        rref.notifyOnDisconnect(self._lost, rref)
-
-    def _lost(self, rref):
-        self.log("lost connection to %s" % self._nodeid_s)
-        self.last_loss_time = time.time()
-        self.rref = None
-        self.remote_host = None
-        self._ic.remove_connection(self._nodeid, self.service_name, rref)
-
-
-    def reset(self):
-        self._reconnector.reset()
+from allmydata.util.rrefutil import add_version_to_remote_reference, trap_deadref
 
 
 class IntroducerClient(service.Service, Referenceable):
@@ -113,32 +18,40 @@ class IntroducerClient(service.Service, Referenceable):
         self._tub = tub
         self.introducer_furl = introducer_furl
 
-        self._nickname = nickname.encode("utf-8")
+        assert type(nickname) is unicode
+        self._nickname_utf8 = nickname.encode("utf-8") # we always send UTF-8
         self._my_version = my_version
         self._oldest_supported = oldest_supported
 
         self._published_announcements = set()
 
         self._publisher = None
-        self._connected = False
 
+        self._local_subscribers = [] # (servicename,cb,args,kwargs) tuples
         self._subscribed_service_names = set()
         self._subscriptions = set() # requests we've actually sent
-        self._received_announcements = set()
-        # TODO: this set will grow without bound, until the node is restarted
-
-        # we only accept one announcement per (peerid+service_name) pair.
-        # This insures that an upgraded host replace their previous
-        # announcement. It also means that each peer must have their own Tub
-        # (no sharing), which is slightly weird but consistent with the rest
-        # of the Tahoe codebase.
-        self._connectors = {} # k: (peerid+svcname), v: RemoteServiceConnector
-        # self._connections is a set of (peerid, service_name, rref) tuples
-        self._connections = set()
-
-        self.counter = 0 # incremented each time we change state, for tests
+
+        # _current_announcements remembers one announcement per
+        # (servicename,serverid) pair. Anything that arrives with the same
+        # pair will displace the previous one. This stores unpacked
+        # announcement dictionaries, which can be compared for equality to
+        # distinguish re-announcement from updates. It also provides memory
+        # for clients who subscribe after startup.
+        self._current_announcements = {}
+
         self.encoding_parameters = None
 
+        # hooks for unit tests
+        self._debug_counts = {
+            "inbound_message": 0,
+            "inbound_announcement": 0,
+            "wrong_service": 0,
+            "duplicate_announcement": 0,
+            "update": 0,
+            "new_announcement": 0,
+            "outbound_message": 0,
+            }
+
     def startService(self):
         service.Service.startService(self)
         self._introducer_error = None
@@ -170,7 +83,6 @@ class IntroducerClient(service.Service, Referenceable):
         needed = "http://allmydata.org/tahoe/protocols/introducer/v1"
         if needed not in publisher.version:
             raise InsufficientVersionError(needed, publisher.version)
-        self._connected = True
         self._publisher = publisher
         publisher.notifyOnDisconnect(self._disconnected)
         self._maybe_publish()
@@ -178,16 +90,9 @@ class IntroducerClient(service.Service, Referenceable):
 
     def _disconnected(self):
         self.log("bummer, we've lost our connection to the introducer")
-        self._connected = False
         self._publisher = None
         self._subscriptions.clear()
 
-    def stopService(self):
-        service.Service.stopService(self)
-        self._introducer_reconnector.stopConnecting()
-        for rsc in self._connectors.itervalues():
-            rsc.stopConnecting()
-
     def log(self, *args, **kwargs):
         if "facility" not in kwargs:
             kwargs["facility"] = "tahoe.introducer"
@@ -195,14 +100,19 @@ class IntroducerClient(service.Service, Referenceable):
 
 
     def publish(self, furl, service_name, remoteinterface_name):
+        assert type(self._nickname_utf8) is str # we always send UTF-8
         ann = (furl, service_name, remoteinterface_name,
-               self._nickname, self._my_version, self._oldest_supported)
+               self._nickname_utf8, self._my_version, self._oldest_supported)
         self._published_announcements.add(ann)
         self._maybe_publish()
 
-    def subscribe_to(self, service_name):
+    def subscribe_to(self, service_name, cb, *args, **kwargs):
+        self._local_subscribers.append( (service_name,cb,args,kwargs) )
         self._subscribed_service_names.add(service_name)
         self._maybe_subscribe()
+        for (servicename,nodeid),ann_d in self._current_announcements.items():
+            if servicename == service_name:
+                eventually(cb, nodeid, ann_d)
 
     def _maybe_subscribe(self):
         if not self._publisher:
@@ -215,7 +125,9 @@ class IntroducerClient(service.Service, Referenceable):
                 # duplicate requests.
                 self._subscriptions.add(service_name)
                 d = self._publisher.callRemote("subscribe", self, service_name)
-                d.addErrback(log.err, facility="tahoe.introducer",
+                d.addErrback(trap_deadref)
+                d.addErrback(log.err, format="server errored during subscribe",
+                             facility="tahoe.introducer",
                              level=log.WEIRD, umid="2uMScQ")
 
     def _maybe_publish(self):
@@ -224,100 +136,83 @@ class IntroducerClient(service.Service, Referenceable):
             return
         # this re-publishes everything. The Introducer ignores duplicates
         for ann in self._published_announcements:
+            self._debug_counts["outbound_message"] += 1
             d = self._publisher.callRemote("publish", ann)
-            d.addErrback(log.err, facility="tahoe.introducer",
+            d.addErrback(trap_deadref)
+            d.addErrback(log.err,
+                         format="server errored during publish %(ann)s",
+                         ann=ann, facility="tahoe.introducer",
                          level=log.WEIRD, umid="xs9pVQ")
 
 
 
     def remote_announce(self, announcements):
+        self.log("received %d announcements" % len(announcements))
+        self._debug_counts["inbound_message"] += 1
         for ann in announcements:
-            self.log("received %d announcements" % len(announcements))
-            (furl, service_name, ri_name, nickname, ver, oldest) = ann
-            if service_name not in self._subscribed_service_names:
-                self.log("announcement for a service we don't care about [%s]"
-                         % (service_name,), level=log.UNUSUAL, umid="dIpGNA")
-                continue
-            if ann in self._received_announcements:
-                self.log("ignoring old announcement: %s" % (ann,),
-                         level=log.NOISY)
-                continue
-            self.log("new announcement[%s]: %s" % (service_name, ann))
-            self._received_announcements.add(ann)
-            self._new_announcement(ann)
-
-    def _new_announcement(self, announcement):
-        # this will only be called for new announcements
-        index = make_index(announcement)
-        if index in self._connectors:
-            self.log("replacing earlier announcement", level=log.NOISY)
-            self._connectors[index].stopConnecting()
-        rsc = RemoteServiceConnector(announcement, self._tub, self)
-        self._connectors[index] = rsc
-        rsc.startConnecting()
-
-    def add_connection(self, nodeid, service_name, rref):
-        self._connections.add( (nodeid, service_name, rref) )
-        self.counter += 1
-        # when one connection is established, reset the timers on all others,
-        # to trigger a reconnection attempt in one second. This is intended
-        # to accelerate server connections when we've been offline for a
-        # while. The goal is to avoid hanging out for a long time with
-        # connections to only a subset of the servers, which would increase
-        # the chances that we'll put shares in weird places (and not update
-        # existing shares of mutable files). See #374 for more details.
-        for rsc in self._connectors.values():
-            rsc.reset()
-
-    def remove_connection(self, nodeid, service_name, rref):
-        self._connections.discard( (nodeid, service_name, rref) )
-        self.counter += 1
-
-
-    def get_all_connections(self):
-        return frozenset(self._connections)
-
-    def get_all_connectors(self):
-        return self._connectors.copy()
-
-    def get_all_peerids(self):
-        return frozenset([peerid
-                          for (peerid, service_name, rref)
-                          in self._connections])
-
-    def get_nickname_for_peerid(self, peerid):
-        for k in self._connectors:
-            (peerid0, svcname0) = k
-            if peerid0 == peerid:
-                rsc = self._connectors[k]
-                return rsc.nickname
-        return None
-
-    def get_all_connections_for(self, service_name):
-        return frozenset([c
-                          for c in self._connections
-                          if c[1] == service_name])
-
-    def get_peers(self, service_name):
-        """Return a set of (peerid, versioned-rref) tuples."""
-        return frozenset([(peerid, r) for (peerid, servname, r) in self._connections if servname == service_name])
-
-    def get_permuted_peers(self, service_name, key):
-        """Return an ordered list of (peerid, versioned-rref) tuples."""
-
-        servers = self.get_peers(service_name)
-
-        return sorted(servers, key=lambda x: sha.new(key+x[0]).digest())
+            try:
+                self._process_announcement(ann)
+            except:
+                log.err(format="unable to process announcement %(ann)s",
+                        ann=ann)
+                # Don't let a corrupt announcement prevent us from processing
+                # the remaining ones. Don't return an error to the server,
+                # since they'd just ignore it anyways.
+                pass
+
+    def _process_announcement(self, ann):
+        self._debug_counts["inbound_announcement"] += 1
+        (furl, service_name, ri_name, nickname_utf8, ver, oldest) = ann
+        if service_name not in self._subscribed_service_names:
+            self.log("announcement for a service we don't care about [%s]"
+                     % (service_name,), level=log.UNUSUAL, umid="dIpGNA")
+            self._debug_counts["wrong_service"] += 1
+            return
+        self.log("announcement for [%s]: %s" % (service_name, ann),
+                 umid="BoKEag")
+        assert type(furl) is str
+        assert type(service_name) is str
+        assert type(ri_name) is str
+        assert type(nickname_utf8) is str
+        nickname = nickname_utf8.decode("utf-8")
+        assert type(nickname) is unicode
+        assert type(ver) is str
+        assert type(oldest) is str
+
+        nodeid = b32decode(SturdyRef(furl).tubID.upper())
+        nodeid_s = idlib.shortnodeid_b2a(nodeid)
+
+        ann_d = { "version": 0,
+                  "service-name": service_name,
+
+                  "FURL": furl,
+                  "nickname": nickname,
+                  "app-versions": {}, # need #466 and v2 introducer
+                  "my-version": ver,
+                  "oldest-supported": oldest,
+                  }
+
+        index = (service_name, nodeid)
+        if self._current_announcements.get(index, None) == ann_d:
+            self.log("reannouncement for [%(service)s]:%(nodeid)s, ignoring",
+                     service=service_name, nodeid=nodeid_s,
+                     level=log.UNUSUAL, umid="B1MIdA")
+            self._debug_counts["duplicate_announcement"] += 1
+            return
+        if index in self._current_announcements:
+            self._debug_counts["update"] += 1
+        else:
+            self._debug_counts["new_announcement"] += 1
+
+        self._current_announcements[index] = ann_d
+        # note: we never forget an index, but we might update its value
+
+        for (service_name2,cb,args,kwargs) in self._local_subscribers:
+            if service_name2 == service_name:
+                eventually(cb, nodeid, ann_d, *args, **kwargs)
 
     def remote_set_encoding_parameters(self, parameters):
         self.encoding_parameters = parameters
 
     def connected_to_introducer(self):
-        return self._connected
-
-    def debug_disconnect_from_peerid(self, victim_nodeid):
-        # for unit tests: locate and sever all connections to the given
-        # peerid.
-        for (nodeid, service_name, rref) in self._connections:
-            if nodeid == victim_nodeid:
-                rref.tracker.broker.transport.loseConnection()
+        return bool(self._publisher)
diff --git a/src/allmydata/introducer/common.py b/src/allmydata/introducer/common.py
deleted file mode 100644 (file)
index 54f611a..0000000
+++ /dev/null
@@ -1,11 +0,0 @@
-
-import re
-from base64 import b32decode
-
-def make_index(announcement):
-    (furl, service_name, ri_name, nickname, ver, oldest) = announcement
-    m = re.match(r'pb://(\w+)@', furl)
-    assert m
-    nodeid = b32decode(m.group(1).upper())
-    return (nodeid, service_name)
-
index b02bb3556d6cb501a4869926e5e2971cbd01a7c3..54f1701f881f85d4be85bafef92e82c9ac4a9fd8 100644 (file)
@@ -88,53 +88,33 @@ class IIntroducerClient(Interface):
         parameter: this is supposed to be a globally-unique string that
         identifies the RemoteInterface that is implemented."""
 
-    def subscribe_to(service_name):
+    def subscribe_to(service_name, callback, *args, **kwargs):
         """Call this if you will eventually want to use services with the
         given SERVICE_NAME. This will prompt me to subscribe to announcements
-        of those services. You can pick up the announcements later by calling
-        get_all_connections_for() or get_permuted_peers().
-        """
-
-    def get_all_connections():
-        """Return a frozenset of (nodeid, service_name, rref) tuples, one for
-        each active connection we've established to a remote service. This is
-        mostly useful for unit tests that need to wait until a certain number
-        of connections have been made."""
-
-    def get_all_connectors():
-        """Return a dict that maps from (nodeid, service_name) to a
-        RemoteServiceConnector instance for all services that we are actively
-        trying to connect to. Each RemoteServiceConnector has the following
-        public attributes::
-
-          service_name: the type of service provided, like 'storage'
-          announcement_time: when we first heard about this service
-          last_connect_time: when we last established a connection
-          last_loss_time: when we last lost a connection
-
-          version: the peer's version, from the most recent connection
-          oldest_supported: the peer's oldest supported version, same
-
-          rref: the RemoteReference, if connected, otherwise None
-          remote_host: the IAddress, if connected, otherwise None
-
-        This method is intended for monitoring interfaces, such as a web page
-        which describes connecting and connected peers.
-        """
-
-    def get_all_peerids():
-        """Return a frozenset of all peerids to whom we have a connection (to
-        one or more services) established. Mostly useful for unit tests."""
-
-    def get_all_connections_for(service_name):
-        """Return a frozenset of (nodeid, service_name, rref) tuples, one
-        for each active connection that provides the given SERVICE_NAME."""
-
-    def get_permuted_peers(service_name, key):
-        """Returns an ordered list of (peerid, rref) tuples, selecting from
-        the connections that provide SERVICE_NAME, using a hash-based
-        permutation keyed by KEY. This randomizes the service list in a
-        repeatable way, to distribute load over many peers.
+        of those services. Your callback will be invoked with at least two
+        arguments: a serverid (binary string), and an announcement
+        dictionary, followed by any additional callback args/kwargs you give
+        me. I will run your callback for both new announcements and for
+        announcements that have changed, but you must be prepared to tolerate
+        duplicates.
+
+        The announcement dictionary that I give you will have the following
+        keys:
+
+         version: 0
+         service-name: str('storage')
+
+         FURL: str(furl)
+         remoteinterface-name: str(ri_name)
+         nickname: unicode
+         app-versions: {}
+         my-version: str
+         oldest-supported: str
+
+        Note that app-version will be an empty dictionary until #466 is done
+        and both the introducer and the remote client have been upgraded. For
+        current (native) server types, the serverid will always be equal to
+        the binary form of the FURL's tubid.
         """
 
     def connected_to_introducer():
index 2f6fa18ade84580b4b74304cecc2f8cf180579f1..831ddc6c16cf4cbd5e38b1ac65a4928f7fe97ad3 100644 (file)
@@ -11,7 +11,13 @@ from foolscap.api import Referenceable
 from allmydata.util import log, idlib
 from allmydata.introducer.interfaces import RIIntroducerSubscriberClient, \
      IIntroducerClient, RIIntroducerPublisherAndSubscriberService
-from allmydata.introducer.common import make_index
+
+def make_index(announcement):
+    (furl, service_name, ri_name, nickname, ver, oldest) = announcement
+    m = re.match(r'pb://(\w+)@', furl)
+    assert m
+    nodeid = b32decode(m.group(1).upper())
+    return (nodeid, service_name)
 
 class RemoteServiceConnector:
     """I hold information about a peer service that we want to connect to. If
index de511e745f595cee7363017cb8b69bbfd4aa3a53..117fcb5538112abb839a09458150df4167e53e38 100644 (file)
@@ -1,14 +1,14 @@
 
 import time, os.path
+from base64 import b32decode
 from zope.interface import implements
 from twisted.application import service
-from foolscap.api import Referenceable
+from foolscap.api import Referenceable, SturdyRef
 import allmydata
 from allmydata import node
-from allmydata.util import log
+from allmydata.util import log, rrefutil
 from allmydata.introducer.interfaces import \
      RIIntroducerPublisherAndSubscriberService
-from allmydata.introducer.common import make_index
 
 class IntroducerNode(node.Node):
     PORTNUMFILE = "introducer.port"
@@ -55,9 +55,15 @@ class IntroducerService(service.MultiService, Referenceable):
     def __init__(self, basedir="."):
         service.MultiService.__init__(self)
         self.introducer_url = None
-        # 'index' is (tubid, service_name)
+        # 'index' is (service_name, tubid)
         self._announcements = {} # dict of index -> (announcement, timestamp)
         self._subscribers = {} # dict of (rref->timestamp) dicts
+        self._debug_counts = {"inbound_message": 0,
+                              "inbound_duplicate": 0,
+                              "inbound_update": 0,
+                              "outbound_message": 0,
+                              "outbound_announcements": 0,
+                              "inbound_subscribe": 0}
 
     def log(self, *args, **kwargs):
         if "facility" not in kwargs:
@@ -73,23 +79,46 @@ class IntroducerService(service.MultiService, Referenceable):
         return self.VERSION
 
     def remote_publish(self, announcement):
+        try:
+            self._publish(announcement)
+        except:
+            log.err(format="Introducer.remote_publish failed on %(ann)s",
+                    ann=announcement, level=log.UNUSUAL, umid="620rWA")
+            raise
+
+    def _publish(self, announcement):
+        self._debug_counts["inbound_message"] += 1
         self.log("introducer: announcement published: %s" % (announcement,) )
-        index = make_index(announcement)
+        (furl, service_name, ri_name, nickname_utf8, ver, oldest) = announcement
+
+        nodeid = b32decode(SturdyRef(furl).tubID.upper())
+        index = (service_name, nodeid)
+
         if index in self._announcements:
             (old_announcement, timestamp) = self._announcements[index]
             if old_announcement == announcement:
                 self.log("but we already knew it, ignoring", level=log.NOISY)
+                self._debug_counts["inbound_duplicate"] += 1
                 return
             else:
                 self.log("old announcement being updated", level=log.NOISY)
+                self._debug_counts["inbound_update"] += 1
         self._announcements[index] = (announcement, time.time())
-        (furl, service_name, ri_name, nickname, ver, oldest) = announcement
+
         for s in self._subscribers.get(service_name, []):
-            s.callRemote("announce", set([announcement]))
+            self._debug_counts["outbound_message"] += 1
+            self._debug_counts["outbound_announcements"] += 1
+            d = s.callRemote("announce", set([announcement]))
+            d.addErrback(rrefutil.trap_deadref)
+            d.addErrback(log.err,
+                         format="subscriber errored on announcement %(ann)s",
+                         ann=announcement, facility="tahoe.introducer",
+                         level=log.UNUSUAL, umid="jfGMXQ")
 
     def remote_subscribe(self, subscriber, service_name):
         self.log("introducer: subscription[%s] request at %s" % (service_name,
                                                                  subscriber))
+        self._debug_counts["inbound_subscribe"] += 1
         if service_name not in self._subscribers:
             self._subscribers[service_name] = {}
         subscribers = self._subscribers[service_name]
@@ -104,11 +133,16 @@ class IntroducerService(service.MultiService, Referenceable):
             subscribers.pop(subscriber, None)
         subscriber.notifyOnDisconnect(_remove)
 
-        announcements = set( [ ann
-                               for idx,(ann,when) in self._announcements.items()
-                               if idx[1] == service_name] )
-        d = subscriber.callRemote("announce", announcements)
-        d.addErrback(log.err, facility="tahoe.introducer", level=log.UNUSUAL)
-
-
+        announcements = set(
+            [ ann
+              for (sn2,nodeid),(ann,when) in self._announcements.items()
+              if sn2 == service_name] )
 
+        self._debug_counts["outbound_message"] += 1
+        self._debug_counts["outbound_announcements"] += len(announcements)
+        d = subscriber.callRemote("announce", announcements)
+        d.addErrback(rrefutil.trap_deadref)
+        d.addErrback(log.err,
+                     format="subscriber errored during subscribe %(anns)s",
+                     anns=announcements, facility="tahoe.introducer",
+                     level=log.UNUSUAL, umid="mtZepQ")
index 582c590f724edb19fce954719cc91b163c0108ed..c695bd0d6ae7c39dbe10cc095f494ae1f7be2e1b 100644 (file)
@@ -62,6 +62,7 @@ class Node(service.MultiService):
 
         nickname_utf8 = self.get_config("node", "nickname", "<unspecified>")
         self.nickname = nickname_utf8.decode("utf-8")
+        assert type(self.nickname) is unicode
 
         self.init_tempdir()
         self.create_tub()
index eb4a373391edc47957ce8cd056bac2b7ac112dfa..1dfefdd46840b28f0b8cbcbba14f02d05569d5e5 100644 (file)
@@ -6,21 +6,50 @@ the foolscap-based server implemented in src/allmydata/storage/*.py .
 
 # roadmap:
 #
-#  implement ServerFarm, change Client to create it, change
-#  uploader/servermap to get rrefs from it. ServerFarm calls
-#  IntroducerClient.subscribe_to .
+# 1: implement StorageFarmBroker (i.e. "storage broker"), change Client to
+# create it, change uploader/servermap to get rrefs from it. ServerFarm calls
+# IntroducerClient.subscribe_to . ServerFarm hides descriptors, passes rrefs
+# to clients. webapi status pages call broker.get_info_about_serverid.
 #
-#  implement NativeStorageClient, change Tahoe2PeerSelector to use it. All
-#  NativeStorageClients come from the introducer
+# 2: move get_info methods to the descriptor, webapi status pages call
+# broker.get_descriptor_for_serverid().get_info
 #
-#  change web/check_results.py to get NativeStorageClients from check results,
-#  ask it for a nickname (instead of using client.get_nickname_for_serverid)
+# 3?later?: store descriptors in UploadResults/etc instead of serverids,
+# webapi status pages call descriptor.get_info and don't use storage_broker
+# or Client
 #
-#  implement tahoe.cfg scanner, create static NativeStorageClients
+# 4: enable static config: tahoe.cfg can add descriptors. Make the introducer
+# optional. This closes #467
+#
+# 5: implement NativeStorageClient, pass it to Tahoe2PeerSelector and other
+# clients. Clients stop doing callRemote(), use NativeStorageClient methods
+# instead (which might do something else, i.e. http or whatever). The
+# introducer and tahoe.cfg only create NativeStorageClients for now.
+#
+# 6: implement other sorts of IStorageClient classes: S3, etc
 
-import sha
-from zope.interface import implements
+import sha, time
+from zope.interface import implements, Interface
+from foolscap.api import eventually
 from allmydata.interfaces import IStorageBroker
+from allmydata.util import idlib, log
+from allmydata.util.rrefutil import add_version_to_remote_reference
+
+# who is responsible for de-duplication?
+#  both?
+#  IC remembers the unpacked announcements it receives, to provide for late
+#  subscribers and to remove duplicates
+
+# if a client subscribes after startup, will they receive old announcements?
+#  yes
+
+# who will be responsible for signature checking?
+#  make it be IntroducerClient, so they can push the filter outwards and
+#  reduce inbound network traffic
+
+# what should the interface between StorageFarmBroker and IntroducerClient
+# look like?
+#  don't pass signatures: only pass validated blessed-objects
 
 class StorageFarmBroker:
     implements(IStorageBroker)
@@ -30,16 +59,57 @@ class StorageFarmBroker:
     I'm also responsible for subscribing to the IntroducerClient to find out
     about new servers as they are announced by the Introducer.
     """
-    def __init__(self, permute_peers=True):
+    def __init__(self, tub, permute_peers):
+        self.tub = tub
         assert permute_peers # False not implemented yet
-        self.servers = {} # serverid -> StorageClient instance
         self.permute_peers = permute_peers
+        # self.descriptors maps serverid -> IServerDescriptor, and keeps
+        # track of all the storage servers that we've heard about. Each
+        # descriptor manages its own Reconnector, and will give us a
+        # RemoteReference when we ask them for it.
+        self.descriptors = {}
+        # self.servers are statically configured from unit tests
+        self.test_servers = {} # serverid -> rref
         self.introducer_client = None
-    def add_server(self, serverid, s):
-        self.servers[serverid] = s
+
+    # these two are used in unit tests
+    def test_add_server(self, serverid, rref):
+        self.test_servers[serverid] = rref
+    def test_add_descriptor(self, serverid, dsc):
+        self.descriptors[serverid] = dsc
+
     def use_introducer(self, introducer_client):
         self.introducer_client = ic = introducer_client
-        ic.subscribe_to("storage")
+        ic.subscribe_to("storage", self._got_announcement)
+
+    def _got_announcement(self, serverid, ann_d):
+        assert ann_d["service-name"] == "storage"
+        old = self.descriptors.get(serverid)
+        if old:
+            if old.get_announcement() == ann_d:
+                return # duplicate
+            # replacement
+            del self.descriptors[serverid]
+            old.stop_connecting()
+            # now we forget about them and start using the new one
+        dsc = NativeStorageClientDescriptor(serverid, ann_d)
+        self.descriptors[serverid] = dsc
+        dsc.start_connecting(self.tub, self._trigger_connections)
+        # the descriptor will manage their own Reconnector, and each time we
+        # need servers, we'll ask them if they're connected or not.
+
+    def _trigger_connections(self):
+        # when one connection is established, reset the timers on all others,
+        # to trigger a reconnection attempt in one second. This is intended
+        # to accelerate server connections when we've been offline for a
+        # while. The goal is to avoid hanging out for a long time with
+        # connections to only a subset of the servers, which would increase
+        # the chances that we'll put shares in weird places (and not update
+        # existing shares of mutable files). See #374 for more details.
+        for dsc in self.descriptors.values():
+            dsc.try_to_connect()
+
+
 
     def get_servers_for_index(self, peer_selection_index):
         # first cut: return a list of (peerid, versioned-rref) tuples
@@ -51,34 +121,141 @@ class StorageFarmBroker:
     def get_all_servers(self):
         # return a frozenset of (peerid, versioned-rref) tuples
         servers = {}
-        for serverid,server in self.servers.items():
-            servers[serverid] = server
-        if self.introducer_client:
-            ic = self.introducer_client
-            for serverid,server in ic.get_peers("storage"):
-                servers[serverid] = server
+        for serverid,rref in self.test_servers.items():
+            servers[serverid] = rref
+        for serverid,dsc in self.descriptors.items():
+            rref = dsc.get_rref()
+            if rref:
+                servers[serverid] = rref
         return frozenset(servers.items())
 
     def get_all_serverids(self):
-        for serverid in self.servers:
-            yield serverid
-        if self.introducer_client:
-            for serverid,server in self.introducer_client.get_peers("storage"):
-                yield serverid
+        serverids = set()
+        serverids.update(self.test_servers.keys())
+        serverids.update(self.descriptors.keys())
+        return frozenset(serverids)
+
+    def get_all_descriptors(self):
+        return sorted(self.descriptors.values(),
+                      key=lambda dsc: dsc.get_serverid())
 
     def get_nickname_for_serverid(self, serverid):
-        if serverid in self.servers:
-            return self.servers[serverid].nickname
-        if self.introducer_client:
-            return self.introducer_client.get_nickname_for_peerid(serverid)
+        if serverid in self.descriptors:
+            return self.descriptors[serverid].get_nickname()
         return None
 
-class NativeStorageClient:
-    def __init__(self, serverid, furl, nickname, min_shares=1):
+
+class IServerDescriptor(Interface):
+    def start_connecting(tub, trigger_cb):
+        pass
+    def get_nickname():
+        pass
+    def get_rref():
+        pass
+
+class NativeStorageClientDescriptor:
+    """I hold information about a storage server that we want to connect to.
+    If we are connected, I hold the RemoteReference, their host address, and
+    the their version information. I remember information about when we were
+    last connected too, even if we aren't currently connected.
+
+    @ivar announcement_time: when we first heard about this service
+    @ivar last_connect_time: when we last established a connection
+    @ivar last_loss_time: when we last lost a connection
+
+    @ivar version: the server's versiondict, from the most recent announcement
+    @ivar nickname: the server's self-reported nickname (unicode), same
+
+    @ivar rref: the RemoteReference, if connected, otherwise None
+    @ivar remote_host: the IAddress, if connected, otherwise None
+    """
+    implements(IServerDescriptor)
+
+    VERSION_DEFAULTS = {
+        "http://allmydata.org/tahoe/protocols/storage/v1" :
+        { "maximum-immutable-share-size": 2**32,
+          "tolerates-immutable-read-overrun": False,
+          "delete-mutable-shares-with-zero-length-writev": False,
+          },
+        "application-version": "unknown: no get_version()",
+        }
+
+    def __init__(self, serverid, ann_d, min_shares=1):
         self.serverid = serverid
-        self.furl = furl
-        self.nickname = nickname
+        self.announcement = ann_d
         self.min_shares = min_shares
 
+        self.serverid_s = idlib.shortnodeid_b2a(self.serverid)
+        self.announcement_time = time.time()
+        self.last_connect_time = None
+        self.last_loss_time = None
+        self.remote_host = None
+        self.rref = None
+        self._reconnector = None
+        self._trigger_cb = None
+
+    def get_serverid(self):
+        return self.serverid
+
+    def get_nickname(self):
+        return self.announcement["nickname"].decode("utf-8")
+    def get_announcement(self):
+        return self.announcement
+    def get_remote_host(self):
+        return self.remote_host
+    def get_last_connect_time(self):
+        return self.last_connect_time
+    def get_last_loss_time(self):
+        return self.last_loss_time
+    def get_announcement_time(self):
+        return self.announcement_time
+
+    def start_connecting(self, tub, trigger_cb):
+        furl = self.announcement["FURL"]
+        self._trigger_cb = trigger_cb
+        self._reconnector = tub.connectTo(furl, self._got_connection)
+
+    def _got_connection(self, rref):
+        lp = log.msg(format="got connection to %(serverid)s, getting versions",
+                     serverid=self.serverid_s,
+                     facility="tahoe.storage_broker", umid="coUECQ")
+        if self._trigger_cb:
+            eventually(self._trigger_cb)
+        default = self.VERSION_DEFAULTS
+        d = add_version_to_remote_reference(rref, default)
+        d.addCallback(self._got_versioned_service, lp)
+        d.addErrback(log.err, format="storageclient._got_connection",
+                     serverid=self.serverid_s, umid="Sdq3pg")
+
+    def _got_versioned_service(self, rref, lp):
+        log.msg(format="%(serverid)s provided version info %(version)s",
+                serverid=self.serverid_s, version=rref.version,
+                facility="tahoe.storage_broker", umid="SWmJYg",
+                level=log.NOISY, parent=lp)
+
+        self.last_connect_time = time.time()
+        self.remote_host = rref.getPeer()
+        self.rref = rref
+        rref.notifyOnDisconnect(self._lost)
+
+    def get_rref(self):
+        return self.rref
+
+    def _lost(self):
+        log.msg(format="lost connection to %(serverid)s",
+                serverid=self.serverid_s,
+                facility="tahoe.storage_broker", umid="zbRllw")
+        self.last_loss_time = time.time()
+        self.rref = None
+        self.remote_host = None
+
+    def stop_connecting(self):
+        # used when this descriptor has been superceded by another
+        self._reconnector.stopConnecting()
+
+    def try_to_connect(self):
+        # used when the broker wants us to hurry up
+        self._reconnector.reset()
+
 class UnknownServerTypeError(Exception):
     pass
index ee5c2650a139a8de003dd248f82386bf9232c40a..6140e8411fe477a7f349d4aed3fc68a48539a54e 100644 (file)
@@ -533,10 +533,10 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
 
     def _check_connections(self):
         for c in self.clients:
-            ic = c.introducer_client
-            if not ic.connected_to_introducer():
+            if not c.connected_to_introducer():
                 return False
-            if len(ic.get_all_peerids()) != self.numclients:
+            sb = c.get_storage_broker()
+            if len(sb.get_all_servers()) != self.numclients:
                 return False
         return True
 
index 8af6d1b869a77eb1483daf3eb439104df95d2d4a..eeb14fa35956bc3ff809c4ea199a8dabaf1dc1be 100644 (file)
@@ -25,7 +25,6 @@ from allmydata import uri as tahoe_uri
 from allmydata.client import Client
 from allmydata.storage.server import StorageServer, storage_index_to_dir
 from allmydata.util import fileutil, idlib, hashutil
-from allmydata.introducer.client import RemoteServiceConnector
 from allmydata.test.common_web import HTTPClientGETFactory
 from allmydata.interfaces import IStorageBroker
 
@@ -93,17 +92,13 @@ class LocalWrapper:
     def dontNotifyOnDisconnect(self, marker):
         del self.disconnectors[marker]
 
-def wrap(original, service_name):
+def wrap_storage_server(original):
     # Much of the upload/download code uses rref.version (which normally
     # comes from rrefutil.add_version_to_remote_reference). To avoid using a
     # network, we want a LocalWrapper here. Try to satisfy all these
     # constraints at the same time.
     wrapper = LocalWrapper(original)
-    try:
-        version = original.remote_get_version()
-    except AttributeError:
-        version = RemoteServiceConnector.VERSION_DEFAULTS[service_name]
-    wrapper.version = version
+    wrapper.version = original.remote_get_version()
     return wrapper
 
 class NoNetworkStorageBroker:
@@ -220,7 +215,7 @@ class NoNetworkGrid(service.MultiService):
         ss.setServiceParent(middleman)
         serverid = ss.my_nodeid
         self.servers_by_number[i] = ss
-        self.servers_by_id[serverid] = wrap(ss, "storage")
+        self.servers_by_id[serverid] = wrap_storage_server(ss)
         self.all_servers = frozenset(self.servers_by_id.items())
         for c in self.clients:
             c._servers = self.all_servers
index 88f749517453ca418c7f8e2069e3fd3eea4b474d..e52e59b203e8cb73911975ecd35fc5a091daa8ff 100644 (file)
@@ -3,7 +3,7 @@ import simplejson
 from twisted.trial import unittest
 from allmydata import check_results, uri
 from allmydata.web import check_results as web_check_results
-from allmydata.storage_client import StorageFarmBroker, NativeStorageClient
+from allmydata.storage_client import StorageFarmBroker, NativeStorageClientDescriptor
 from common_web import WebRenderingMixin
 
 class FakeClient:
@@ -13,12 +13,20 @@ class FakeClient:
 class WebResultsRendering(unittest.TestCase, WebRenderingMixin):
 
     def create_fake_client(self):
-        sb = StorageFarmBroker()
+        sb = StorageFarmBroker(None, True)
         for (peerid, nickname) in [("\x00"*20, "peer-0"),
                                    ("\xff"*20, "peer-f"),
                                    ("\x11"*20, "peer-11")] :
-            n = NativeStorageClient(peerid, None, nickname)
-            sb.add_server(peerid, n)
+            ann_d = { "version": 0,
+                      "service-name": "storage",
+                      "FURL": "fake furl",
+                      "nickname": unicode(nickname),
+                      "app-versions": {}, # need #466 and v2 introducer
+                      "my-version": "ver",
+                      "oldest-supported": "oldest",
+                      }
+            dsc = NativeStorageClientDescriptor(peerid, ann_d)
+            sb.test_add_descriptor(peerid, dsc)
         c = FakeClient()
         c.storage_broker = sb
         return c
index 63f4962f2c6fb87c08c8d92dbc1bcdaf083b8ce0..6040e2505cfa2eea3a38ba15ccb6eecc63202060 100644 (file)
@@ -146,13 +146,13 @@ class Basic(unittest.TestCase):
                  for (peerid,rref) in sb.get_servers_for_index(key) ]
 
     def test_permute(self):
-        sb = StorageFarmBroker()
+        sb = StorageFarmBroker(None, True)
         for k in ["%d" % i for i in range(5)]:
-            sb.add_server(k, None)
+            sb.test_add_server(k, None)
 
         self.failUnlessEqual(self._permute(sb, "one"), ['3','1','0','4','2'])
         self.failUnlessEqual(self._permute(sb, "two"), ['0','4','2','1','3'])
-        sb.servers = {}
+        sb.test_servers.clear()
         self.failUnlessEqual(self._permute(sb, "one"), [])
 
     def test_versions(self):
index d2e689c0b6ed69eb1accce5e45c0d15c33c3389f..113893b85bb01f70e1d6997fe934699345467f0a 100644 (file)
@@ -63,7 +63,7 @@ class FakeClient(service.MultiService):
                                    "max_segment_size": 1*MiB,
                                    }
     stats_provider = None
-    storage_broker = StorageFarmBroker()
+    storage_broker = StorageFarmBroker(None, True)
     def log(self, *args, **kwargs):
         return log.msg(*args, **kwargs)
     def get_encoding_parameters(self):
index a42e0c1aae7c88c09c25a9404bf9771bf933663f..d5c59e09d22e91f17b5b4e2e533e42cfe7ce911c 100644 (file)
@@ -11,16 +11,12 @@ from twisted.application import service
 from allmydata.interfaces import InsufficientVersionError
 from allmydata.introducer.client import IntroducerClient
 from allmydata.introducer.server import IntroducerService
-from allmydata.introducer.common import make_index
 # test compatibility with old introducer .tac files
 from allmydata.introducer import IntroducerNode
 from allmydata.introducer import old
-from allmydata.util import idlib, pollmixin
+from allmydata.util import pollmixin
 import common_util as testutil
 
-class FakeNode(Referenceable):
-    pass
-
 class LoggingMultiService(service.MultiService):
     def log(self, msg, **kw):
         log.msg(msg, **kw)
@@ -51,7 +47,7 @@ class ServiceMixin:
 class Introducer(ServiceMixin, unittest.TestCase, pollmixin.PollMixin):
 
     def test_create(self):
-        ic = IntroducerClient(None, "introducer.furl", "my_nickname",
+        ic = IntroducerClient(None, "introducer.furl", u"my_nickname",
                               "my_version", "oldest_version")
 
     def test_listen(self):
@@ -79,33 +75,35 @@ class Introducer(ServiceMixin, unittest.TestCase, pollmixin.PollMixin):
 
 class SystemTestMixin(ServiceMixin, pollmixin.PollMixin):
 
-    def setUp(self):
-        ServiceMixin.setUp(self)
-        self.central_tub = tub = Tub()
+    def create_tub(self, portnum=0):
+        tubfile = os.path.join(self.basedir, "tub.pem")
+        self.central_tub = tub = Tub(certFile=tubfile)
         #tub.setOption("logLocalFailures", True)
         #tub.setOption("logRemoteFailures", True)
         tub.setOption("expose-remote-exception-types", False)
         tub.setServiceParent(self.parent)
-        l = tub.listenOn("tcp:0")
-        portnum = l.getPortnum()
-        tub.setLocation("localhost:%d" % portnum)
+        l = tub.listenOn("tcp:%d" % portnum)
+        self.central_portnum = l.getPortnum()
+        if portnum != 0:
+            assert self.central_portnum == portnum
+        tub.setLocation("localhost:%d" % self.central_portnum)
 
 class SystemTest(SystemTestMixin, unittest.TestCase):
 
     def test_system(self):
-        i = IntroducerService()
-        i.setServiceParent(self.parent)
-        self.introducer_furl = self.central_tub.registerReference(i)
-        return self.do_system_test()
+        self.basedir = "introducer/SystemTest/system"
+        os.makedirs(self.basedir)
+        return self.do_system_test(IntroducerService)
     test_system.timeout = 480 # occasionally takes longer than 350s on "draco"
 
-    def test_system_oldserver(self):
-        i = old.IntroducerService_V1()
-        i.setServiceParent(self.parent)
-        self.introducer_furl = self.central_tub.registerReference(i)
-        return self.do_system_test()
-
-    def do_system_test(self):
+    def do_system_test(self, create_introducer):
+        self.create_tub()
+        introducer = create_introducer()
+        introducer.setServiceParent(self.parent)
+        iff = os.path.join(self.basedir, "introducer.furl")
+        tub = self.central_tub
+        ifurl = self.central_tub.registerReference(introducer, furlFile=iff)
+        self.introducer_furl = ifurl
 
         NUMCLIENTS = 5
         # we have 5 clients who publish themselves, and an extra one does
@@ -114,6 +112,11 @@ class SystemTest(SystemTestMixin, unittest.TestCase):
 
         clients = []
         tubs = {}
+        received_announcements = {}
+        NUM_SERVERS = NUMCLIENTS
+        subscribing_clients = []
+        publishing_clients = []
+
         for i in range(NUMCLIENTS+1):
             tub = Tub()
             #tub.setOption("logLocalFailures", True)
@@ -124,115 +127,210 @@ class SystemTest(SystemTestMixin, unittest.TestCase):
             portnum = l.getPortnum()
             tub.setLocation("localhost:%d" % portnum)
 
-            n = FakeNode()
             log.msg("creating client %d: %s" % (i, tub.getShortTubID()))
-            client_class = IntroducerClient
-            if i == 0:
-                client_class = old.IntroducerClient_V1
-            c = client_class(tub, self.introducer_furl,
-                             "nickname-%d" % i, "version", "oldest")
+            c = IntroducerClient(tub, self.introducer_furl, u"nickname-%d" % i,
+                                 "version", "oldest")
+            received_announcements[c] = ra = {}
+            def got(serverid, ann_d, announcements):
+                announcements[serverid] = ann_d
+            c.subscribe_to("storage", got, received_announcements[c])
+            subscribing_clients.append(c)
+
             if i < NUMCLIENTS:
-                node_furl = tub.registerReference(n)
+                node_furl = tub.registerReference(Referenceable())
                 c.publish(node_furl, "storage", "ri_name")
+                publishing_clients.append(c)
             # the last one does not publish anything
 
-            c.subscribe_to("storage")
-
             c.setServiceParent(self.parent)
             clients.append(c)
             tubs[c] = tub
 
         def _wait_for_all_connections():
-            for c in clients:
-                if len(c.get_all_connections()) < NUMCLIENTS:
+            for c in subscribing_clients:
+                if len(received_announcements[c]) < NUM_SERVERS:
                     return False
             return True
         d = self.poll(_wait_for_all_connections)
 
         def _check1(res):
             log.msg("doing _check1")
+            dc = introducer._debug_counts
+            self.failUnlessEqual(dc["inbound_message"], NUM_SERVERS)
+            self.failUnlessEqual(dc["inbound_duplicate"], 0)
+            self.failUnlessEqual(dc["inbound_update"], 0)
+            self.failUnless(dc["outbound_message"])
+
             for c in clients:
                 self.failUnless(c.connected_to_introducer())
-                self.failUnlessEqual(len(c.get_all_connections()), NUMCLIENTS)
-                self.failUnlessEqual(len(c.get_all_peerids()), NUMCLIENTS)
-                self.failUnlessEqual(len(c.get_all_connections_for("storage")),
-                                     NUMCLIENTS)
+            for c in subscribing_clients:
+                cdc = c._debug_counts
+                self.failUnless(cdc["inbound_message"])
+                self.failUnlessEqual(cdc["inbound_announcement"],
+                                     NUM_SERVERS)
+                self.failUnlessEqual(cdc["wrong_service"], 0)
+                self.failUnlessEqual(cdc["duplicate_announcement"], 0)
+                self.failUnlessEqual(cdc["update"], 0)
+                self.failUnlessEqual(cdc["new_announcement"],
+                                     NUM_SERVERS)
+                anns = received_announcements[c]
+                self.failUnlessEqual(len(anns), NUM_SERVERS)
+
                 nodeid0 = b32decode(tubs[clients[0]].tubID.upper())
-                self.failUnlessEqual(c.get_nickname_for_peerid(nodeid0),
-                                     "nickname-0")
+                ann_d = anns[nodeid0]
+                nick = ann_d["nickname"]
+                self.failUnlessEqual(type(nick), unicode)
+                self.failUnlessEqual(nick, u"nickname-0")
+            for c in publishing_clients:
+                cdc = c._debug_counts
+                self.failUnlessEqual(cdc["outbound_message"], 1)
         d.addCallback(_check1)
 
-        origin_c = clients[0]
-        def _disconnect_somebody_else(res):
-            # now disconnect somebody's connection to someone else
-            current_counter = origin_c.counter
-            victim_nodeid = b32decode(tubs[clients[1]].tubID.upper())
-            log.msg(" disconnecting %s->%s" %
-                    (tubs[origin_c].tubID,
-                     idlib.shortnodeid_b2a(victim_nodeid)))
-            origin_c.debug_disconnect_from_peerid(victim_nodeid)
-            log.msg(" did disconnect")
-
-            # then wait until something changes, which ought to be them
-            # noticing the loss
-            def _compare():
-                return current_counter != origin_c.counter
-            return self.poll(_compare)
-
-        d.addCallback(_disconnect_somebody_else)
-
-        # and wait for them to reconnect
-        d.addCallback(lambda res: self.poll(_wait_for_all_connections))
+        # force an introducer reconnect, by shutting down the Tub it's using
+        # and starting a new Tub (with the old introducer). Everybody should
+        # reconnect and republish, but the introducer should ignore the
+        # republishes as duplicates. However, because the server doesn't know
+        # what each client does and does not know, it will send them a copy
+        # of the current announcement table anyway.
+
+        d.addCallback(lambda _ign: log.msg("shutting down introducer's Tub"))
+        d.addCallback(lambda _ign: self.central_tub.disownServiceParent())
+
+        def _wait_for_introducer_loss():
+            for c in clients:
+                if c.connected_to_introducer():
+                    return False
+            return True
+        d.addCallback(lambda res: self.poll(_wait_for_introducer_loss))
+
+        def _restart_introducer_tub(_ign):
+            log.msg("restarting introducer's Tub")
+
+            # note: old.Server doesn't have this count
+            dc = introducer._debug_counts
+            self.expected_count = dc["inbound_message"] + NUM_SERVERS
+            self.expected_subscribe_count = dc["inbound_subscribe"] + NUMCLIENTS+1
+            introducer._debug0 = dc["outbound_message"]
+            for c in subscribing_clients:
+                cdc = c._debug_counts
+                c._debug0 = cdc["inbound_message"]
+
+            self.create_tub(self.central_portnum)
+            newfurl = self.central_tub.registerReference(introducer,
+                                                         furlFile=iff)
+            assert newfurl == self.introducer_furl
+        d.addCallback(_restart_introducer_tub)
+
+        def _wait_for_introducer_reconnect():
+            # wait until:
+            #  all clients are connected
+            #  the introducer has received publish messages from all of them
+            #  the introducer has received subscribe messages from all of them
+            #  the introducer has sent (duplicate) announcements to all of them
+            #  all clients have received (duplicate) announcements
+            dc = introducer._debug_counts
+            for c in clients:
+                if not c.connected_to_introducer():
+                    return False
+            if dc["inbound_message"] < self.expected_count:
+                return False
+            if dc["inbound_subscribe"] < self.expected_subscribe_count:
+                return False
+            for c in subscribing_clients:
+                cdc = c._debug_counts
+                if cdc["inbound_message"] < c._debug0+1:
+                    return False
+            return True
+        d.addCallback(lambda res: self.poll(_wait_for_introducer_reconnect))
+
         def _check2(res):
             log.msg("doing _check2")
+            # assert that the introducer sent out new messages, one per
+            # subscriber
+            dc = introducer._debug_counts
+            self.failUnlessEqual(dc["inbound_message"], 2*NUM_SERVERS)
+            self.failUnlessEqual(dc["inbound_duplicate"], NUM_SERVERS)
+            self.failUnlessEqual(dc["inbound_update"], 0)
+            self.failUnlessEqual(dc["outbound_message"],
+                                 introducer._debug0 + len(subscribing_clients))
             for c in clients:
-                self.failUnlessEqual(len(c.get_all_connections()), NUMCLIENTS)
+                self.failUnless(c.connected_to_introducer())
+            for c in subscribing_clients:
+                cdc = c._debug_counts
+                self.failUnlessEqual(cdc["duplicate_announcement"], NUM_SERVERS)
         d.addCallback(_check2)
 
-        def _disconnect_yourself(res):
-            # now disconnect somebody's connection to themselves.
-            current_counter = origin_c.counter
-            victim_nodeid = b32decode(tubs[clients[0]].tubID.upper())
-            log.msg(" disconnecting %s->%s" %
-                    (tubs[origin_c].tubID,
-                     idlib.shortnodeid_b2a(victim_nodeid)))
-            origin_c.debug_disconnect_from_peerid(victim_nodeid)
-            log.msg(" did disconnect from self")
-
-            def _compare():
-                return current_counter != origin_c.counter
-            return self.poll(_compare)
-        d.addCallback(_disconnect_yourself)
-
-        d.addCallback(lambda res: self.poll(_wait_for_all_connections))
-        def _check3(res):
-            log.msg("doing _check3")
-            for c in clients:
-                self.failUnlessEqual(len(c.get_all_connections_for("storage")),
-                                     NUMCLIENTS)
-        d.addCallback(_check3)
-        def _shutdown_introducer(res):
-            # now shut down the introducer. We do this by shutting down the
-            # tub it's using. Nobody's connections (to each other) should go
-            # down. All clients should notice the loss, and no other errors
-            # should occur.
-            log.msg("shutting down the introducer")
-            return self.central_tub.disownServiceParent()
-        d.addCallback(_shutdown_introducer)
-        def _wait_for_introducer_loss():
+        # Then force an introducer restart, by shutting down the Tub,
+        # destroying the old introducer, and starting a new Tub+Introducer.
+        # Everybody should reconnect and republish, and the (new) introducer
+        # will distribute the new announcements, but the clients should
+        # ignore the republishes as duplicates.
+
+        d.addCallback(lambda _ign: log.msg("shutting down introducer"))
+        d.addCallback(lambda _ign: self.central_tub.disownServiceParent())
+        d.addCallback(lambda res: self.poll(_wait_for_introducer_loss))
+
+        def _restart_introducer(_ign):
+            log.msg("restarting introducer")
+            self.create_tub(self.central_portnum)
+
+            for c in subscribing_clients:
+                # record some counters for later comparison. Stash the values
+                # on the client itself, because I'm lazy.
+                cdc = c._debug_counts
+                c._debug1 = cdc["inbound_announcement"]
+                c._debug2 = cdc["inbound_message"]
+                c._debug3 = cdc["new_announcement"]
+            newintroducer = create_introducer()
+            self.expected_message_count = NUM_SERVERS
+            self.expected_announcement_count = NUM_SERVERS*len(subscribing_clients)
+            self.expected_subscribe_count = len(subscribing_clients)
+            newfurl = self.central_tub.registerReference(newintroducer,
+                                                         furlFile=iff)
+            assert newfurl == self.introducer_furl
+        d.addCallback(_restart_introducer)
+        def _wait_for_introducer_reconnect2():
+            # wait until:
+            #  all clients are connected
+            #  the introducer has received publish messages from all of them
+            #  the introducer has received subscribe messages from all of them
+            #  the introducer has sent announcements for everybody to everybody
+            #  all clients have received all the (duplicate) announcements
+            # at that point, the system should be quiescent
+            dc = introducer._debug_counts
             for c in clients:
-                if c.connected_to_introducer():
+                if not c.connected_to_introducer():
+                    return False
+            if dc["inbound_message"] < self.expected_message_count:
+                return False
+            if dc["outbound_announcements"] < self.expected_announcement_count:
+                return False
+            if dc["inbound_subscribe"] < self.expected_subscribe_count:
+                return False
+            for c in subscribing_clients:
+                cdc = c._debug_counts
+                if cdc["inbound_announcement"] < c._debug1+NUM_SERVERS:
                     return False
             return True
-        d.addCallback(lambda res: self.poll(_wait_for_introducer_loss))
+        d.addCallback(lambda res: self.poll(_wait_for_introducer_reconnect2))
 
-        def _check4(res):
-            log.msg("doing _check4")
+        def _check3(res):
+            log.msg("doing _check3")
             for c in clients:
-                self.failUnlessEqual(len(c.get_all_connections_for("storage")),
-                                     NUMCLIENTS)
-                self.failIf(c.connected_to_introducer())
-        d.addCallback(_check4)
+                self.failUnless(c.connected_to_introducer())
+            for c in subscribing_clients:
+                cdc = c._debug_counts
+                self.failUnless(cdc["inbound_announcement"] > c._debug1)
+                self.failUnless(cdc["inbound_message"] > c._debug2)
+                # there should have been no new announcements
+                self.failUnlessEqual(cdc["new_announcement"], c._debug3)
+                # and the right number of duplicate ones. There were
+                # NUM_SERVERS from the servertub restart, and there should be
+                # another NUM_SERVERS now
+                self.failUnlessEqual(cdc["duplicate_announcement"],
+                                     2*NUM_SERVERS)
+
+        d.addCallback(_check3)
         return d
 
 class TooNewServer(IntroducerService):
@@ -247,6 +345,9 @@ class NonV1Server(SystemTestMixin, unittest.TestCase):
     # exception.
 
     def test_failure(self):
+        self.basedir = "introducer/NonV1Server/failure"
+        os.makedirs(self.basedir)
+        self.create_tub()
         i = TooNewServer()
         i.setServiceParent(self.parent)
         self.introducer_furl = self.central_tub.registerReference(i)
@@ -258,10 +359,12 @@ class NonV1Server(SystemTestMixin, unittest.TestCase):
         portnum = l.getPortnum()
         tub.setLocation("localhost:%d" % portnum)
 
-        n = FakeNode()
         c = IntroducerClient(tub, self.introducer_furl,
-                             "nickname-client", "version", "oldest")
-        c.subscribe_to("storage")
+                             u"nickname-client", "version", "oldest")
+        announcements = {}
+        def got(serverid, ann_d):
+            announcements[serverid] = ann_d
+        c.subscribe_to("storage", got)
 
         c.setServiceParent(self.parent)
 
@@ -283,7 +386,7 @@ class Index(unittest.TestCase):
         ann = ('pb://t5g7egomnnktbpydbuijt6zgtmw4oqi5@127.0.0.1:51857/hfzv36i',
                'storage', 'RIStorageServer.tahoe.allmydata.com',
                'plancha', 'allmydata-tahoe/1.4.1', '1.0.0')
-        (nodeid, service_name) = make_index(ann)
+        (nodeid, service_name) = old.make_index(ann)
         self.failUnlessEqual(nodeid, "\x9fM\xf2\x19\xcckU0\xbf\x03\r\x10\x99\xfb&\x9b-\xc7A\x1d")
         self.failUnlessEqual(service_name, "storage")
 
index 23bc676126206b27eb20813e1ea205c21775c49e..36064b2f1b41f053991de96b6aff069a585d9417 100644 (file)
@@ -174,19 +174,19 @@ class FakeClient:
         peerids = [tagged_hash("peerid", "%d" % i)[:20]
                    for i in range(self._num_peers)]
         self.nodeid = "fakenodeid"
-        self.storage_broker = StorageFarmBroker()
+        self.storage_broker = StorageFarmBroker(None, True)
         for peerid in peerids:
             fss = FakeStorageServer(peerid, self._storage)
-            self.storage_broker.add_server(peerid, fss)
+            self.storage_broker.test_add_server(peerid, fss)
 
     def get_storage_broker(self):
         return self.storage_broker
     def debug_break_connection(self, peerid):
-        self.storage_broker.servers[peerid].broken = True
+        self.storage_broker.test_servers[peerid].broken = True
     def debug_remove_connection(self, peerid):
-        self.storage_broker.servers.pop(peerid)
+        self.storage_broker.test_servers.pop(peerid)
     def debug_get_connection(self, peerid):
-        return self.storage_broker.servers[peerid]
+        return self.storage_broker.test_servers[peerid]
 
     def get_encoding_parameters(self):
         return {"k": 3, "n": 10}
@@ -1569,7 +1569,7 @@ class MultipleEncodings(unittest.TestCase):
             sharemap = {}
             sb = self._client.get_storage_broker()
 
-            for i,peerid in enumerate(sb.get_all_serverids()):
+            for peerid in sorted(sb.get_all_serverids()):
                 peerid_s = shortnodeid_b2a(peerid)
                 for shnum in self._shares1.get(peerid, {}):
                     if shnum < len(places):
@@ -1794,13 +1794,13 @@ class LessFakeClient(FakeClient):
         self._num_peers = num_peers
         peerids = [tagged_hash("peerid", "%d" % i)[:20] 
                    for i in range(self._num_peers)]
-        self.storage_broker = StorageFarmBroker()
+        self.storage_broker = StorageFarmBroker(None, True)
         for peerid in peerids:
             peerdir = os.path.join(basedir, idlib.shortnodeid_b2a(peerid))
             make_dirs(peerdir)
             ss = StorageServer(peerdir, peerid)
             lw = LocalWrapper(ss)
-            self.storage_broker.add_server(peerid, lw)
+            self.storage_broker.test_add_server(peerid, lw)
         self.nodeid = "fakenodeid"
 
 
index 861a20072afd85020bea265a96214e6224fdfe87..9a7f469806e60fe15eb0ff16a5ffa5ea1c783b7f 100644 (file)
@@ -73,10 +73,10 @@ class SystemTest(SystemTestMixin, unittest.TestCase):
         def _check(extra_node):
             self.extra_node = extra_node
             for c in self.clients:
-                all_peerids = list(c.get_storage_broker().get_all_serverids())
+                all_peerids = c.get_storage_broker().get_all_serverids()
                 self.failUnlessEqual(len(all_peerids), self.numclients+1)
                 sb = c.storage_broker
-                permuted_peers = list(sb.get_servers_for_index("a"))
+                permuted_peers = sb.get_servers_for_index("a")
                 self.failUnlessEqual(len(permuted_peers), self.numclients+1)
 
         d.addCallback(_check)
@@ -108,10 +108,10 @@ class SystemTest(SystemTestMixin, unittest.TestCase):
         d = self.set_up_nodes()
         def _check_connections(res):
             for c in self.clients:
-                all_peerids = list(c.get_storage_broker().get_all_serverids())
+                all_peerids = c.get_storage_broker().get_all_serverids()
                 self.failUnlessEqual(len(all_peerids), self.numclients)
                 sb = c.storage_broker
-                permuted_peers = list(sb.get_servers_for_index("a"))
+                permuted_peers = sb.get_servers_for_index("a")
                 self.failUnlessEqual(len(permuted_peers), self.numclients)
         d.addCallback(_check_connections)
 
index 487ba0a17875140d4d9878938111946cbdea94b5..ffeb4ee7fbc726e7c25e5faae4cfb5a039e26d7b 100644 (file)
@@ -173,9 +173,9 @@ class FakeClient:
         else:
             peers = [ ("%20d"%fakeid, FakeStorageServer(self.mode),)
                            for fakeid in range(self.num_servers) ]
-        self.storage_broker = StorageFarmBroker()
+        self.storage_broker = StorageFarmBroker(None, permute_peers=True)
         for (serverid, server) in peers:
-            self.storage_broker.add_server(serverid, server)
+            self.storage_broker.test_add_server(serverid, server)
         self.last_peers = [p[1] for p in peers]
 
     def log(self, *args, **kwargs):
index 846c29b424886385165170f4e179afa4d09cff9d..49c4e29faf8632295a12d0e40a31c362bee0d838 100644 (file)
@@ -31,14 +31,6 @@ from allmydata.test.common_web import HTTPClientGETFactory, \
 
 timeout = 480 # Most of these take longer than 240 seconds on Francois's arm box.
 
-class FakeIntroducerClient:
-    def get_all_connectors(self):
-        return {}
-    def get_all_connections_for(self, service_name):
-        return frozenset()
-    def get_all_peerids(self):
-        return frozenset()
-
 class FakeStatsProvider:
     def get_stats(self):
         stats = {'stats': {}, 'counters': {}}
@@ -55,7 +47,7 @@ class FakeClient(service.MultiService):
                 'zfec': "fake",
                 }
     introducer_furl = "None"
-    introducer_client = FakeIntroducerClient()
+
     _all_upload_status = [upload.UploadStatus()]
     _all_download_status = [download.DownloadStatus()]
     _all_mapupdate_statuses = [servermap.UpdateStatus()]
@@ -67,7 +59,7 @@ class FakeClient(service.MultiService):
     def connected_to_introducer(self):
         return False
 
-    storage_broker = StorageFarmBroker()
+    storage_broker = StorageFarmBroker(None, permute_peers=True)
     def get_storage_broker(self):
         return self.storage_broker
 
index 910847ca2d5e67d6cae605fcab465a3930a58c76..6fa84b6ad34bdb2247775744f97ba9a74904df92 100644 (file)
@@ -238,30 +238,24 @@ class Root(rend.Page):
         return "no"
 
     def data_known_storage_servers(self, ctx, data):
-        ic = self.client.introducer_client
-        servers = [c
-                   for c in ic.get_all_connectors().values()
-                   if c.service_name == "storage"]
-        return len(servers)
+        sb = self.client.get_storage_broker()
+        return len(sb.get_all_serverids())
 
     def data_connected_storage_servers(self, ctx, data):
-        ic = self.client.introducer_client
-        return len(ic.get_all_connections_for("storage"))
+        sb = self.client.get_storage_broker()
+        return len(sb.get_all_servers())
 
     def data_services(self, ctx, data):
-        ic = self.client.introducer_client
-        c = [ (service_name, nodeid, rsc)
-              for (nodeid, service_name), rsc
-              in ic.get_all_connectors().items() ]
-        c.sort()
-        return c
-
-    def render_service_row(self, ctx, data):
-        (service_name, nodeid, rsc) = data
+        sb = self.client.get_storage_broker()
+        return sb.get_all_descriptors()
+
+    def render_service_row(self, ctx, descriptor):
+        nodeid = descriptor.get_serverid()
+
         ctx.fillSlots("peerid", idlib.nodeid_b2a(nodeid))
-        ctx.fillSlots("nickname", rsc.nickname)
-        if rsc.rref:
-            rhost = rsc.remote_host
+        ctx.fillSlots("nickname", descriptor.get_nickname())
+        rhost = descriptor.get_remote_host()
+        if rhost:
             if nodeid == self.client.nodeid:
                 rhost_s = "(loopback)"
             elif isinstance(rhost, address.IPv4Address):
@@ -269,19 +263,24 @@ class Root(rend.Page):
             else:
                 rhost_s = str(rhost)
             connected = "Yes: to " + rhost_s
-            since = rsc.last_connect_time
+            since = descriptor.get_last_connect_time()
         else:
             connected = "No"
-            since = rsc.last_loss_time
+            since = descriptor.get_last_loss_time()
+        announced = descriptor.get_announcement_time()
+        announcement = descriptor.get_announcement()
+        version = announcement["version"]
+        service_name = announcement["service-name"]
 
         TIME_FORMAT = "%H:%M:%S %d-%b-%Y"
         ctx.fillSlots("connected", connected)
-        ctx.fillSlots("connected-bool", not not rsc.rref)
-        ctx.fillSlots("since", time.strftime(TIME_FORMAT, time.localtime(since)))
+        ctx.fillSlots("connected-bool", bool(rhost))
+        ctx.fillSlots("since", time.strftime(TIME_FORMAT,
+                                             time.localtime(since)))
         ctx.fillSlots("announced", time.strftime(TIME_FORMAT,
-                                                 time.localtime(rsc.announcement_time)))
-        ctx.fillSlots("version", rsc.version)
-        ctx.fillSlots("service_name", rsc.service_name)
+                                                 time.localtime(announced)))
+        ctx.fillSlots("version", version)
+        ctx.fillSlots("service_name", service_name)
 
         return ctx.tag