remove introducer's set_encoding_parameters
index bd45f6e818ce1a3b9a5db0a413d6a5e3319ba6f3..be44006740cda2ef70cd13989898562906cde3c0 100644
--- a/src/allmydata/introducer/client.py
+++ b/src/allmydata/introducer/client.py
 
-import re, time, sha
-from base64 import b32decode
+import time
 from zope.interface import implements
 from twisted.application import service
-from foolscap.api import Referenceable
+from foolscap.api import Referenceable, eventually, RemoteInterface
 from allmydata.interfaces import InsufficientVersionError
-from allmydata.introducer.interfaces import RIIntroducerSubscriberClient, \
-     IIntroducerClient
-from allmydata.util import log, idlib
-from allmydata.util.rrefutil import get_versioned_remote_reference
-from allmydata.introducer.common import make_index
-
-
-class RemoteServiceConnector:
-    """I hold information about a peer service that we want to connect to. If
-    we are connected, I hold the RemoteReference, the peer's address, and the
-    peer's version information. I remember information about when we were
-    last connected to the peer too, even if we aren't currently connected.
-
-    @ivar announcement_time: when we first heard about this service
-    @ivar last_connect_time: when we last established a connection
-    @ivar last_loss_time: when we last lost a connection
-
-    @ivar version: the peer's version, from the most recent announcement
-    @ivar oldest_supported: the peer's oldest supported version, same
-    @ivar nickname: the peer's self-reported nickname, same
-
-    @ivar rref: the RemoteReference, if connected, otherwise None
-    @ivar remote_host: the IAddress, if connected, otherwise None
-    """
-
-    VERSION_DEFAULTS = {
-        "storage": { "http://allmydata.org/tahoe/protocols/storage/v1" :
-                     { "maximum-immutable-share-size": 2**32,
-                       "tolerates-immutable-read-overrun": False,
-                       "delete-mutable-shares-with-zero-length-writev": False,
-                       },
-                     "application-version": "unknown: no get_version()",
-                     },
-        "stub_client": { },
-        }
-
-    def __init__(self, announcement, tub, ic):
-        self._tub = tub
-        self._announcement = announcement
-        self._ic = ic
-        (furl, service_name, ri_name, nickname, ver, oldest) = announcement
-
-        self._furl = furl
-        m = re.match(r'pb://(\w+)@', furl)
-        assert m
-        self._nodeid = b32decode(m.group(1).upper())
-        self._nodeid_s = idlib.shortnodeid_b2a(self._nodeid)
-
-        self.service_name = service_name
-
-        self.log("attempting to connect to %s" % self._nodeid_s)
-        self.announcement_time = time.time()
-        self.last_loss_time = None
-        self.rref = None
-        self.remote_host = None
-        self.last_connect_time = None
-        self.version = ver
-        self.oldest_supported = oldest
-        self.nickname = nickname
-
-    def log(self, *args, **kwargs):
-        return self._ic.log(*args, **kwargs)
-
-    def startConnecting(self):
-        self._reconnector = self._tub.connectTo(self._furl, self._got_service)
-
-    def stopConnecting(self):
-        self._reconnector.stopConnecting()
-
-    def _got_service(self, rref):
-        self.log("got connection to %s, getting versions" % self._nodeid_s)
-
-        default = self.VERSION_DEFAULTS.get(self.service_name, {})
-        d = get_versioned_remote_reference(rref, default)
-        d.addCallback(self._got_versioned_service)
-
-    def _got_versioned_service(self, rref):
-        self.log("connected to %s, version %s" % (self._nodeid_s, rref.version))
-
-        self.last_connect_time = time.time()
-        self.remote_host = rref.rref.tracker.broker.transport.getPeer()
-
-        self.rref = rref
-
-        self._ic.add_connection(self._nodeid, self.service_name, rref)
-
-        rref.notifyOnDisconnect(self._lost, rref)
-
-    def _lost(self, rref):
-        self.log("lost connection to %s" % self._nodeid_s)
-        self.last_loss_time = time.time()
-        self.rref = None
-        self.remote_host = None
-        self._ic.remove_connection(self._nodeid, self.service_name, rref)
-
-
-    def reset(self):
-        self._reconnector.reset()
+from allmydata.introducer.interfaces import IIntroducerClient, \
+     RIIntroducerSubscriberClient_v1, RIIntroducerSubscriberClient_v2
+from allmydata.introducer.common import sign_to_foolscap, unsign_from_foolscap,\
+     convert_announcement_v1_to_v2, convert_announcement_v2_to_v1, \
+     make_index, get_tubid_string_from_ann, get_tubid_string
+from allmydata.util import log
+from allmydata.util.rrefutil import add_version_to_remote_reference
+from allmydata.util.keyutil import BadSignatureError
+
+class WrapV2ClientInV1Interface(Referenceable): # for_v1
+    """I wrap a v2 IntroducerClient to make it look like a v1 client, so it
+    can be attached to an old server."""
+    implements(RIIntroducerSubscriberClient_v1)
+
+    def __init__(self, original):
+        self.original = original
 
+    def remote_announce(self, announcements):
+        lp = self.original.log("received %d announcements (v1)" %
+                               len(announcements))
+        anns_v2 = set([convert_announcement_v1_to_v2(ann_v1)
+                       for ann_v1 in announcements])
+        return self.original.got_announcements(anns_v2, lp)
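
The adapter leans on convert_announcement_v1_to_v2 from allmydata.introducer.common, which this diff does not show. A rough sketch only: the field names come from the v1 tuples in the removed code above and from the v2 dicts built by create_announcement_dict below, and the real helper also produces the foolscap wrapping that unsign_from_foolscap expects.

def sketch_convert_v1_to_v2(ann_v1):
    # illustrative only, not the repository's implementation
    (furl, service_name, ri_name, nickname, ver, oldest) = ann_v1
    # ri_name is not carried forward into the v2 dict
    return { "version": 0,
             "nickname": nickname,
             "app-versions": {},
             "my-version": ver,
             "oldest-supported": oldest,
             "service-name": service_name,
             "anonymous-storage-FURL": furl,
             }
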
+
+class RIStubClient(RemoteInterface): # for_v1
+    """Each client publishes a service announcement for a dummy object called
+    the StubClient. This object doesn't actually offer any services, but the
+    announcement helps the Introducer keep track of which clients are
+    subscribed (so the grid admin can see things like the size of the grid
+    and the client versions in use). This is the (empty) RemoteInterface for
+    the StubClient."""
+
+class StubClient(Referenceable): # for_v1
+    implements(RIStubClient)
+
+V1 = "http://allmydata.org/tahoe/protocols/introducer/v1"
+V2 = "http://allmydata.org/tahoe/protocols/introducer/v2"
 
 class IntroducerClient(service.Service, Referenceable):
-    implements(RIIntroducerSubscriberClient, IIntroducerClient)
+    implements(RIIntroducerSubscriberClient_v2, IIntroducerClient)
 
     def __init__(self, tub, introducer_furl,
-                 nickname, my_version, oldest_supported):
+                 nickname, my_version, oldest_supported,
+                 app_versions, sequencer):
         self._tub = tub
         self.introducer_furl = introducer_furl
 
-        self._nickname = nickname.encode("utf-8")
+        assert type(nickname) is unicode
+        self._nickname = nickname
         self._my_version = my_version
         self._oldest_supported = oldest_supported
-
-        self._published_announcements = set()
+        self._app_versions = app_versions
+        self._sequencer = sequencer
+
+        self._my_subscriber_info = { "version": 0,
+                                     "nickname": self._nickname,
+                                     "app-versions": self._app_versions,
+                                     "my-version": self._my_version,
+                                     "oldest-supported": self._oldest_supported,
+                                     }
+        self._stub_client = None # for_v1
+        self._stub_client_furl = None
+
+        self._outbound_announcements = {} # not signed
+        self._published_announcements = {} # signed
+        self._canary = Referenceable()
 
         self._publisher = None
-        self._connected = False
 
+        self._local_subscribers = [] # (servicename,cb,args,kwargs) tuples
         self._subscribed_service_names = set()
         self._subscriptions = set() # requests we've actually sent
-        self._received_announcements = set()
-        # TODO: this set will grow without bound, until the node is restarted
-
-        # we only accept one announcement per (peerid+service_name) pair.
-        # This insures that an upgraded host replace their previous
-        # announcement. It also means that each peer must have their own Tub
-        # (no sharing), which is slightly weird but consistent with the rest
-        # of the Tahoe codebase.
-        self._connectors = {} # k: (peerid+svcname), v: RemoteServiceConnector
-        # self._connections is a set of (peerid, service_name, rref) tuples
-        self._connections = set()
 
-        self.counter = 0 # incremented each time we change state, for tests
-        self.encoding_parameters = None
+        # _inbound_announcements remembers one announcement per
+        # (servicename,serverid) pair. Anything that arrives with the same
+        # pair will displace the previous one. This stores tuples of
+        # (unpacked announcement dictionary, verifying key, rxtime). The ann
+        # dicts can be compared for equality to distinguish re-announcements
+        # from updates. It also provides memory for clients who subscribe
+        # after startup.
+        self._inbound_announcements = {}
+
+        # hooks for unit tests
+        self._debug_counts = {
+            "inbound_message": 0,
+            "inbound_announcement": 0,
+            "wrong_service": 0,
+            "duplicate_announcement": 0,
+            "update": 0,
+            "new_announcement": 0,
+            "outbound_message": 0,
+            }
+        self._debug_outstanding = 0
+
+    def _debug_retired(self, res):
+        self._debug_outstanding -= 1
+        return res
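
The two new constructor arguments are app_versions (a dict that ends up in every announcement under "app-versions") and sequencer (a zero-argument callable returning a (seqnum, nonce) pair, consulted each time publish() is called). A minimal wiring sketch; the in-memory sequencer below is only an illustration, and a real node would want to persist the counter so seqnums keep increasing across restarts.

import os, itertools

def make_memory_sequencer():
    # illustrative stand-in: counts only within one process
    counter = itertools.count(1)
    def sequencer():
        seqnum = counter.next()
        nonce = os.urandom(8).encode("hex")   # py2 hex-encoding
        return seqnum, nonce
    return sequencer

# hypothetical wiring (every name except IntroducerClient is made up):
# ic = IntroducerClient(tub, introducer_furl, u"mynick",
#                       "tahoe-lafs/1.10", "1.8.3",
#                       app_versions={"python": "2.7"},
#                       sequencer=make_memory_sequencer())
# ic.setServiceParent(node)
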
 
     def startService(self):
         service.Service.startService(self)
@@ -156,7 +119,7 @@ class IntroducerClient(service.Service, Referenceable):
                     { },
                     "application-version": "unknown: no get_version()",
                     }
-        d = get_versioned_remote_reference(publisher, default)
+        d = add_version_to_remote_reference(publisher, default)
         d.addCallback(self._got_versioned_introducer)
         d.addErrback(self._got_error)
 
@@ -166,11 +129,9 @@ class IntroducerClient(service.Service, Referenceable):
 
     def _got_versioned_introducer(self, publisher):
         self.log("got introducer version: %s" % (publisher.version,))
-        # we require a V1 introducer
-        needed = "http://allmydata.org/tahoe/protocols/introducer/v1"
-        if needed not in publisher.version:
-            raise InsufficientVersionError(needed, publisher.version)
-        self._connected = True
+        # we require an introducer that speaks at least one of (V1, V2)
+        if not (V1 in publisher.version or V2 in publisher.version):
+            raise InsufficientVersionError("V1 or V2", publisher.version)
         self._publisher = publisher
         publisher.notifyOnDisconnect(self._disconnected)
         self._maybe_publish()
@@ -178,31 +139,22 @@ class IntroducerClient(service.Service, Referenceable):
 
     def _disconnected(self):
         self.log("bummer, we've lost our connection to the introducer")
-        self._connected = False
         self._publisher = None
         self._subscriptions.clear()
 
-    def stopService(self):
-        service.Service.stopService(self)
-        self._introducer_reconnector.stopConnecting()
-        for rsc in self._connectors.itervalues():
-            rsc.stopConnecting()
-
     def log(self, *args, **kwargs):
         if "facility" not in kwargs:
-            kwargs["facility"] = "tahoe.introducer"
+            kwargs["facility"] = "tahoe.introducer.client"
         return log.msg(*args, **kwargs)
 
-
-    def publish(self, furl, service_name, remoteinterface_name):
-        ann = (furl, service_name, remoteinterface_name,
-               self._nickname, self._my_version, self._oldest_supported)
-        self._published_announcements.add(ann)
-        self._maybe_publish()
-
-    def subscribe_to(self, service_name):
+    def subscribe_to(self, service_name, cb, *args, **kwargs):
+        self._local_subscribers.append( (service_name,cb,args,kwargs) )
         self._subscribed_service_names.add(service_name)
         self._maybe_subscribe()
+        for index,(ann,key_s,when) in self._inbound_announcements.items():
+            servicename = index[0]
+            if servicename == service_name:
+                eventually(cb, key_s, ann, *args, **kwargs)
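
Local subscribers now register a callback instead of polling a connection set: subscribe_to remembers (service_name, cb, args, kwargs), and each matching announcement, including ones already cached in _inbound_announcements, is delivered via eventually(cb, key_s, ann, ...). A usage sketch; the callback body is invented.

def _got_storage_announcement(key_s, ann):
    # key_s is the server's "v0-..." verifying-key string, or None if the
    # announcement arrived unsigned (e.g. relayed from a v1 introducer)
    print "storage server %s announced %s" % (
        ann.get("nickname"), ann.get("anonymous-storage-FURL"))

# ic.subscribe_to("storage", _got_storage_announcement)
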
 
     def _maybe_subscribe(self):
         if not self._publisher:
@@ -210,114 +162,192 @@ class IntroducerClient(service.Service, Referenceable):
                      level=log.NOISY)
             return
         for service_name in self._subscribed_service_names:
-            if service_name not in self._subscriptions:
-                # there is a race here, but the subscription desk ignores
-                # duplicate requests.
-                self._subscriptions.add(service_name)
-                d = self._publisher.callRemote("subscribe", self, service_name)
-                d.addErrback(log.err, facility="tahoe.introducer",
-                             level=log.WEIRD, umid="2uMScQ")
+            if service_name in self._subscriptions:
+                continue
+            self._subscriptions.add(service_name)
+            if V2 in self._publisher.version:
+                self._debug_outstanding += 1
+                d = self._publisher.callRemote("subscribe_v2",
+                                               self, service_name,
+                                               self._my_subscriber_info)
+                d.addBoth(self._debug_retired)
+            else:
+                d = self._subscribe_handle_v1(service_name) # for_v1
+            d.addErrback(log.err, facility="tahoe.introducer.client",
+                         level=log.WEIRD, umid="2uMScQ")
+
+    def _subscribe_handle_v1(self, service_name): # for_v1
+        # they don't speak V2: must be a v1 introducer. Fall back to the v1
+        # 'subscribe' method, using a client adapter.
+        ca = WrapV2ClientInV1Interface(self)
+        self._debug_outstanding += 1
+        d = self._publisher.callRemote("subscribe", ca, service_name)
+        d.addBoth(self._debug_retired)
+        # We must also publish an empty 'stub_client' object, so the
+        # introducer can count how many clients are connected and see what
+        # versions they're running.
+        if not self._stub_client_furl:
+            self._stub_client = sc = StubClient()
+            self._stub_client_furl = self._tub.registerReference(sc)
+        def _publish_stub_client(ignored):
+            furl = self._stub_client_furl
+            self.publish("stub_client",
+                         { "anonymous-storage-FURL": furl,
+                           "permutation-seed-base32": get_tubid_string(furl),
+                           })
+        d.addCallback(_publish_stub_client)
+        return d
+
+    def create_announcement_dict(self, service_name, ann):
+        ann_d = { "version": 0,
+                  # "seqnum" and "nonce" will be populated with new values in
+                  # publish(), each time we make a change
+                  "nickname": self._nickname,
+                  "app-versions": self._app_versions,
+                  "my-version": self._my_version,
+                  "oldest-supported": self._oldest_supported,
+
+                  "service-name": service_name,
+                  }
+        ann_d.update(ann)
+        return ann_d
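
For reference, a storage announcement built this way ends up looking roughly like the dict below once publish() has stamped it with the current seqnum and nonce (all values here are invented for illustration):

example_ann_d = {
    "version": 0,
    "nickname": u"my-node",
    "app-versions": {},
    "my-version": "tahoe-lafs/1.10",
    "oldest-supported": "1.8.3",
    "service-name": "storage",
    # keys below come from the 'ann' argument passed to publish()
    "anonymous-storage-FURL": "pb://tubid@example.net:1234/swissnum",
    "permutation-seed-base32": "tubid",
    # added by publish() just before signing
    "seqnum": 4,
    "nonce": "3fa8c21e9d0b7745",
}
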
+
+    def publish(self, service_name, ann, signing_key=None):
+        # we increment the seqnum every time we publish something new
+        current_seqnum, current_nonce = self._sequencer()
+
+        ann_d = self.create_announcement_dict(service_name, ann)
+        self._outbound_announcements[service_name] = ann_d
+
+        # publish all announcements with the new seqnum and nonce
+        for service_name,ann_d in self._outbound_announcements.items():
+            ann_d["seqnum"] = current_seqnum
+            ann_d["nonce"] = current_nonce
+            ann_t = sign_to_foolscap(ann_d, signing_key)
+            self._published_announcements[service_name] = ann_t
+        self._maybe_publish()
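
How a caller would publish an announcement (a sketch: the storage-server side that actually does this lives outside this diff, and signing_key is assumed to be a private announcement key loaded elsewhere; passing None, as the stub_client fallback above does, publishes unsigned). get_tubid_string is the helper imported at the top of this file.

def sketch_announce_storage(ic, tub, storage_server, signing_key=None):
    # register the server with the Tub and hand its FURL to the
    # IntroducerClient; with signing_key=None this is an unsigned
    # announcement, exactly like the stub_client fallback above
    storage_furl = tub.registerReference(storage_server)
    ic.publish("storage",
               { "anonymous-storage-FURL": storage_furl,
                 "permutation-seed-base32": get_tubid_string(storage_furl),
                 },
               signing_key)
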
 
     def _maybe_publish(self):
         if not self._publisher:
             self.log("want to publish, but no introducer yet", level=log.NOISY)
             return
         # this re-publishes everything. The Introducer ignores duplicates
-        for ann in self._published_announcements:
-            d = self._publisher.callRemote("publish", ann)
-            d.addErrback(log.err, facility="tahoe.introducer",
+        for ann_t in self._published_announcements.values():
+            self._debug_counts["outbound_message"] += 1
+            if V2 in self._publisher.version:
+                self._debug_outstanding += 1
+                d = self._publisher.callRemote("publish_v2", ann_t,
+                                               self._canary)
+                d.addBoth(self._debug_retired)
+            else:
+                d = self._handle_v1_publisher(ann_t) # for_v1
+            d.addErrback(log.err, ann_t=ann_t,
+                         facility="tahoe.introducer.client",
                          level=log.WEIRD, umid="xs9pVQ")
 
+    def _handle_v1_publisher(self, ann_t): # for_v1
+        # they don't speak V2, so fall back to the old 'publish' method
+        # (which takes an unsigned tuple of bytestrings)
+        self.log("falling back to publish_v1",
+                 level=log.UNUSUAL, umid="9RCT1A")
+        ann_v1 = convert_announcement_v2_to_v1(ann_t)
+        self._debug_outstanding += 1
+        d = self._publisher.callRemote("publish", ann_v1)
+        d.addBoth(self._debug_retired)
+        return d
+
+
+    def remote_announce_v2(self, announcements):
+        lp = self.log("received %d announcements (v2)" % len(announcements))
+        return self.got_announcements(announcements, lp)
+
+    def got_announcements(self, announcements, lp=None):
+        # this is the common entry point for both v1 and v2 announcements
+        self._debug_counts["inbound_message"] += 1
+        for ann_t in announcements:
+            try:
+                # this might raise UnknownKeyError or BadSignatureError
+                ann, key_s = unsign_from_foolscap(ann_t)
+                # key is "v0-base32abc123"
+            except BadSignatureError:
+                self.log("bad signature on inbound announcement: %s" % (ann_t,),
+                         parent=lp, level=log.WEIRD, umid="ZAU15Q")
+                # process other announcements that arrived with the bad one
+                continue
 
+            self._process_announcement(ann, key_s)
 
-    def remote_announce(self, announcements):
-        for ann in announcements:
-            self.log("received %d announcements" % len(announcements))
-            (furl, service_name, ri_name, nickname, ver, oldest) = ann
-            if service_name not in self._subscribed_service_names:
-                self.log("announcement for a service we don't care about [%s]"
-                         % (service_name,), level=log.UNUSUAL, umid="dIpGNA")
-                continue
-            if ann in self._received_announcements:
-                self.log("ignoring old announcement: %s" % (ann,),
-                         level=log.NOISY)
-                continue
-            self.log("new announcement[%s]: %s" % (service_name, ann))
-            self._received_announcements.add(ann)
-            self._new_announcement(ann)
-
-    def _new_announcement(self, announcement):
-        # this will only be called for new announcements
-        index = make_index(announcement)
-        if index in self._connectors:
-            self.log("replacing earlier announcement", level=log.NOISY)
-            self._connectors[index].stopConnecting()
-        rsc = RemoteServiceConnector(announcement, self._tub, self)
-        self._connectors[index] = rsc
-        rsc.startConnecting()
-
-    def add_connection(self, nodeid, service_name, rref):
-        self._connections.add( (nodeid, service_name, rref) )
-        self.counter += 1
-        # when one connection is established, reset the timers on all others,
-        # to trigger a reconnection attempt in one second. This is intended
-        # to accelerate server connections when we've been offline for a
-        # while. The goal is to avoid hanging out for a long time with
-        # connections to only a subset of the servers, which would increase
-        # the chances that we'll put shares in weird places (and not update
-        # existing shares of mutable files). See #374 for more details.
-        for rsc in self._connectors.values():
-            rsc.reset()
-
-    def remove_connection(self, nodeid, service_name, rref):
-        self._connections.discard( (nodeid, service_name, rref) )
-        self.counter += 1
-
-
-    def get_all_connections(self):
-        return frozenset(self._connections)
-
-    def get_all_connectors(self):
-        return self._connectors.copy()
-
-    def get_all_peerids(self):
-        return frozenset([peerid
-                          for (peerid, service_name, rref)
-                          in self._connections])
-
-    def get_nickname_for_peerid(self, peerid):
-        for k in self._connectors:
-            (peerid0, svcname0) = k
-            if peerid0 == peerid:
-                rsc = self._connectors[k]
-                return rsc.nickname
-        return None
-
-    def get_all_connections_for(self, service_name):
-        return frozenset([c
-                          for c in self._connections
-                          if c[1] == service_name])
-
-    def get_peers(self, service_name):
-        """Return a set of (peerid, versioned-rref) tuples."""
-        return frozenset([(peerid, r) for (peerid, servname, r) in self._connections if servname == service_name])
-
-    def get_permuted_peers(self, service_name, key):
-        """Return an ordered list of (peerid, versioned-rref) tuples."""
-
-        servers = self.get_peers(service_name)
-
-        return sorted(servers, key=lambda x: sha.new(key+x[0]).digest())
-
-    def remote_set_encoding_parameters(self, parameters):
-        self.encoding_parameters = parameters
+    def _process_announcement(self, ann, key_s):
+        self._debug_counts["inbound_announcement"] += 1
+        service_name = str(ann["service-name"])
+        if service_name not in self._subscribed_service_names:
+            self.log("announcement for a service we don't care about [%s]"
+                     % (service_name,), level=log.UNUSUAL, umid="dIpGNA")
+            self._debug_counts["wrong_service"] += 1
+            return
+        # for ASCII values, simplejson might give us unicode *or* bytes
+        if "nickname" in ann and isinstance(ann["nickname"], str):
+            ann["nickname"] = unicode(ann["nickname"])
+        nick_s = ann.get("nickname",u"").encode("utf-8")
+        lp2 = self.log(format="announcement for nickname '%(nick)s', service=%(svc)s: %(ann)s",
+                       nick=nick_s, svc=service_name, ann=ann, umid="BoKEag")
+
+        # how do we describe this node in the logs?
+        desc_bits = []
+        if key_s:
+            desc_bits.append("serverid=" + key_s[:20])
+        if "anonymous-storage-FURL" in ann:
+            tubid_s = get_tubid_string_from_ann(ann)
+            desc_bits.append("tubid=" + tubid_s[:8])
+        description = "/".join(desc_bits)
+
+        # the index is used to track duplicates
+        index = make_index(ann, key_s)
+
+        # is this announcement a duplicate?
+        if (index in self._inbound_announcements
+            and self._inbound_announcements[index][0] == ann):
+            self.log(format="reannouncement for [%(service)s]:%(description)s, ignoring",
+                     service=service_name, description=description,
+                     parent=lp2, level=log.UNUSUAL, umid="B1MIdA")
+            self._debug_counts["duplicate_announcement"] += 1
+            return
+
+        # does it update an existing one?
+        if index in self._inbound_announcements:
+            old,_,_ = self._inbound_announcements[index]
+            if "seqnum" in old:
+                # must beat previous sequence number to replace
+                if ("seqnum" not in ann
+                    or not isinstance(ann["seqnum"], (int,long))):
+                    self.log("not replacing old announcement, no valid seqnum: %s"
+                             % (ann,),
+                             parent=lp2, level=log.NOISY, umid="zFGH3Q")
+                    return
+                if ann["seqnum"] <= old["seqnum"]:
+                    # note that exact replays are caught earlier, by
+                    # comparing the entire signed announcement.
+                    self.log("not replacing old announcement, "
+                             "new seqnum is too old (%s <= %s) "
+                             "(replay attack?): %s"
+                             % (ann["seqnum"], old["seqnum"], ann),
+                             parent=lp2, level=log.UNUSUAL, umid="JAAAoQ")
+                    return
+                # ok, seqnum is newer, allow replacement
+            self._debug_counts["update"] += 1
+            self.log("replacing old announcement: %s" % (ann,),
+                     parent=lp2, level=log.NOISY, umid="wxwgIQ")
+        else:
+            self._debug_counts["new_announcement"] += 1
+            self.log("new announcement[%s]" % service_name,
+                     parent=lp2, level=log.NOISY)
+
+        self._inbound_announcements[index] = (ann, key_s, time.time())
+        # note: we never forget an index, but we might update its value
+
+        for (service_name2,cb,args,kwargs) in self._local_subscribers:
+            if service_name2 == service_name:
+                eventually(cb, key_s, ann, *args, **kwargs)
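
The acceptance policy above boils down to a small decision; restated here as a standalone helper for readability (not code from the repository — 'inbound' plays the role of self._inbound_announcements):

def sketch_classify(inbound, index, ann):
    # simplified restatement of _process_announcement's replacement policy
    if index not in inbound:
        return "new"
    old = inbound[index][0]
    if old == ann:
        return "duplicate"                      # exact re-announcement
    if "seqnum" in old:
        if not isinstance(ann.get("seqnum"), (int, long)):
            return "rejected: no valid seqnum"
        if ann["seqnum"] <= old["seqnum"]:
            return "rejected: stale seqnum (possible replay)"
    return "update"
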
 
     def connected_to_introducer(self):
-        return self._connected
-
-    def debug_disconnect_from_peerid(self, victim_nodeid):
-        # for unit tests: locate and sever all connections to the given
-        # peerid.
-        for (nodeid, service_name, rref) in self._connections:
-            if nodeid == victim_nodeid:
-                rref.tracker.broker.transport.loseConnection()
+        return bool(self._publisher)