]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
new introducer: signed extensible dictionary-based messages! refs #466
authorBrian Warner <warner@lothar.com>
Sun, 20 Nov 2011 10:21:32 +0000 (02:21 -0800)
committerBrian Warner <warner@lothar.com>
Wed, 14 Mar 2012 01:24:32 +0000 (18:24 -0700)
This introduces new client and server halves to the Introducer (renaming the
old one with a _V1 suffix). Both have fallbacks to accomodate talking to a
different version: the publishing client switches on whether the server's
.get_version() advertises V2 support, the server switches on which
subscription method was invoked by the subscribing client.

The V2 protocol sends a three-tuple of (serialized announcement dictionary,
signature, pubkey) for each announcement. The V2 server dispatches messages
to subscribers according to the service-name, and throws errors for invalid
signatures, but does not otherwise examine the messages. The V2 receiver's
subscription callback will receive a (serverid, ann_dict) pair. The
'serverid' will be equal to the pubkey if all of the following are true:

  the originating client is V2, and was told a privkey to use
  the announcement went through a V2 server
  the signature is valid

If not, 'serverid' will be equal to the tubid portion of the announced FURL,
as was the case for V1 receivers.

Servers will create a keypair if one does not exist yet, stored in
private/server.privkey .

The signed announcement dictionary puts the server FURL in a key named
"anonymous-storage-FURL", which anticipates upcoming Accounting-related
changes in the server advertisements. It also provides a key named
"permutation-seed-base32" to tell clients what permutation seed to use. This
is computed at startup, using tubid if there are existing shares, otherwise
the pubkey, to retain share-order compatibility for existing servers.

17 files changed:
src/allmydata/client.py
src/allmydata/interfaces.py
src/allmydata/introducer/client.py
src/allmydata/introducer/common.py [new file with mode: 0644]
src/allmydata/introducer/interfaces.py
src/allmydata/introducer/old.py [new file with mode: 0644]
src/allmydata/introducer/server.py
src/allmydata/node.py
src/allmydata/storage/server.py
src/allmydata/storage_client.py
src/allmydata/test/test_checker.py
src/allmydata/test/test_client.py
src/allmydata/test/test_introducer.py
src/allmydata/test/test_mutable.py
src/allmydata/test/test_system.py
src/allmydata/test/test_upload.py
src/allmydata/web/introweb.py

index b78fd7b9641ff3eb87f97ad6eb96b82e2169e902..d42b6acf1f7a8c1dca4e8bfbd3551707e1fa2894 100644 (file)
@@ -1,12 +1,10 @@
 import os, stat, time, weakref
-from allmydata.interfaces import RIStorageServer
 from allmydata import node
 
 from zope.interface import implements
 from twisted.internet import reactor, defer
 from twisted.application import service
 from twisted.application.internet import TimerService
-from foolscap.api import Referenceable
 from pycryptopp.publickey import rsa
 
 import allmydata
@@ -16,14 +14,13 @@ from allmydata.immutable.upload import Uploader
 from allmydata.immutable.offloaded import Helper
 from allmydata.control import ControlServer
 from allmydata.introducer.client import IntroducerClient
-from allmydata.util import hashutil, base32, pollmixin, log
+from allmydata.util import hashutil, base32, pollmixin, log, keyutil
 from allmydata.util.encodingutil import get_filesystem_encoding
 from allmydata.util.abbreviate import parse_abbreviated_size
 from allmydata.util.time_format import parse_duration, parse_date
 from allmydata.stats import StatsProvider
 from allmydata.history import History
-from allmydata.interfaces import IStatsProducer, RIStubClient, \
-                                 SDMF_VERSION, MDMF_VERSION
+from allmydata.interfaces import IStatsProducer, SDMF_VERSION, MDMF_VERSION
 from allmydata.nodemaker import NodeMaker
 from allmydata.blacklist import Blacklist
 from allmydata.node import OldConfigOptionError
@@ -35,9 +32,6 @@ GiB=1024*MiB
 TiB=1024*GiB
 PiB=1024*TiB
 
-class StubClient(Referenceable):
-    implements(RIStubClient)
-
 def _make_secret():
     return base32.b2a(os.urandom(hashutil.CRYPTO_VAL_SIZE)) + "\n"
 
@@ -174,7 +168,8 @@ class Client(node.Node, pollmixin.PollMixin):
         ic = IntroducerClient(self.tub, self.introducer_furl,
                               self.nickname,
                               str(allmydata.__full_version__),
-                              str(self.OLDEST_SUPPORTED_VERSION))
+                              str(self.OLDEST_SUPPORTED_VERSION),
+                              self.get_app_versions())
         self.introducer_client = ic
         # hold off on starting the IntroducerClient until our tub has been
         # started, so we'll have a useful address on our RemoteReference, so
@@ -203,12 +198,46 @@ class Client(node.Node, pollmixin.PollMixin):
         self.convergence = base32.a2b(convergence_s)
         self._secret_holder = SecretHolder(lease_secret, self.convergence)
 
+    def _maybe_create_server_key(self):
+        # we only create the key once. On all subsequent runs, we re-use the
+        # existing key
+        def _make_key():
+            sk_vs,vk_vs = keyutil.make_keypair()
+            return sk_vs+"\n"
+        sk_vs = self.get_or_create_private_config("server.privkey", _make_key)
+        sk,vk_vs = keyutil.parse_privkey(sk_vs.strip())
+        self.write_config("server.pubkey", vk_vs+"\n")
+        self._server_key = sk
+
+    def _init_permutation_seed(self, ss):
+        seed = self.get_config_from_file("permutation-seed")
+        if not seed:
+            have_shares = ss.have_shares()
+            if have_shares:
+                # if the server has shares but not a recorded
+                # permutation-seed, then it has been around since pre-#466
+                # days, and the clients who uploaded those shares used our
+                # TubID as a permutation-seed. We should keep using that same
+                # seed to keep the shares in the same place in the permuted
+                # ring, so those clients don't have to perform excessive
+                # searches.
+                seed = base32.b2a(self.nodeid)
+            else:
+                # otherwise, we're free to use the more natural seed of our
+                # pubkey-based serverid
+                vk_bytes = self._server_key.get_verifying_key_bytes()
+                seed = base32.b2a(vk_bytes)
+            self.write_config("permutation-seed", seed+"\n")
+        return seed.strip()
+
     def init_storage(self):
         # should we run a storage server (and publish it for others to use)?
         if not self.get_config("storage", "enabled", True, boolean=True):
             return
         readonly = self.get_config("storage", "readonly", False, boolean=True)
 
+        self._maybe_create_server_key()
+
         storedir = os.path.join(self.basedir, self.STOREDIR)
 
         data = self.get_config("storage", "reserved_space", None)
@@ -262,8 +291,10 @@ class Client(node.Node, pollmixin.PollMixin):
         def _publish(res):
             furl_file = os.path.join(self.basedir, "private", "storage.furl").encode(get_filesystem_encoding())
             furl = self.tub.registerReference(ss, furlFile=furl_file)
-            ri_name = RIStorageServer.__remote_name__
-            self.introducer_client.publish(furl, "storage", ri_name)
+            ann = {"anonymous-storage-FURL": furl,
+                   "permutation-seed-base32": self._init_permutation_seed(ss),
+                   }
+            self.introducer_client.publish("storage", ann, self._server_key)
         d.addCallback(_publish)
         d.addErrback(log.err, facility="tahoe.init",
                      level=log.BAD, umid="aLGBKw")
@@ -281,7 +312,6 @@ class Client(node.Node, pollmixin.PollMixin):
         self.terminator.setServiceParent(self)
         self.add_service(Uploader(helper_furl, self.stats_provider,
                                   self.history))
-        self.init_stub_client()
         self.init_blacklist()
         self.init_nodemaker()
 
@@ -321,20 +351,6 @@ class Client(node.Node, pollmixin.PollMixin):
     def get_storage_broker(self):
         return self.storage_broker
 
-    def init_stub_client(self):
-        def _publish(res):
-            # we publish an empty object so that the introducer can count how
-            # many clients are connected and see what versions they're
-            # running.
-            sc = StubClient()
-            furl = self.tub.registerReference(sc)
-            ri_name = RIStubClient.__remote_name__
-            self.introducer_client.publish(furl, "stub_client", ri_name)
-        d = self.when_tub_ready()
-        d.addCallback(_publish)
-        d.addErrback(log.err, facility="tahoe.init",
-                     level=log.BAD, umid="OEHq3g")
-
     def init_blacklist(self):
         fn = os.path.join(self.basedir, "access.blacklist")
         self.blacklist = Blacklist(fn)
index a8b118025f4b88cb94ac60e801f06ee8cd130c12..163e49da200abba7e17efd65ef9075dc40f4c6f3 100644 (file)
@@ -30,14 +30,6 @@ WriteEnablerSecret = Hash # used to protect mutable bucket modifications
 LeaseRenewSecret = Hash # used to protect bucket lease renewal requests
 LeaseCancelSecret = Hash # used to protect bucket lease cancellation requests
 
-class RIStubClient(RemoteInterface):
-    """Each client publishes a service announcement for a dummy object called
-    the StubClient. This object doesn't actually offer any services, but the
-    announcement helps the Introducer keep track of which clients are
-    subscribed (so the grid admin can keep track of things like the size of
-    the grid and the client versions in use. This is the (empty)
-    RemoteInterface for the StubClient."""
-
 class RIBucketWriter(RemoteInterface):
     """ Objects of this kind live on the server side. """
     def write(offset=Offset, data=ShareData):
index 31fbb5c2aeb075e1494ae97aeb77655ffd99b112..260e13cf1558a125bbbb56d2d4092f24fa6f846f 100644 (file)
@@ -1,29 +1,76 @@
 
-from base64 import b32decode
+import time
 from zope.interface import implements
 from twisted.application import service
-from foolscap.api import Referenceable, SturdyRef, eventually
+from foolscap.api import Referenceable, eventually, RemoteInterface
 from allmydata.interfaces import InsufficientVersionError
-from allmydata.introducer.interfaces import RIIntroducerSubscriberClient, \
-     IIntroducerClient
-from allmydata.util import log, idlib
-from allmydata.util.rrefutil import add_version_to_remote_reference, trap_deadref
+from allmydata.introducer.interfaces import IIntroducerClient, \
+     RIIntroducerSubscriberClient_v1, RIIntroducerSubscriberClient_v2
+from allmydata.introducer.common import sign_to_foolscap, unsign_from_foolscap,\
+     convert_announcement_v1_to_v2, convert_announcement_v2_to_v1, \
+     make_index, get_tubid_string_from_ann, get_tubid_string
+from allmydata.util import log
+from allmydata.util.rrefutil import add_version_to_remote_reference
+from allmydata.util.keyutil import BadSignatureError
+
+class WrapV2ClientInV1Interface(Referenceable): # for_v1
+    """I wrap a v2 IntroducerClient to make it look like a v1 client, so it
+    can be attached to an old server."""
+    implements(RIIntroducerSubscriberClient_v1)
+
+    def __init__(self, original):
+        self.original = original
 
+    def remote_announce(self, announcements):
+        lp = self.original.log("received %d announcements (v1)" %
+                               len(announcements))
+        anns_v1 = set([convert_announcement_v1_to_v2(ann_v1)
+                       for ann_v1 in announcements])
+        return self.original.got_announcements(anns_v1, lp)
+
+    def remote_set_encoding_parameters(self, parameters):
+        self.original.remote_set_encoding_parameters(parameters)
+
+class RIStubClient(RemoteInterface): # for_v1
+    """Each client publishes a service announcement for a dummy object called
+    the StubClient. This object doesn't actually offer any services, but the
+    announcement helps the Introducer keep track of which clients are
+    subscribed (so the grid admin can keep track of things like the size of
+    the grid and the client versions in use. This is the (empty)
+    RemoteInterface for the StubClient."""
+
+class StubClient(Referenceable): # for_v1
+    implements(RIStubClient)
+
+V1 = "http://allmydata.org/tahoe/protocols/introducer/v1"
+V2 = "http://allmydata.org/tahoe/protocols/introducer/v2"
 
 class IntroducerClient(service.Service, Referenceable):
-    implements(RIIntroducerSubscriberClient, IIntroducerClient)
+    implements(RIIntroducerSubscriberClient_v2, IIntroducerClient)
 
     def __init__(self, tub, introducer_furl,
-                 nickname, my_version, oldest_supported):
+                 nickname, my_version, oldest_supported,
+                 app_versions):
         self._tub = tub
         self.introducer_furl = introducer_furl
 
         assert type(nickname) is unicode
-        self._nickname_utf8 = nickname.encode("utf-8") # we always send UTF-8
+        self._nickname = nickname
         self._my_version = my_version
         self._oldest_supported = oldest_supported
+        self._app_versions = app_versions
 
-        self._published_announcements = set()
+        self._my_subscriber_info = { "version": 0,
+                                     "nickname": self._nickname,
+                                     "app-versions": self._app_versions,
+                                     "my-version": self._my_version,
+                                     "oldest-supported": self._oldest_supported,
+                                     }
+        self._stub_client = None # for_v1
+        self._stub_client_furl = None
+
+        self._published_announcements = {}
+        self._canary = Referenceable()
 
         self._publisher = None
 
@@ -33,10 +80,11 @@ class IntroducerClient(service.Service, Referenceable):
 
         # _current_announcements remembers one announcement per
         # (servicename,serverid) pair. Anything that arrives with the same
-        # pair will displace the previous one. This stores unpacked
-        # announcement dictionaries, which can be compared for equality to
-        # distinguish re-announcement from updates. It also provides memory
-        # for clients who subscribe after startup.
+        # pair will displace the previous one. This stores tuples of
+        # (unpacked announcement dictionary, verifyingkey, rxtime). The ann
+        # dicts can be compared for equality to distinguish re-announcement
+        # from updates. It also provides memory for clients who subscribe
+        # after startup.
         self._current_announcements = {}
 
         self.encoding_parameters = None
@@ -51,6 +99,11 @@ class IntroducerClient(service.Service, Referenceable):
             "new_announcement": 0,
             "outbound_message": 0,
             }
+        self._debug_outstanding = 0
+
+    def _debug_retired(self, res):
+        self._debug_outstanding -= 1
+        return res
 
     def startService(self):
         service.Service.startService(self)
@@ -79,10 +132,9 @@ class IntroducerClient(service.Service, Referenceable):
 
     def _got_versioned_introducer(self, publisher):
         self.log("got introducer version: %s" % (publisher.version,))
-        # we require a V1 introducer
-        needed = "http://allmydata.org/tahoe/protocols/introducer/v1"
-        if needed not in publisher.version:
-            raise InsufficientVersionError(needed, publisher.version)
+        # we require an introducer that speaks at least one of (V1, V2)
+        if not (V1 in publisher.version or V2 in publisher.version):
+            raise InsufficientVersionError("V1 or V2", publisher.version)
         self._publisher = publisher
         publisher.notifyOnDisconnect(self._disconnected)
         self._maybe_publish()
@@ -95,24 +147,17 @@ class IntroducerClient(service.Service, Referenceable):
 
     def log(self, *args, **kwargs):
         if "facility" not in kwargs:
-            kwargs["facility"] = "tahoe.introducer"
+            kwargs["facility"] = "tahoe.introducer.client"
         return log.msg(*args, **kwargs)
 
-
-    def publish(self, furl, service_name, remoteinterface_name):
-        assert type(self._nickname_utf8) is str # we always send UTF-8
-        ann = (furl, service_name, remoteinterface_name,
-               self._nickname_utf8, self._my_version, self._oldest_supported)
-        self._published_announcements.add(ann)
-        self._maybe_publish()
-
     def subscribe_to(self, service_name, cb, *args, **kwargs):
         self._local_subscribers.append( (service_name,cb,args,kwargs) )
         self._subscribed_service_names.add(service_name)
         self._maybe_subscribe()
-        for (servicename,nodeid),ann_d in self._current_announcements.items():
+        for index,(ann,key_s,when) in self._current_announcements.items():
+            servicename = index[0]
             if servicename == service_name:
-                eventually(cb, nodeid, ann_d)
+                eventually(cb, key_s, ann, *args, **kwargs)
 
     def _maybe_subscribe(self):
         if not self._publisher:
@@ -120,96 +165,160 @@ class IntroducerClient(service.Service, Referenceable):
                      level=log.NOISY)
             return
         for service_name in self._subscribed_service_names:
-            if service_name not in self._subscriptions:
-                # there is a race here, but the subscription desk ignores
-                # duplicate requests.
-                self._subscriptions.add(service_name)
-                d = self._publisher.callRemote("subscribe", self, service_name)
-                d.addErrback(trap_deadref)
-                d.addErrback(log.err, format="server errored during subscribe",
-                             facility="tahoe.introducer",
-                             level=log.WEIRD, umid="2uMScQ")
+            if service_name in self._subscriptions:
+                continue
+            self._subscriptions.add(service_name)
+            if V2 in self._publisher.version:
+                self._debug_outstanding += 1
+                d = self._publisher.callRemote("subscribe_v2",
+                                               self, service_name,
+                                               self._my_subscriber_info)
+                d.addBoth(self._debug_retired)
+            else:
+                d = self._subscribe_handle_v1(service_name) # for_v1
+            d.addErrback(log.err, facility="tahoe.introducer.client",
+                         level=log.WEIRD, umid="2uMScQ")
+
+    def _subscribe_handle_v1(self, service_name): # for_v1
+        # they don't speak V2: must be a v1 introducer. Fall back to the v1
+        # 'subscribe' method, using a client adapter.
+        ca = WrapV2ClientInV1Interface(self)
+        self._debug_outstanding += 1
+        d = self._publisher.callRemote("subscribe", ca, service_name)
+        d.addBoth(self._debug_retired)
+        # We must also publish an empty 'stub_client' object, so the
+        # introducer can count how many clients are connected and see what
+        # versions they're running.
+        if not self._stub_client_furl:
+            self._stub_client = sc = StubClient()
+            self._stub_client_furl = self._tub.registerReference(sc)
+        def _publish_stub_client(ignored):
+            furl = self._stub_client_furl
+            self.publish("stub_client",
+                         { "anonymous-storage-FURL": furl,
+                           "permutation-seed-base32": get_tubid_string(furl),
+                           })
+        d.addCallback(_publish_stub_client)
+        return d
+
+    def create_announcement(self, service_name, ann, signing_key):
+        full_ann = { "version": 0,
+                     "nickname": self._nickname,
+                     "app-versions": self._app_versions,
+                     "my-version": self._my_version,
+                     "oldest-supported": self._oldest_supported,
+
+                     "service-name": service_name,
+                     }
+        full_ann.update(ann)
+        return sign_to_foolscap(full_ann, signing_key)
+
+    def publish(self, service_name, ann, signing_key=None):
+        ann_t = self.create_announcement(service_name, ann, signing_key)
+        self._published_announcements[service_name] = ann_t
+        self._maybe_publish()
 
     def _maybe_publish(self):
         if not self._publisher:
             self.log("want to publish, but no introducer yet", level=log.NOISY)
             return
         # this re-publishes everything. The Introducer ignores duplicates
-        for ann in self._published_announcements:
+        for ann_t in self._published_announcements.values():
             self._debug_counts["outbound_message"] += 1
-            d = self._publisher.callRemote("publish", ann)
-            d.addErrback(trap_deadref)
-            d.addErrback(log.err,
-                         format="server errored during publish %(ann)s",
-                         ann=ann, facility="tahoe.introducer",
+            if V2 in self._publisher.version:
+                self._debug_outstanding += 1
+                d = self._publisher.callRemote("publish_v2", ann_t,
+                                               self._canary)
+                d.addBoth(self._debug_retired)
+            else:
+                d = self._handle_v1_publisher(ann_t) # for_v1
+            d.addErrback(log.err, ann_t=ann_t,
+                         facility="tahoe.introducer.client",
                          level=log.WEIRD, umid="xs9pVQ")
 
+    def _handle_v1_publisher(self, ann_t): # for_v1
+        # they don't speak V2, so fall back to the old 'publish' method
+        # (which takes an unsigned tuple of bytestrings)
+        self.log("falling back to publish_v1",
+                 level=log.UNUSUAL, umid="9RCT1A")
+        ann_v1 = convert_announcement_v2_to_v1(ann_t)
+        self._debug_outstanding += 1
+        d = self._publisher.callRemote("publish", ann_v1)
+        d.addBoth(self._debug_retired)
+        return d
 
 
-    def remote_announce(self, announcements):
-        self.log("received %d announcements" % len(announcements))
+    def remote_announce_v2(self, announcements):
+        lp = self.log("received %d announcements (v2)" % len(announcements))
+        return self.got_announcements(announcements, lp)
+
+    def got_announcements(self, announcements, lp=None):
+        # this is the common entry point for both v1 and v2 announcements
         self._debug_counts["inbound_message"] += 1
-        for ann in announcements:
+        for ann_t in announcements:
             try:
-                self._process_announcement(ann)
-            except:
-                log.err(format="unable to process announcement %(ann)s",
-                        ann=ann)
-                # Don't let a corrupt announcement prevent us from processing
-                # the remaining ones. Don't return an error to the server,
-                # since they'd just ignore it anyways.
-                pass
-
-    def _process_announcement(self, ann):
+                # this might raise UnknownKeyError or bad-sig error
+                ann, key_s = unsign_from_foolscap(ann_t)
+                # key is "v0-base32abc123"
+            except BadSignatureError:
+                self.log("bad signature on inbound announcement: %s" % (ann_t,),
+                         parent=lp, level=log.WEIRD, umid="ZAU15Q")
+                # process other announcements that arrived with the bad one
+                continue
+
+            self._process_announcement(ann, key_s)
+
+    def _process_announcement(self, ann, key_s):
         self._debug_counts["inbound_announcement"] += 1
-        (furl, service_name, ri_name, nickname_utf8, ver, oldest) = ann
+        service_name = str(ann["service-name"])
         if service_name not in self._subscribed_service_names:
             self.log("announcement for a service we don't care about [%s]"
                      % (service_name,), level=log.UNUSUAL, umid="dIpGNA")
             self._debug_counts["wrong_service"] += 1
             return
-        self.log("announcement for [%s]: %s" % (service_name, ann),
-                 umid="BoKEag")
-        assert type(furl) is str
-        assert type(service_name) is str
-        assert type(ri_name) is str
-        assert type(nickname_utf8) is str
-        nickname = nickname_utf8.decode("utf-8")
-        assert type(nickname) is unicode
-        assert type(ver) is str
-        assert type(oldest) is str
-
-        nodeid = b32decode(SturdyRef(furl).tubID.upper())
-        nodeid_s = idlib.shortnodeid_b2a(nodeid)
-
-        ann_d = { "version": 0,
-                  "service-name": service_name,
-
-                  "FURL": furl,
-                  "nickname": nickname,
-                  "app-versions": {}, # need #466 and v2 introducer
-                  "my-version": ver,
-                  "oldest-supported": oldest,
-                  }
-
-        index = (service_name, nodeid)
-        if self._current_announcements.get(index, None) == ann_d:
-            self.log("reannouncement for [%(service)s]:%(nodeid)s, ignoring",
-                     service=service_name, nodeid=nodeid_s,
-                     level=log.UNUSUAL, umid="B1MIdA")
+        # for ASCII values, simplejson might give us unicode *or* bytes
+        if "nickname" in ann and isinstance(ann["nickname"], str):
+            ann["nickname"] = unicode(ann["nickname"])
+        nick_s = ann.get("nickname",u"").encode("utf-8")
+        lp2 = self.log(format="announcement for nickname '%(nick)s', service=%(svc)s: %(ann)s",
+                       nick=nick_s, svc=service_name, ann=ann, umid="BoKEag")
+
+        # how do we describe this node in the logs?
+        desc_bits = []
+        if key_s:
+            desc_bits.append("serverid=" + key_s[:20])
+        if "anonymous-storage-FURL" in ann:
+            tubid_s = get_tubid_string_from_ann(ann)
+            desc_bits.append("tubid=" + tubid_s[:8])
+        description = "/".join(desc_bits)
+
+        # the index is used to track duplicates
+        index = make_index(ann, key_s)
+
+        # is this announcement a duplicate?
+        if (index in self._current_announcements
+            and self._current_announcements[index][0] == ann):
+            self.log(format="reannouncement for [%(service)s]:%(description)s, ignoring",
+                     service=service_name, description=description,
+                     parent=lp2, level=log.UNUSUAL, umid="B1MIdA")
             self._debug_counts["duplicate_announcement"] += 1
             return
+        # does it update an existing one?
         if index in self._current_announcements:
             self._debug_counts["update"] += 1
+            self.log("replacing old announcement: %s" % (ann,),
+                     parent=lp2, level=log.NOISY, umid="wxwgIQ")
         else:
             self._debug_counts["new_announcement"] += 1
+            self.log("new announcement[%s]" % service_name,
+                     parent=lp2, level=log.NOISY)
 
-        self._current_announcements[index] = ann_d
+        self._current_announcements[index] = (ann, key_s, time.time())
         # note: we never forget an index, but we might update its value
 
         for (service_name2,cb,args,kwargs) in self._local_subscribers:
             if service_name2 == service_name:
-                eventually(cb, nodeid, ann_d, *args, **kwargs)
+                eventually(cb, key_s, ann, *args, **kwargs)
 
     def remote_set_encoding_parameters(self, parameters):
         self.encoding_parameters = parameters
diff --git a/src/allmydata/introducer/common.py b/src/allmydata/introducer/common.py
new file mode 100644 (file)
index 0000000..2f6e9c8
--- /dev/null
@@ -0,0 +1,92 @@
+
+import re, simplejson
+from allmydata.util import keyutil, base32
+
+def make_index(ann, key_s):
+    """Return something that can be used as an index (e.g. a tuple of
+    strings), such that two messages that refer to the same 'thing' will have
+    the same index. This is a tuple of (service-name, signing-key, None) for
+    signed announcements, or (service-name, None, tubid) for unsigned
+    announcements."""
+
+    service_name = str(ann["service-name"])
+    if key_s:
+        return (service_name, key_s, None)
+    else:
+        tubid = get_tubid_string_from_ann(ann)
+        return (service_name, None, tubid)
+
+def get_tubid_string_from_ann(ann):
+    return get_tubid_string(str(ann.get("anonymous-storage-FURL")
+                                or ann.get("FURL")))
+
+def get_tubid_string(furl):
+    m = re.match(r'pb://(\w+)@', furl)
+    assert m
+    return m.group(1).lower()
+
+def convert_announcement_v1_to_v2(ann_t):
+    (furl, service_name, ri_name, nickname, ver, oldest) = ann_t
+    assert type(furl) is str
+    assert type(service_name) is str
+    # ignore ri_name
+    assert type(nickname) is str
+    assert type(ver) is str
+    assert type(oldest) is str
+    ann = {"version": 0,
+           "nickname": nickname.decode("utf-8"),
+           "app-versions": {},
+           "my-version": ver,
+           "oldest-supported": oldest,
+
+           "service-name": service_name,
+           "anonymous-storage-FURL": furl,
+           "permutation-seed-base32": get_tubid_string(furl),
+           }
+    msg = simplejson.dumps(ann).encode("utf-8")
+    return (msg, None, None)
+
+def convert_announcement_v2_to_v1(ann_v2):
+    (msg, sig, pubkey) = ann_v2
+    ann = simplejson.loads(msg)
+    assert ann["version"] == 0
+    ann_t = (str(ann["anonymous-storage-FURL"]),
+             str(ann["service-name"]),
+             "remoteinterface-name is unused",
+             ann["nickname"].encode("utf-8"),
+             str(ann["my-version"]),
+             str(ann["oldest-supported"]),
+             )
+    return ann_t
+
+
+def sign_to_foolscap(ann, sk):
+    # return (bytes, None, None) or (bytes, sig-str, pubkey-str). A future
+    # HTTP-based serialization will use JSON({msg:b64(JSON(msg).utf8),
+    # sig:v0-b64(sig), pubkey:v0-b64(pubkey)}) .
+    msg = simplejson.dumps(ann).encode("utf-8")
+    if sk:
+        sig = "v0-"+base32.b2a(sk.sign(msg))
+        vk_bytes = sk.get_verifying_key_bytes()
+        ann_t = (msg, sig, "v0-"+base32.b2a(vk_bytes))
+    else:
+        ann_t = (msg, None, None)
+    return ann_t
+
+class UnknownKeyError(Exception):
+    pass
+
+def unsign_from_foolscap(ann_t):
+    (msg, sig_vs, claimed_key_vs) = ann_t
+    key_vs = None
+    if sig_vs and claimed_key_vs:
+        if not sig_vs.startswith("v0-"):
+            raise UnknownKeyError("only v0- signatures recognized")
+        if not claimed_key_vs.startswith("v0-"):
+            raise UnknownKeyError("only v0- keys recognized")
+        claimed_key = keyutil.parse_pubkey("pub-"+claimed_key_vs)
+        sig_bytes = base32.a2b(keyutil.remove_prefix(sig_vs, "v0-"))
+        claimed_key.verify(sig_bytes, msg)
+        key_vs = claimed_key_vs
+    ann = simplejson.loads(msg.decode("utf-8"))
+    return (ann, key_vs)
index 54f1701f881f85d4be85bafef92e82c9ac4a9fd8..53d875ac942ff2bd9622b7d410bba6230cea357e 100644 (file)
@@ -1,9 +1,12 @@
 
 from zope.interface import Interface
 from foolscap.api import StringConstraint, TupleOf, SetOf, DictOf, Any, \
-    RemoteInterface
+    RemoteInterface, Referenceable
+from old import RIIntroducerSubscriberClient_v1
 FURL = StringConstraint(1000)
 
+# old introducer protocol (v1):
+#
 # Announcements are (FURL, service_name, remoteinterface_name,
 #                    nickname, my_version, oldest_supported)
 #  the (FURL, service_name, remoteinterface_name) refer to the service being
@@ -14,13 +17,17 @@ FURL = StringConstraint(1000)
 #  incompatible peer. The second goal is to enable the development of
 #  backwards-compatibility code.
 
-Announcement = TupleOf(FURL, str, str,
-                       str, str, str)
+Announcement_v1 = TupleOf(FURL, str, str,
+                          str, str, str)
 
-class RIIntroducerSubscriberClient(RemoteInterface):
-    __remote_name__ = "RIIntroducerSubscriberClient.tahoe.allmydata.com"
+# v2 protocol over foolscap: Announcements are 3-tuples of (bytes, str, str)
+# or (bytes, none, none)
+Announcement_v2 = Any()
 
-    def announce(announcements=SetOf(Announcement)):
+class RIIntroducerSubscriberClient_v2(RemoteInterface):
+    __remote_name__ = "RIIntroducerSubscriberClient_v2.tahoe.allmydata.com"
+
+    def announce_v2(announcements=SetOf(Announcement_v2)):
         """I accept announcements from the publisher."""
         return None
 
@@ -41,38 +48,29 @@ class RIIntroducerSubscriberClient(RemoteInterface):
         """
         return None
 
-# When Foolscap can handle multiple interfaces (Foolscap#17), the
-# full-powered introducer will implement both RIIntroducerPublisher and
-# RIIntroducerSubscriberService. Until then, we define
-# RIIntroducerPublisherAndSubscriberService as a combination of the two, and
-# make everybody use that.
+SubscriberInfo = DictOf(str, Any())
 
-class RIIntroducerPublisher(RemoteInterface):
+class RIIntroducerPublisherAndSubscriberService_v2(RemoteInterface):
     """To publish a service to the world, connect to me and give me your
-    announcement message. I will deliver a copy to all connected subscribers."""
-    __remote_name__ = "RIIntroducerPublisher.tahoe.allmydata.com"
-
-    def publish(announcement=Announcement):
-        # canary?
-        return None
-
-class RIIntroducerSubscriberService(RemoteInterface):
-    __remote_name__ = "RIIntroducerSubscriberService.tahoe.allmydata.com"
-
-    def subscribe(subscriber=RIIntroducerSubscriberClient, service_name=str):
-        """Give me a subscriber reference, and I will call its new_peers()
-        method will any announcements that match the desired service name. I
-        will ignore duplicate subscriptions.
-        """
-        return None
-
-class RIIntroducerPublisherAndSubscriberService(RemoteInterface):
-    __remote_name__ = "RIIntroducerPublisherAndSubscriberService.tahoe.allmydata.com"
+    announcement message. I will deliver a copy to all connected subscribers.
+    To hear about services, connect to me and subscribe to a specific
+    service_name."""
+    __remote_name__ = "RIIntroducerPublisherAndSubscriberService_v2.tahoe.allmydata.com"
     def get_version():
         return DictOf(str, Any())
-    def publish(announcement=Announcement):
+    def publish(announcement=Announcement_v1):
+        return None
+    def publish_v2(announcement=Announcement_v2, canary=Referenceable):
         return None
-    def subscribe(subscriber=RIIntroducerSubscriberClient, service_name=str):
+    def subscribe(subscriber=RIIntroducerSubscriberClient_v1, service_name=str):
+        return None
+    def subscribe_v2(subscriber=RIIntroducerSubscriberClient_v2,
+                     service_name=str, subscriber_info=SubscriberInfo):
+        """Give me a subscriber reference, and I will call its announce_v2()
+        method with any announcements that match the desired service name. I
+        will ignore duplicate subscriptions. The subscriber_info dictionary
+        tells me about the subscriber, and is used for diagnostic/status
+        displays."""
         return None
 
 class IIntroducerClient(Interface):
@@ -80,41 +78,47 @@ class IIntroducerClient(Interface):
     publish their services to the rest of the world, and I help them learn
     about services available on other nodes."""
 
-    def publish(furl, service_name, remoteinterface_name):
-        """Once you call this, I will tell the world that the Referenceable
-        available at FURL is available to provide a service named
-        SERVICE_NAME. The precise definition of the service being provided is
-        identified by the Foolscap 'remote interface name' in the last
-        parameter: this is supposed to be a globally-unique string that
-        identifies the RemoteInterface that is implemented."""
+    def publish(service_name, ann, signing_key=None):
+        """Publish the given announcement dictionary (which must be
+        JSON-serializable), plus some additional keys, to the world.
+
+        Each announcement is characterized by a (service_name, serverid)
+        pair. When the server sees two announcements with the same pair, the
+        later one will replace the earlier one. The serverid is derived from
+        the signing_key, if present, otherwise it is derived from the
+        'anonymous-storage-FURL' key.
+
+        If signing_key= is set to an instance of SigningKey, it will be
+        used to sign the announcement."""
 
     def subscribe_to(service_name, callback, *args, **kwargs):
         """Call this if you will eventually want to use services with the
         given SERVICE_NAME. This will prompt me to subscribe to announcements
         of those services. Your callback will be invoked with at least two
-        arguments: a serverid (binary string), and an announcement
-        dictionary, followed by any additional callback args/kwargs you give
-        me. I will run your callback for both new announcements and for
+        arguments: a pubkey and an announcement dictionary, followed by any
+        additional callback args/kwargs you gave me. The pubkey will be None
+        unless the announcement was signed by the corresponding pubkey, in
+        which case it will be a printable string like 'v0-base32..'.
+
+        I will run your callback for both new announcements and for
         announcements that have changed, but you must be prepared to tolerate
         duplicates.
 
-        The announcement dictionary that I give you will have the following
-        keys:
+        The announcement that I give you comes from some other client. It
+        will be a JSON-serializable dictionary which (by convention) is
+        expected to have at least the following keys:
 
          version: 0
-         service-name: str('storage')
-
-         FURL: str(furl)
-         remoteinterface-name: str(ri_name)
          nickname: unicode
          app-versions: {}
          my-version: str
          oldest-supported: str
 
-        Note that app-version will be an empty dictionary until #466 is done
-        and both the introducer and the remote client have been upgraded. For
-        current (native) server types, the serverid will always be equal to
-        the binary form of the FURL's tubid.
+         service-name: str('storage')
+         anonymous-storage-FURL: str(furl)
+
+        Note that app-version will be an empty dictionary if either the
+        publishing client or the Introducer are running older code.
         """
 
     def connected_to_introducer():
diff --git a/src/allmydata/introducer/old.py b/src/allmydata/introducer/old.py
new file mode 100644 (file)
index 0000000..e0bdacf
--- /dev/null
@@ -0,0 +1,463 @@
+
+import time
+from base64 import b32decode
+from zope.interface import implements, Interface
+from twisted.application import service
+import allmydata
+from allmydata.interfaces import InsufficientVersionError
+from allmydata.util import log, idlib, rrefutil
+from foolscap.api import StringConstraint, TupleOf, SetOf, DictOf, Any, \
+    RemoteInterface, Referenceable, eventually, SturdyRef
+FURL = StringConstraint(1000)
+
+# We keep a copy of the old introducer (both client and server) here to
+# support compatibility tests. The old client is supposed to handle the new
+# server, and new client is supposed to handle the old server.
+
+
+# Announcements are (FURL, service_name, remoteinterface_name,
+#                    nickname, my_version, oldest_supported)
+#  the (FURL, service_name, remoteinterface_name) refer to the service being
+#  announced. The (nickname, my_version, oldest_supported) refer to the
+#  client as a whole. The my_version/oldest_supported strings can be parsed
+#  by an allmydata.util.version.Version instance, and then compared. The
+#  first goal is to make sure that nodes are not confused by speaking to an
+#  incompatible peer. The second goal is to enable the development of
+#  backwards-compatibility code.
+
+Announcement = TupleOf(FURL, str, str,
+                       str, str, str)
+
+class RIIntroducerSubscriberClient_v1(RemoteInterface):
+    __remote_name__ = "RIIntroducerSubscriberClient.tahoe.allmydata.com"
+
+    def announce(announcements=SetOf(Announcement)):
+        """I accept announcements from the publisher."""
+        return None
+
+    def set_encoding_parameters(parameters=(int, int, int)):
+        """Advise the client of the recommended k-of-n encoding parameters
+        for this grid. 'parameters' is a tuple of (k, desired, n), where 'n'
+        is the total number of shares that will be created for any given
+        file, while 'k' is the number of shares that must be retrieved to
+        recover that file, and 'desired' is the minimum number of shares that
+        must be placed before the uploader will consider its job a success.
+        n/k is the expansion ratio, while k determines the robustness.
+
+        Introducers should specify 'n' according to the expected size of the
+        grid (there is no point to producing more shares than there are
+        peers), and k according to the desired reliability-vs-overhead goals.
+
+        Note that setting k=1 is equivalent to simple replication.
+        """
+        return None
+
+# When Foolscap can handle multiple interfaces (Foolscap#17), the
+# full-powered introducer will implement both RIIntroducerPublisher and
+# RIIntroducerSubscriberService. Until then, we define
+# RIIntroducerPublisherAndSubscriberService as a combination of the two, and
+# make everybody use that.
+
+class RIIntroducerPublisher_v1(RemoteInterface):
+    """To publish a service to the world, connect to me and give me your
+    announcement message. I will deliver a copy to all connected subscribers."""
+    __remote_name__ = "RIIntroducerPublisher.tahoe.allmydata.com"
+
+    def publish(announcement=Announcement):
+        # canary?
+        return None
+
+class RIIntroducerSubscriberService_v1(RemoteInterface):
+    __remote_name__ = "RIIntroducerSubscriberService.tahoe.allmydata.com"
+
+    def subscribe(subscriber=RIIntroducerSubscriberClient_v1, service_name=str):
+        """Give me a subscriber reference, and I will call its new_peers()
+        method will any announcements that match the desired service name. I
+        will ignore duplicate subscriptions.
+        """
+        return None
+
+class RIIntroducerPublisherAndSubscriberService_v1(RemoteInterface):
+    __remote_name__ = "RIIntroducerPublisherAndSubscriberService.tahoe.allmydata.com"
+    def get_version():
+        return DictOf(str, Any())
+    def publish(announcement=Announcement):
+        return None
+    def subscribe(subscriber=RIIntroducerSubscriberClient_v1, service_name=str):
+        return None
+
+class IIntroducerClient(Interface):
+    """I provide service introduction facilities for a node. I help nodes
+    publish their services to the rest of the world, and I help them learn
+    about services available on other nodes."""
+
+    def publish(furl, service_name, remoteinterface_name):
+        """Once you call this, I will tell the world that the Referenceable
+        available at FURL is available to provide a service named
+        SERVICE_NAME. The precise definition of the service being provided is
+        identified by the Foolscap 'remote interface name' in the last
+        parameter: this is supposed to be a globally-unique string that
+        identifies the RemoteInterface that is implemented."""
+
+    def subscribe_to(service_name, callback, *args, **kwargs):
+        """Call this if you will eventually want to use services with the
+        given SERVICE_NAME. This will prompt me to subscribe to announcements
+        of those services. Your callback will be invoked with at least two
+        arguments: a serverid (binary string), and an announcement
+        dictionary, followed by any additional callback args/kwargs you give
+        me. I will run your callback for both new announcements and for
+        announcements that have changed, but you must be prepared to tolerate
+        duplicates.
+
+        The announcement dictionary that I give you will have the following
+        keys:
+
+         version: 0
+         service-name: str('storage')
+
+         FURL: str(furl)
+         remoteinterface-name: str(ri_name)
+         nickname: unicode
+         app-versions: {}
+         my-version: str
+         oldest-supported: str
+
+        Note that app-version will be an empty dictionary until #466 is done
+        and both the introducer and the remote client have been upgraded. For
+        current (native) server types, the serverid will always be equal to
+        the binary form of the FURL's tubid.
+        """
+
+    def connected_to_introducer():
+        """Returns a boolean, True if we are currently connected to the
+        introducer, False if not."""
+
+
+class IntroducerClient_v1(service.Service, Referenceable):
+    implements(RIIntroducerSubscriberClient_v1, IIntroducerClient)
+
+    def __init__(self, tub, introducer_furl,
+                 nickname, my_version, oldest_supported):
+        self._tub = tub
+        self.introducer_furl = introducer_furl
+
+        assert type(nickname) is unicode
+        self._nickname_utf8 = nickname.encode("utf-8") # we always send UTF-8
+        self._my_version = my_version
+        self._oldest_supported = oldest_supported
+
+        self._published_announcements = set()
+
+        self._publisher = None
+
+        self._local_subscribers = [] # (servicename,cb,args,kwargs) tuples
+        self._subscribed_service_names = set()
+        self._subscriptions = set() # requests we've actually sent
+
+        # _current_announcements remembers one announcement per
+        # (servicename,serverid) pair. Anything that arrives with the same
+        # pair will displace the previous one. This stores unpacked
+        # announcement dictionaries, which can be compared for equality to
+        # distinguish re-announcement from updates. It also provides memory
+        # for clients who subscribe after startup.
+        self._current_announcements = {}
+
+        self.encoding_parameters = None
+
+        # hooks for unit tests
+        self._debug_counts = {
+            "inbound_message": 0,
+            "inbound_announcement": 0,
+            "wrong_service": 0,
+            "duplicate_announcement": 0,
+            "update": 0,
+            "new_announcement": 0,
+            "outbound_message": 0,
+            }
+        self._debug_outstanding = 0
+
+    def _debug_retired(self, res):
+        self._debug_outstanding -= 1
+        return res
+
+    def startService(self):
+        service.Service.startService(self)
+        self._introducer_error = None
+        rc = self._tub.connectTo(self.introducer_furl, self._got_introducer)
+        self._introducer_reconnector = rc
+        def connect_failed(failure):
+            self.log("Initial Introducer connection failed: perhaps it's down",
+                     level=log.WEIRD, failure=failure, umid="c5MqUQ")
+        d = self._tub.getReference(self.introducer_furl)
+        d.addErrback(connect_failed)
+
+    def _got_introducer(self, publisher):
+        self.log("connected to introducer, getting versions")
+        default = { "http://allmydata.org/tahoe/protocols/introducer/v1":
+                    { },
+                    "application-version": "unknown: no get_version()",
+                    }
+        d = rrefutil.add_version_to_remote_reference(publisher, default)
+        d.addCallback(self._got_versioned_introducer)
+        d.addErrback(self._got_error)
+
+    def _got_error(self, f):
+        # TODO: for the introducer, perhaps this should halt the application
+        self._introducer_error = f # polled by tests
+
+    def _got_versioned_introducer(self, publisher):
+        self.log("got introducer version: %s" % (publisher.version,))
+        # we require a V1 introducer
+        needed = "http://allmydata.org/tahoe/protocols/introducer/v1"
+        if needed not in publisher.version:
+            raise InsufficientVersionError(needed, publisher.version)
+        self._publisher = publisher
+        publisher.notifyOnDisconnect(self._disconnected)
+        self._maybe_publish()
+        self._maybe_subscribe()
+
+    def _disconnected(self):
+        self.log("bummer, we've lost our connection to the introducer")
+        self._publisher = None
+        self._subscriptions.clear()
+
+    def log(self, *args, **kwargs):
+        if "facility" not in kwargs:
+            kwargs["facility"] = "tahoe.introducer"
+        return log.msg(*args, **kwargs)
+
+
+    def publish(self, furl, service_name, remoteinterface_name):
+        assert type(self._nickname_utf8) is str # we always send UTF-8
+        ann = (furl, service_name, remoteinterface_name,
+               self._nickname_utf8, self._my_version, self._oldest_supported)
+        self._published_announcements.add(ann)
+        self._maybe_publish()
+
+    def subscribe_to(self, service_name, cb, *args, **kwargs):
+        self._local_subscribers.append( (service_name,cb,args,kwargs) )
+        self._subscribed_service_names.add(service_name)
+        self._maybe_subscribe()
+        for (servicename,nodeid),ann_d in self._current_announcements.items():
+            if servicename == service_name:
+                eventually(cb, nodeid, ann_d)
+
+    def _maybe_subscribe(self):
+        if not self._publisher:
+            self.log("want to subscribe, but no introducer yet",
+                     level=log.NOISY)
+            return
+        for service_name in self._subscribed_service_names:
+            if service_name not in self._subscriptions:
+                # there is a race here, but the subscription desk ignores
+                # duplicate requests.
+                self._subscriptions.add(service_name)
+                self._debug_outstanding += 1
+                d = self._publisher.callRemote("subscribe", self, service_name)
+                d.addBoth(self._debug_retired)
+                d.addErrback(rrefutil.trap_deadref)
+                d.addErrback(log.err, format="server errored during subscribe",
+                             facility="tahoe.introducer",
+                             level=log.WEIRD, umid="2uMScQ")
+
+    def _maybe_publish(self):
+        if not self._publisher:
+            self.log("want to publish, but no introducer yet", level=log.NOISY)
+            return
+        # this re-publishes everything. The Introducer ignores duplicates
+        for ann in self._published_announcements:
+            self._debug_counts["outbound_message"] += 1
+            self._debug_outstanding += 1
+            d = self._publisher.callRemote("publish", ann)
+            d.addBoth(self._debug_retired)
+            d.addErrback(rrefutil.trap_deadref)
+            d.addErrback(log.err,
+                         format="server errored during publish %(ann)s",
+                         ann=ann, facility="tahoe.introducer",
+                         level=log.WEIRD, umid="xs9pVQ")
+
+
+
+    def remote_announce(self, announcements):
+        self.log("received %d announcements" % len(announcements))
+        self._debug_counts["inbound_message"] += 1
+        for ann in announcements:
+            try:
+                self._process_announcement(ann)
+            except:
+                log.err(format="unable to process announcement %(ann)s",
+                        ann=ann)
+                # Don't let a corrupt announcement prevent us from processing
+                # the remaining ones. Don't return an error to the server,
+                # since they'd just ignore it anyways.
+                pass
+
+    def _process_announcement(self, ann):
+        self._debug_counts["inbound_announcement"] += 1
+        (furl, service_name, ri_name, nickname_utf8, ver, oldest) = ann
+        if service_name not in self._subscribed_service_names:
+            self.log("announcement for a service we don't care about [%s]"
+                     % (service_name,), level=log.UNUSUAL, umid="dIpGNA")
+            self._debug_counts["wrong_service"] += 1
+            return
+        self.log("announcement for [%s]: %s" % (service_name, ann),
+                 umid="BoKEag")
+        assert type(furl) is str
+        assert type(service_name) is str
+        assert type(ri_name) is str
+        assert type(nickname_utf8) is str
+        nickname = nickname_utf8.decode("utf-8")
+        assert type(nickname) is unicode
+        assert type(ver) is str
+        assert type(oldest) is str
+
+        nodeid = b32decode(SturdyRef(furl).tubID.upper())
+        nodeid_s = idlib.shortnodeid_b2a(nodeid)
+
+        ann_d = { "version": 0,
+                  "service-name": service_name,
+
+                  "FURL": furl,
+                  "nickname": nickname,
+                  "app-versions": {}, # need #466 and v2 introducer
+                  "my-version": ver,
+                  "oldest-supported": oldest,
+                  }
+
+        index = (service_name, nodeid)
+        if self._current_announcements.get(index, None) == ann_d:
+            self.log("reannouncement for [%(service)s]:%(nodeid)s, ignoring",
+                     service=service_name, nodeid=nodeid_s,
+                     level=log.UNUSUAL, umid="B1MIdA")
+            self._debug_counts["duplicate_announcement"] += 1
+            return
+        if index in self._current_announcements:
+            self._debug_counts["update"] += 1
+        else:
+            self._debug_counts["new_announcement"] += 1
+
+        self._current_announcements[index] = ann_d
+        # note: we never forget an index, but we might update its value
+
+        for (service_name2,cb,args,kwargs) in self._local_subscribers:
+            if service_name2 == service_name:
+                eventually(cb, nodeid, ann_d, *args, **kwargs)
+
+    def remote_set_encoding_parameters(self, parameters):
+        self.encoding_parameters = parameters
+
+    def connected_to_introducer(self):
+        return bool(self._publisher)
+
+class IntroducerService_v1(service.MultiService, Referenceable):
+    implements(RIIntroducerPublisherAndSubscriberService_v1)
+    name = "introducer"
+    VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v1":
+                 { },
+                "application-version": str(allmydata.__full_version__),
+                }
+
+    def __init__(self, basedir="."):
+        service.MultiService.__init__(self)
+        self.introducer_url = None
+        # 'index' is (service_name, tubid)
+        self._announcements = {} # dict of index -> (announcement, timestamp)
+        self._subscribers = {} # dict of (rref->timestamp) dicts
+        self._debug_counts = {"inbound_message": 0,
+                              "inbound_duplicate": 0,
+                              "inbound_update": 0,
+                              "outbound_message": 0,
+                              "outbound_announcements": 0,
+                              "inbound_subscribe": 0}
+        self._debug_outstanding = 0
+
+    def _debug_retired(self, res):
+        self._debug_outstanding -= 1
+        return res
+
+    def log(self, *args, **kwargs):
+        if "facility" not in kwargs:
+            kwargs["facility"] = "tahoe.introducer"
+        return log.msg(*args, **kwargs)
+
+    def get_announcements(self):
+        return self._announcements
+    def get_subscribers(self):
+        return self._subscribers
+
+    def remote_get_version(self):
+        return self.VERSION
+
+    def remote_publish(self, announcement):
+        try:
+            self._publish(announcement)
+        except:
+            log.err(format="Introducer.remote_publish failed on %(ann)s",
+                    ann=announcement, level=log.UNUSUAL, umid="620rWA")
+            raise
+
+    def _publish(self, announcement):
+        self._debug_counts["inbound_message"] += 1
+        self.log("introducer: announcement published: %s" % (announcement,) )
+        (furl, service_name, ri_name, nickname_utf8, ver, oldest) = announcement
+        #print "PUB", service_name, nickname_utf8
+
+        nodeid = b32decode(SturdyRef(furl).tubID.upper())
+        index = (service_name, nodeid)
+
+        if index in self._announcements:
+            (old_announcement, timestamp) = self._announcements[index]
+            if old_announcement == announcement:
+                self.log("but we already knew it, ignoring", level=log.NOISY)
+                self._debug_counts["inbound_duplicate"] += 1
+                return
+            else:
+                self.log("old announcement being updated", level=log.NOISY)
+                self._debug_counts["inbound_update"] += 1
+        self._announcements[index] = (announcement, time.time())
+
+        for s in self._subscribers.get(service_name, []):
+            self._debug_counts["outbound_message"] += 1
+            self._debug_counts["outbound_announcements"] += 1
+            self._debug_outstanding += 1
+            d = s.callRemote("announce", set([announcement]))
+            d.addBoth(self._debug_retired)
+            d.addErrback(rrefutil.trap_deadref)
+            d.addErrback(log.err,
+                         format="subscriber errored on announcement %(ann)s",
+                         ann=announcement, facility="tahoe.introducer",
+                         level=log.UNUSUAL, umid="jfGMXQ")
+
+    def remote_subscribe(self, subscriber, service_name):
+        self.log("introducer: subscription[%s] request at %s" % (service_name,
+                                                                 subscriber))
+        self._debug_counts["inbound_subscribe"] += 1
+        if service_name not in self._subscribers:
+            self._subscribers[service_name] = {}
+        subscribers = self._subscribers[service_name]
+        if subscriber in subscribers:
+            self.log("but they're already subscribed, ignoring",
+                     level=log.UNUSUAL)
+            return
+        subscribers[subscriber] = time.time()
+        def _remove():
+            self.log("introducer: unsubscribing[%s] %s" % (service_name,
+                                                           subscriber))
+            subscribers.pop(subscriber, None)
+        subscriber.notifyOnDisconnect(_remove)
+
+        announcements = set(
+            [ ann
+              for (sn2,nodeid),(ann,when) in self._announcements.items()
+              if sn2 == service_name] )
+
+        self._debug_counts["outbound_message"] += 1
+        self._debug_counts["outbound_announcements"] += len(announcements)
+        self._debug_outstanding += 1
+        d = subscriber.callRemote("announce", announcements)
+        d.addBoth(self._debug_retired)
+        d.addErrback(rrefutil.trap_deadref)
+        d.addErrback(log.err,
+                     format="subscriber errored during subscribe %(anns)s",
+                     anns=announcements, facility="tahoe.introducer",
+                     level=log.UNUSUAL, umid="1XChxA")
index 10dc1a05fd1e418e8588eb2d74cb446506ffa457..41f92b23673333845c5ef5da59b681ec655b8132 100644 (file)
@@ -1,14 +1,16 @@
 
 import time, os.path
-from base64 import b32decode
 from zope.interface import implements
 from twisted.application import service
-from foolscap.api import Referenceable, SturdyRef
+from foolscap.api import Referenceable
 import allmydata
 from allmydata import node
-from allmydata.util import log, rrefutil
+from allmydata.util import log
 from allmydata.introducer.interfaces import \
-     RIIntroducerPublisherAndSubscriberService
+     RIIntroducerPublisherAndSubscriberService_v2
+from allmydata.introducer.common import convert_announcement_v1_to_v2, \
+     convert_announcement_v2_to_v1, unsign_from_foolscap, make_index, \
+     get_tubid_string_from_ann
 
 class IntroducerNode(node.Node):
     PORTNUMFILE = "introducer.port"
@@ -31,14 +33,15 @@ class IntroducerNode(node.Node):
         def _publish(res):
             self.introducer_url = self.tub.registerReference(introducerservice,
                                                              "introducer")
-            self.log(" introducer is at %s" % self.introducer_url)
+            self.log(" introducer is at %s" % self.introducer_url,
+                     umid="qF2L9A")
             self.write_config("introducer.furl", self.introducer_url + "\n")
         d.addCallback(_publish)
         d.addErrback(log.err, facility="tahoe.init",
                      level=log.BAD, umid="UaNs9A")
 
     def init_web(self, webport):
-        self.log("init_web(webport=%s)", args=(webport,))
+        self.log("init_web(webport=%s)", args=(webport,), umid="2bUygA")
 
         from allmydata.webish import IntroducerWebishServer
         nodeurl_path = os.path.join(self.basedir, "node.url")
@@ -47,105 +50,260 @@ class IntroducerNode(node.Node):
         ws = IntroducerWebishServer(self, webport, nodeurl_path, staticdir)
         self.add_service(ws)
 
+class WrapV1SubscriberInV2Interface: # for_v1
+    """I wrap a RemoteReference that points at an old v1 subscriber, enabling
+    it to be treated like a v2 subscriber.
+    """
+
+    def __init__(self, original):
+        self.original = original
+    def __eq__(self, them):
+        return self.original == them
+    def __ne__(self, them):
+        return self.original != them
+    def __hash__(self):
+        return hash(self.original)
+    def getRemoteTubID(self):
+        return self.original.getRemoteTubID()
+    def getSturdyRef(self):
+        return self.original.getSturdyRef()
+    def getPeer(self):
+        return self.original.getPeer()
+    def callRemote(self, methname, *args, **kwargs):
+        m = getattr(self, "wrap_" + methname)
+        return m(*args, **kwargs)
+    def wrap_announce_v2(self, announcements):
+        anns_v1 = [convert_announcement_v2_to_v1(ann) for ann in announcements]
+        return self.original.callRemote("announce", set(anns_v1))
+    def wrap_set_encoding_parameters(self, parameters):
+        # note: unused
+        return self.original.callRemote("set_encoding_parameters", parameters)
+    def notifyOnDisconnect(self, *args, **kwargs):
+        return self.original.notifyOnDisconnect(*args, **kwargs)
+
 class IntroducerService(service.MultiService, Referenceable):
-    implements(RIIntroducerPublisherAndSubscriberService)
+    implements(RIIntroducerPublisherAndSubscriberService_v2)
     name = "introducer"
-    VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v1":
-                 { },
+    # v1 is the original protocol, supported since 1.0 (but only advertised
+    # starting in 1.3). v2 is the new signed protocol, supported after 1.9
+    VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v1": { },
+                "http://allmydata.org/tahoe/protocols/introducer/v2": { },
                 "application-version": str(allmydata.__full_version__),
                 }
 
     def __init__(self, basedir="."):
         service.MultiService.__init__(self)
         self.introducer_url = None
-        # 'index' is (service_name, tubid)
-        self._announcements = {} # dict of index -> (announcement, timestamp)
-        self._subscribers = {} # dict of (rref->timestamp) dicts
+        # 'index' is (service_name, key_s, tubid), where key_s or tubid is
+        # None
+        self._announcements = {} # dict of index ->
+                                 # (ann_t, canary, ann, timestamp)
+
+        # ann (the announcement dictionary) is cleaned up: nickname is always
+        # unicode, servicename is always ascii, etc, even though
+        # simplejson.loads sometimes returns either
+
+        # self._subscribers is a dict mapping servicename to subscriptions
+        # 'subscriptions' is a dict mapping rref to a subscription
+        # 'subscription' is a tuple of (subscriber_info, timestamp)
+        # 'subscriber_info' is a dict, provided directly for v2 clients, or
+        # synthesized for v1 clients. The expected keys are:
+        #  version, nickname, app-versions, my-version, oldest-supported
+        self._subscribers = {}
+
+        # self._stub_client_announcements contains the information provided
+        # by v1 clients. We stash this so we can match it up with their
+        # subscriptions.
+        self._stub_client_announcements = {} # maps tubid to sinfo # for_v1
+
         self._debug_counts = {"inbound_message": 0,
                               "inbound_duplicate": 0,
                               "inbound_update": 0,
                               "outbound_message": 0,
                               "outbound_announcements": 0,
                               "inbound_subscribe": 0}
+        self._debug_outstanding = 0 # also covers WrapV1SubscriberInV2Interface
+
+    def _debug_retired(self, res):
+        self._debug_outstanding -= 1
+        return res
 
     def log(self, *args, **kwargs):
         if "facility" not in kwargs:
-            kwargs["facility"] = "tahoe.introducer"
+            kwargs["facility"] = "tahoe.introducer.server"
         return log.msg(*args, **kwargs)
 
     def get_announcements(self):
         return self._announcements
     def get_subscribers(self):
-        return self._subscribers
+        """Return a list of (service_name, when, subscriber_info, rref) for
+        all subscribers. subscriber_info is a dict with the following keys:
+        version, nickname, app-versions, my-version, oldest-supported"""
+        s = []
+        for service_name, subscriptions in self._subscribers.items():
+            for rref,(subscriber_info,when) in subscriptions.items():
+                s.append( (service_name, when, subscriber_info, rref) )
+        return s
 
     def remote_get_version(self):
         return self.VERSION
 
-    def remote_publish(self, announcement):
+    def remote_publish(self, ann_t): # for_v1
+        lp = self.log("introducer: old (v1) announcement published: %s"
+                      % (ann_t,), umid="6zGOIw")
+        ann_v2 = convert_announcement_v1_to_v2(ann_t)
+        return self.publish(ann_v2, None, lp)
+
+    def remote_publish_v2(self, ann_t, canary):
+        lp = self.log("introducer: announcement (v2) published", umid="L2QXkQ")
+        return self.publish(ann_t, canary, lp)
+
+    def publish(self, ann_t, canary, lp):
         try:
-            self._publish(announcement)
+            self._publish(ann_t, canary, lp)
         except:
             log.err(format="Introducer.remote_publish failed on %(ann)s",
-                    ann=announcement, level=log.UNUSUAL, umid="620rWA")
+                    ann=ann_t,
+                    level=log.UNUSUAL, parent=lp, umid="620rWA")
             raise
 
-    def _publish(self, announcement):
+    def _publish(self, ann_t, canary, lp):
         self._debug_counts["inbound_message"] += 1
-        self.log("introducer: announcement published: %s" % (announcement,) )
-        (furl, service_name, ri_name, nickname_utf8, ver, oldest) = announcement
+        self.log("introducer: announcement published: %s" % (ann_t,),
+                 umid="wKHgCw")
+        ann, key = unsign_from_foolscap(ann_t) # might raise BadSignatureError
+        index = make_index(ann, key)
 
-        nodeid = b32decode(SturdyRef(furl).tubID.upper())
-        index = (service_name, nodeid)
+        service_name = str(ann["service-name"])
+        if service_name == "stub_client": # for_v1
+            self._attach_stub_client(ann, lp)
+            return
 
-        if index in self._announcements:
-            (old_announcement, timestamp) = self._announcements[index]
-            if old_announcement == announcement:
-                self.log("but we already knew it, ignoring", level=log.NOISY)
+        old = self._announcements.get(index)
+        if old:
+            (old_ann_t, canary, old_ann, timestamp) = old
+            if old_ann == ann:
+                self.log("but we already knew it, ignoring", level=log.NOISY,
+                         umid="myxzLw")
                 self._debug_counts["inbound_duplicate"] += 1
                 return
             else:
-                self.log("old announcement being updated", level=log.NOISY)
+                self.log("old announcement being updated", level=log.NOISY,
+                         umid="304r9g")
                 self._debug_counts["inbound_update"] += 1
-        self._announcements[index] = (announcement, time.time())
+        self._announcements[index] = (ann_t, canary, ann, time.time())
+        #if canary:
+        #    canary.notifyOnDisconnect ...
+        # use a CanaryWatcher? with cw.is_connected()?
+        # actually we just want foolscap to give rref.is_connected(), since
+        # this is only for the status display
 
         for s in self._subscribers.get(service_name, []):
             self._debug_counts["outbound_message"] += 1
             self._debug_counts["outbound_announcements"] += 1
-            d = s.callRemote("announce", set([announcement]))
-            d.addErrback(rrefutil.trap_deadref)
+            self._debug_outstanding += 1
+            d = s.callRemote("announce_v2", set([ann_t]))
+            d.addBoth(self._debug_retired)
             d.addErrback(log.err,
                          format="subscriber errored on announcement %(ann)s",
-                         ann=announcement, facility="tahoe.introducer",
+                         ann=ann_t, facility="tahoe.introducer",
                          level=log.UNUSUAL, umid="jfGMXQ")
 
-    def remote_subscribe(self, subscriber, service_name):
-        self.log("introducer: subscription[%s] request at %s" % (service_name,
-                                                                 subscriber))
+    def _attach_stub_client(self, ann, lp):
+        # There might be a v1 subscriber for whom this is a stub_client.
+        # We might have received the subscription before the stub_client
+        # announcement, in which case we now need to fix up the record in
+        # self._subscriptions .
+
+        # record it for later, in case the stub_client arrived before the
+        # subscription
+        subscriber_info = self._get_subscriber_info_from_ann(ann)
+        ann_tubid = get_tubid_string_from_ann(ann)
+        self._stub_client_announcements[ann_tubid] = subscriber_info
+
+        lp2 = self.log("stub_client announcement, "
+                       "looking for matching subscriber",
+                       parent=lp, level=log.NOISY, umid="BTywDg")
+
+        for sn in self._subscribers:
+            s = self._subscribers[sn]
+            for (subscriber, info) in s.items():
+                # we correlate these by looking for a subscriber whose tubid
+                # matches this announcement
+                sub_tubid = subscriber.getRemoteTubID()
+                if sub_tubid == ann_tubid:
+                    self.log(format="found a match, nodeid=%(nodeid)s",
+                             nodeid=sub_tubid,
+                             level=log.NOISY, parent=lp2, umid="xsWs1A")
+                    # found a match. Does it need info?
+                    if not info[0]:
+                        self.log(format="replacing info",
+                                 level=log.NOISY, parent=lp2, umid="m5kxwA")
+                        # yup
+                        s[subscriber] = (subscriber_info, info[1])
+            # and we don't remember or announce stub_clients beyond what we
+            # need to get the subscriber_info set up
+
+    def _get_subscriber_info_from_ann(self, ann): # for_v1
+        sinfo = { "version": ann["version"],
+                  "nickname": ann["nickname"],
+                  "app-versions": ann["app-versions"],
+                  "my-version": ann["my-version"],
+                  "oldest-supported": ann["oldest-supported"],
+                  }
+        return sinfo
+
+    def remote_subscribe(self, subscriber, service_name): # for_v1
+        self.log("introducer: old (v1) subscription[%s] request at %s"
+                 % (service_name, subscriber), umid="hJlGUg")
+        return self.add_subscriber(WrapV1SubscriberInV2Interface(subscriber),
+                                   service_name, None)
+
+    def remote_subscribe_v2(self, subscriber, service_name, subscriber_info):
+        self.log("introducer: subscription[%s] request at %s"
+                 % (service_name, subscriber), umid="U3uzLg")
+        return self.add_subscriber(subscriber, service_name, subscriber_info)
+
+    def add_subscriber(self, subscriber, service_name, subscriber_info):
         self._debug_counts["inbound_subscribe"] += 1
         if service_name not in self._subscribers:
             self._subscribers[service_name] = {}
         subscribers = self._subscribers[service_name]
         if subscriber in subscribers:
             self.log("but they're already subscribed, ignoring",
-                     level=log.UNUSUAL)
+                     level=log.UNUSUAL, umid="Sy9EfA")
             return
-        subscribers[subscriber] = time.time()
+
+        if not subscriber_info: # for_v1
+            # v1 clients don't provide subscriber_info, but they should
+            # publish a 'stub client' record which contains the same
+            # information. If we've already received this, it will be in
+            # self._stub_client_announcements
+            tubid = subscriber.getRemoteTubID()
+            if tubid in self._stub_client_announcements:
+                subscriber_info = self._stub_client_announcements[tubid]
+
+        subscribers[subscriber] = (subscriber_info, time.time())
         def _remove():
             self.log("introducer: unsubscribing[%s] %s" % (service_name,
-                                                           subscriber))
+                                                           subscriber),
+                     umid="vYGcJg")
             subscribers.pop(subscriber, None)
         subscriber.notifyOnDisconnect(_remove)
 
-        announcements = set(
-            [ ann
-              for (sn2,nodeid),(ann,when) in self._announcements.items()
-              if sn2 == service_name] )
-
-        self._debug_counts["outbound_message"] += 1
-        self._debug_counts["outbound_announcements"] += len(announcements)
-        d = subscriber.callRemote("announce", announcements)
-        d.addErrback(rrefutil.trap_deadref)
-        d.addErrback(log.err,
-                     format="subscriber errored during subscribe %(anns)s",
-                     anns=announcements, facility="tahoe.introducer",
-                     level=log.UNUSUAL, umid="mtZepQ")
+        # now tell them about any announcements they're interested in
+        announcements = set( [ ann_t
+                               for idx,(ann_t,canary,ann,when)
+                               in self._announcements.items()
+                               if idx[0] == service_name] )
+        if announcements:
+            self._debug_counts["outbound_message"] += 1
+            self._debug_counts["outbound_announcements"] += len(announcements)
+            self._debug_outstanding += 1
+            d = subscriber.callRemote("announce_v2", announcements)
+            d.addBoth(self._debug_retired)
+            d.addErrback(log.err,
+                         format="subscriber errored during subscribe %(anns)s",
+                         anns=announcements, facility="tahoe.introducer",
+                         level=log.UNUSUAL, umid="mtZepQ")
+            return d
index 0b8436502790e7f45ef59990dd792823e3ba2b6a..06b707de1c50d86804a208a97e8657ea87cb3572 100644 (file)
@@ -195,6 +195,19 @@ class Node(service.MultiService):
         # TODO: merge this with allmydata.get_package_versions
         return dict(app_versions.versions)
 
+    def get_config_from_file(self, name, required=False):
+        """Get the (string) contents of a config file, or None if the file
+        did not exist. If required=True, raise an exception rather than
+        returning None. Any leading or trailing whitespace will be stripped
+        from the data."""
+        fn = os.path.join(self.basedir, name)
+        try:
+            return fileutil.read(fn).strip()
+        except EnvironmentError:
+            if not required:
+                return None
+            raise
+
     def write_private_config(self, name, value):
         """Write the (string) contents of a private config file (which is a
         config file that resides within the subdirectory named 'private'), and
index 1f39c9ca14dd10c4010c942c2c661ef2321b0238..c5e5b3925b4f4a78b0a6e53083b6cec6f7380eb9 100644 (file)
@@ -99,6 +99,11 @@ class StorageServer(service.MultiService, Referenceable):
     def __repr__(self):
         return "<StorageServer %s>" % (idlib.shortnodeid_b2a(self.my_nodeid),)
 
+    def have_shares(self):
+        # quick test to decide if we need to commit to an implicit
+        # permutation-seed or if we should use a new one
+        return bool(set(os.listdir(self.sharedir)) - set(["incoming"]))
+
     def add_bucket_counter(self):
         statefile = os.path.join(self.storedir, "bucket_counter.state")
         self.bucket_counter = BucketCountingCrawler(self, statefile)
index 7a8ce0c45ee1eee99f0cf02add11d7abe239a493..9d36a8bdf2aa9fe596c2f58206f9d1c422405074 100644 (file)
@@ -29,11 +29,11 @@ the foolscap-based server implemented in src/allmydata/storage/*.py .
 # 6: implement other sorts of IStorageClient classes: S3, etc
 
 
-import time
+import re, time
 from zope.interface import implements, Interface
 from foolscap.api import eventually
 from allmydata.interfaces import IStorageBroker
-from allmydata.util import idlib, log
+from allmydata.util import log, base32
 from allmydata.util.assertutil import precondition
 from allmydata.util.rrefutil import add_version_to_remote_reference
 from allmydata.util.hashutil import sha1
@@ -74,8 +74,8 @@ class StorageFarmBroker:
         self.introducer_client = None
 
     # these two are used in unit tests
-    def test_add_rref(self, serverid, rref):
-        s = NativeStorageServer(serverid, {})
+    def test_add_rref(self, serverid, rref, ann):
+        s = NativeStorageServer(serverid, ann.copy())
         s.rref = rref
         self.servers[serverid] = s
 
@@ -86,21 +86,23 @@ class StorageFarmBroker:
         self.introducer_client = ic = introducer_client
         ic.subscribe_to("storage", self._got_announcement)
 
-    def _got_announcement(self, serverid, ann_d):
-        precondition(isinstance(serverid, str), serverid)
-        precondition(len(serverid) == 20, serverid)
-        assert ann_d["service-name"] == "storage"
+    def _got_announcement(self, key_s, ann):
+        if key_s is not None:
+            precondition(isinstance(key_s, str), key_s)
+            precondition(key_s.startswith("v0-"), key_s)
+        assert ann["service-name"] == "storage"
+        s = NativeStorageServer(key_s, ann)
+        serverid = s.get_serverid()
         old = self.servers.get(serverid)
         if old:
-            if old.get_announcement() == ann_d:
+            if old.get_announcement() == ann:
                 return # duplicate
             # replacement
             del self.servers[serverid]
             old.stop_connecting()
             # now we forget about them and start using the new one
-        dsc = NativeStorageServer(serverid, ann_d)
-        self.servers[serverid] = dsc
-        dsc.start_connecting(self.tub, self._trigger_connections)
+        self.servers[serverid] = s
+        s.start_connecting(self.tub, self._trigger_connections)
         # the descriptor will manage their own Reconnector, and each time we
         # need servers, we'll ask them if they're connected or not.
 
@@ -173,13 +175,25 @@ class NativeStorageServer:
         "application-version": "unknown: no get_version()",
         }
 
-    def __init__(self, serverid, ann_d, min_shares=1):
-        self.serverid = serverid
-        self._tubid = serverid
-        self.announcement = ann_d
+    def __init__(self, key_s, ann, min_shares=1):
+        self.key_s = key_s
+        self.announcement = ann
         self.min_shares = min_shares
 
-        self.serverid_s = idlib.shortnodeid_b2a(self.serverid)
+        assert "anonymous-storage-FURL" in ann, ann
+        furl = str(ann["anonymous-storage-FURL"])
+        m = re.match(r'pb://(\w+)@', furl)
+        assert m, furl
+        tubid_s = m.group(1).lower()
+        self._tubid = base32.a2b(tubid_s)
+        assert "permutation-seed-base32" in ann, ann
+        ps = base32.a2b(str(ann["permutation-seed-base32"]))
+        self._permutation_seed = ps
+
+        name = key_s or tubid_s
+        self._long_description = name
+        self._short_description = name[:8] # TODO: decide who adds []
+
         self.announcement_time = time.time()
         self.last_connect_time = None
         self.last_loss_time = None
@@ -191,17 +205,17 @@ class NativeStorageServer:
     def __repr__(self):
         return "<NativeStorageServer for %s>" % self.get_name()
     def get_serverid(self):
-        return self._tubid
+        return self._tubid # XXX replace with self.key_s
     def get_permutation_seed(self):
-        return self._tubid
+        return self._permutation_seed
     def get_version(self):
         if self.rref:
             return self.rref.version
         return None
     def get_name(self): # keep methodname short
-        return self.serverid_s
+        return self._short_description
     def get_longname(self):
-        return idlib.nodeid_b2a(self._tubid)
+        return self._long_description
     def get_lease_seed(self):
         return self._tubid
     def get_foolscap_write_enabler_seed(self):
@@ -221,7 +235,7 @@ class NativeStorageServer:
         return self.announcement_time
 
     def start_connecting(self, tub, trigger_cb):
-        furl = self.announcement["FURL"]
+        furl = str(self.announcement["anonymous-storage-FURL"])
         self._trigger_cb = trigger_cb
         self._reconnector = tub.connectTo(furl, self._got_connection)
 
index 6ca51139f09962888ab6f644d00a63aa781c4592..7ca19f8ad2a29e467e1d59ae708e7f679f7b1789 100644 (file)
@@ -22,15 +22,16 @@ class WebResultsRendering(unittest.TestCase, WebRenderingMixin):
         for (peerid, nickname) in [("\x00"*20, "peer-0"),
                                    ("\xff"*20, "peer-f"),
                                    ("\x11"*20, "peer-11")] :
-            ann_d = { "version": 0,
-                      "service-name": "storage",
-                      "FURL": "fake furl",
-                      "nickname": unicode(nickname),
-                      "app-versions": {}, # need #466 and v2 introducer
-                      "my-version": "ver",
-                      "oldest-supported": "oldest",
-                      }
-            s = NativeStorageServer(peerid, ann_d)
+            ann = { "version": 0,
+                    "service-name": "storage",
+                    "anonymous-storage-FURL": "pb://abcde@nowhere/fake",
+                    "permutation-seed-base32": "",
+                    "nickname": unicode(nickname),
+                    "app-versions": {}, # need #466 and v2 introducer
+                    "my-version": "ver",
+                    "oldest-supported": "oldest",
+                    }
+            s = NativeStorageServer(peerid, ann)
             sb.test_add_server(peerid, s)
         c = FakeClient()
         c.storage_broker = sb
index 14c9c791a9341e03c0bb5938ff30ca370a3afc87..5373464811c864af79c689f188b76fe3b599470c 100644 (file)
@@ -136,12 +136,14 @@ class Basic(testutil.ReallyEqualMixin, unittest.TestCase):
         self.failUnlessEqual(c.getServiceNamed("storage").reserved_space, 0)
 
     def _permute(self, sb, key):
-        return [ s.get_serverid() for s in sb.get_servers_for_psi(key) ]
+        return [ s.get_longname() for s in sb.get_servers_for_psi(key) ]
 
     def test_permute(self):
         sb = StorageFarmBroker(None, True)
         for k in ["%d" % i for i in range(5)]:
-            sb.test_add_rref(k, "rref")
+            ann = {"anonymous-storage-FURL": "pb://abcde@nowhere/fake",
+                   "permutation-seed-base32": base32.b2a(k) }
+            sb.test_add_rref(k, "rref", ann)
 
         self.failUnlessReallyEqual(self._permute(sb, "one"), ['3','1','0','4','2'])
         self.failUnlessReallyEqual(self._permute(sb, "two"), ['0','4','2','1','3'])
index 9d0f50eb92fa8b3b3184cdf46457433473356697..b8dd50b364049c1f75dbd97f200bfda133871631 100644 (file)
@@ -1,6 +1,7 @@
 
 import os, re
 from base64 import b32decode
+import simplejson
 
 from twisted.trial import unittest
 from twisted.internet import defer
@@ -9,11 +10,16 @@ from twisted.python import log
 from foolscap.api import Tub, Referenceable, fireEventually, flushEventualQueue
 from twisted.application import service
 from allmydata.interfaces import InsufficientVersionError
-from allmydata.introducer.client import IntroducerClient
+from allmydata.introducer.client import IntroducerClient, \
+     WrapV2ClientInV1Interface
 from allmydata.introducer.server import IntroducerService
+from allmydata.introducer.common import get_tubid_string_from_ann, \
+     get_tubid_string, sign_to_foolscap, unsign_from_foolscap, \
+     UnknownKeyError
+from allmydata.introducer import old
 # test compatibility with old introducer .tac files
 from allmydata.introducer import IntroducerNode
-from allmydata.util import pollmixin
+from allmydata.util import pollmixin, keyutil
 import allmydata.test.common_util as testutil
 
 class LoggingMultiService(service.MultiService):
@@ -47,14 +53,14 @@ class Introducer(ServiceMixin, unittest.TestCase, pollmixin.PollMixin):
 
     def test_create(self):
         ic = IntroducerClient(None, "introducer.furl", u"my_nickname",
-                              "my_version", "oldest_version")
+                              "my_version", "oldest_version", {})
         self.failUnless(isinstance(ic, IntroducerClient))
 
     def test_listen(self):
         i = IntroducerService()
         i.setServiceParent(self.parent)
 
-    def test_duplicate(self):
+    def test_duplicate_publish(self):
         i = IntroducerService()
         self.failUnlessEqual(len(i.get_announcements()), 0)
         self.failUnlessEqual(len(i.get_subscribers()), 0)
@@ -73,6 +79,223 @@ class Introducer(ServiceMixin, unittest.TestCase, pollmixin.PollMixin):
         self.failUnlessEqual(len(i.get_announcements()), 2)
         self.failUnlessEqual(len(i.get_subscribers()), 0)
 
+    def test_id_collision(self):
+        # test replacement case where tubid equals a keyid (one should
+        # not replace the other)
+        i = IntroducerService()
+        ic = IntroducerClient(None,
+                              "introducer.furl", u"my_nickname",
+                              "my_version", "oldest_version", {})
+        sk_s, vk_s = keyutil.make_keypair()
+        sk, _ignored = keyutil.parse_privkey(sk_s)
+        keyid = keyutil.remove_prefix(vk_s, "pub-v0-")
+        furl1 = "pb://onug64tu@127.0.0.1:123/short" # base32("short")
+        ann_t = ic.create_announcement("storage", make_ann(furl1), sk)
+        i.remote_publish_v2(ann_t, Referenceable())
+        announcements = i.get_announcements()
+        self.failUnlessEqual(len(announcements), 1)
+        key1 = ("storage", "v0-"+keyid, None)
+        self.failUnless(key1 in announcements)
+        (ign, ign, ann1_out, ign) = announcements[key1]
+        self.failUnlessEqual(ann1_out["anonymous-storage-FURL"], furl1)
+
+        furl2 = "pb://%s@127.0.0.1:36106/swissnum" % keyid
+        ann2 = (furl2, "storage", "RIStorage", "nick1", "ver23", "ver0")
+        i.remote_publish(ann2)
+        self.failUnlessEqual(len(announcements), 2)
+        key2 = ("storage", None, keyid)
+        self.failUnless(key2 in announcements)
+        (ign, ign, ann2_out, ign) = announcements[key2]
+        self.failUnlessEqual(ann2_out["anonymous-storage-FURL"], furl2)
+
+
+def make_ann(furl):
+    ann = { "anonymous-storage-FURL": furl,
+            "permutation-seed-base32": get_tubid_string(furl) }
+    return ann
+
+def make_ann_t(ic, furl, privkey):
+    return ic.create_announcement("storage", make_ann(furl), privkey)
+
+class Client(unittest.TestCase):
+    def test_duplicate_receive_v1(self):
+        ic = IntroducerClient(None,
+                              "introducer.furl", u"my_nickname",
+                              "my_version", "oldest_version", {})
+        announcements = []
+        ic.subscribe_to("storage",
+                        lambda key_s,ann: announcements.append(ann))
+        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/gydnpigj2ja2qr2srq4ikjwnl7xfgbra"
+        ann1 = (furl1, "storage", "RIStorage", "nick1", "ver23", "ver0")
+        ann1b = (furl1, "storage", "RIStorage", "nick1", "ver24", "ver0")
+        ca = WrapV2ClientInV1Interface(ic)
+
+        ca.remote_announce([ann1])
+        d = fireEventually()
+        def _then(ign):
+            self.failUnlessEqual(len(announcements), 1)
+            self.failUnlessEqual(announcements[0]["nickname"], u"nick1")
+            self.failUnlessEqual(announcements[0]["my-version"], "ver23")
+            self.failUnlessEqual(ic._debug_counts["inbound_announcement"], 1)
+            self.failUnlessEqual(ic._debug_counts["new_announcement"], 1)
+            self.failUnlessEqual(ic._debug_counts["update"], 0)
+            self.failUnlessEqual(ic._debug_counts["duplicate_announcement"], 0)
+            # now send a duplicate announcement: this should not notify clients
+            ca.remote_announce([ann1])
+            return fireEventually()
+        d.addCallback(_then)
+        def _then2(ign):
+            self.failUnlessEqual(len(announcements), 1)
+            self.failUnlessEqual(ic._debug_counts["inbound_announcement"], 2)
+            self.failUnlessEqual(ic._debug_counts["new_announcement"], 1)
+            self.failUnlessEqual(ic._debug_counts["update"], 0)
+            self.failUnlessEqual(ic._debug_counts["duplicate_announcement"], 1)
+            # and a replacement announcement: same FURL, new other stuff.
+            # Clients should be notified.
+            ca.remote_announce([ann1b])
+            return fireEventually()
+        d.addCallback(_then2)
+        def _then3(ign):
+            self.failUnlessEqual(len(announcements), 2)
+            self.failUnlessEqual(ic._debug_counts["inbound_announcement"], 3)
+            self.failUnlessEqual(ic._debug_counts["new_announcement"], 1)
+            self.failUnlessEqual(ic._debug_counts["update"], 1)
+            self.failUnlessEqual(ic._debug_counts["duplicate_announcement"], 1)
+            # test that the other stuff changed
+            self.failUnlessEqual(announcements[-1]["nickname"], u"nick1")
+            self.failUnlessEqual(announcements[-1]["my-version"], "ver24")
+        d.addCallback(_then3)
+        return d
+
+    def test_duplicate_receive_v2(self):
+        ic1 = IntroducerClient(None,
+                               "introducer.furl", u"my_nickname",
+                               "ver23", "oldest_version", {})
+        # we use a second client just to create a different-looking
+        # announcement
+        ic2 = IntroducerClient(None,
+                               "introducer.furl", u"my_nickname",
+                               "ver24","oldest_version",{})
+        announcements = []
+        def _received(key_s, ann):
+            announcements.append( (key_s, ann) )
+        ic1.subscribe_to("storage", _received)
+        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/gydnp"
+        furl1a = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:7777/gydnp"
+        furl2 = "pb://ttwwooyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/ttwwoo"
+
+        privkey_s, pubkey_vs = keyutil.make_keypair()
+        privkey, _ignored = keyutil.parse_privkey(privkey_s)
+        pubkey_s = keyutil.remove_prefix(pubkey_vs, "pub-")
+
+        # ann1: ic1, furl1
+        # ann1a: ic1, furl1a (same SturdyRef, different connection hints)
+        # ann1b: ic2, furl1
+        # ann2: ic2, furl2
+
+        self.ann1 = make_ann_t(ic1, furl1, privkey)
+        self.ann1a = make_ann_t(ic1, furl1a, privkey)
+        self.ann1b = make_ann_t(ic2, furl1, privkey)
+        self.ann2 = make_ann_t(ic2, furl2, privkey)
+
+        ic1.remote_announce_v2([self.ann1]) # queues eventual-send
+        d = fireEventually()
+        def _then1(ign):
+            self.failUnlessEqual(len(announcements), 1)
+            key_s,ann = announcements[0]
+            self.failUnlessEqual(key_s, pubkey_s)
+            self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1)
+            self.failUnlessEqual(ann["my-version"], "ver23")
+        d.addCallback(_then1)
+
+        # now send a duplicate announcement. This should not fire the
+        # subscriber
+        d.addCallback(lambda ign: ic1.remote_announce_v2([self.ann1]))
+        d.addCallback(fireEventually)
+        def _then2(ign):
+            self.failUnlessEqual(len(announcements), 1)
+        d.addCallback(_then2)
+
+        # and a replacement announcement: same FURL, new other stuff. The
+        # subscriber *should* be fired.
+        d.addCallback(lambda ign: ic1.remote_announce_v2([self.ann1b]))
+        d.addCallback(fireEventually)
+        def _then3(ign):
+            self.failUnlessEqual(len(announcements), 2)
+            key_s,ann = announcements[-1]
+            self.failUnlessEqual(key_s, pubkey_s)
+            self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1)
+            self.failUnlessEqual(ann["my-version"], "ver24")
+        d.addCallback(_then3)
+
+        # and a replacement announcement with a different FURL (it uses
+        # different connection hints)
+        d.addCallback(lambda ign: ic1.remote_announce_v2([self.ann1a]))
+        d.addCallback(fireEventually)
+        def _then4(ign):
+            self.failUnlessEqual(len(announcements), 3)
+            key_s,ann = announcements[-1]
+            self.failUnlessEqual(key_s, pubkey_s)
+            self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1a)
+            self.failUnlessEqual(ann["my-version"], "ver23")
+        d.addCallback(_then4)
+
+        # now add a new subscription, which should be called with the
+        # backlog. The introducer only records one announcement per index, so
+        # the backlog will only have the latest message.
+        announcements2 = []
+        def _received2(key_s, ann):
+            announcements2.append( (key_s, ann) )
+        d.addCallback(lambda ign: ic1.subscribe_to("storage", _received2))
+        d.addCallback(fireEventually)
+        def _then5(ign):
+            self.failUnlessEqual(len(announcements2), 1)
+            key_s,ann = announcements2[-1]
+            self.failUnlessEqual(key_s, pubkey_s)
+            self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1a)
+            self.failUnlessEqual(ann["my-version"], "ver23")
+        d.addCallback(_then5)
+        return d
+
+    def test_id_collision(self):
+        # test replacement case where tubid equals a keyid (one should
+        # not replace the other)
+        ic = IntroducerClient(None,
+                              "introducer.furl", u"my_nickname",
+                              "my_version", "oldest_version", {})
+        announcements = []
+        ic.subscribe_to("storage",
+                        lambda key_s,ann: announcements.append(ann))
+        sk_s, vk_s = keyutil.make_keypair()
+        sk, _ignored = keyutil.parse_privkey(sk_s)
+        keyid = keyutil.remove_prefix(vk_s, "pub-v0-")
+        furl1 = "pb://onug64tu@127.0.0.1:123/short" # base32("short")
+        furl2 = "pb://%s@127.0.0.1:36106/swissnum" % keyid
+        ann_t = ic.create_announcement("storage", make_ann(furl1), sk)
+        ic.remote_announce_v2([ann_t])
+        d = fireEventually()
+        def _then(ign):
+            # first announcement has been processed
+            self.failUnlessEqual(len(announcements), 1)
+            self.failUnlessEqual(announcements[0]["anonymous-storage-FURL"],
+                                 furl1)
+            # now submit a second one, with a tubid that happens to look just
+            # like the pubkey-based serverid we just processed. They should
+            # not overlap.
+            ann2 = (furl2, "storage", "RIStorage", "nick1", "ver23", "ver0")
+            ca = WrapV2ClientInV1Interface(ic)
+            ca.remote_announce([ann2])
+            return fireEventually()
+        d.addCallback(_then)
+        def _then2(ign):
+            # if they overlapped, the second announcement would be ignored
+            self.failUnlessEqual(len(announcements), 2)
+            self.failUnlessEqual(announcements[1]["anonymous-storage-FURL"],
+                                 furl2)
+        d.addCallback(_then2)
+        return d
+
+
 class SystemTestMixin(ServiceMixin, pollmixin.PollMixin):
 
     def create_tub(self, portnum=0):
@@ -88,36 +311,86 @@ class SystemTestMixin(ServiceMixin, pollmixin.PollMixin):
             assert self.central_portnum == portnum
         tub.setLocation("localhost:%d" % self.central_portnum)
 
-class SystemTest(SystemTestMixin, unittest.TestCase):
-
-    def test_system(self):
-        self.basedir = "introducer/SystemTest/system"
+class Queue(SystemTestMixin, unittest.TestCase):
+    def test_queue_until_connected(self):
+        self.basedir = "introducer/QueueUntilConnected/queued"
         os.makedirs(self.basedir)
-        return self.do_system_test(IntroducerService)
-    test_system.timeout = 480 # occasionally takes longer than 350s on "draco"
+        self.create_tub()
+        introducer = IntroducerService()
+        introducer.setServiceParent(self.parent)
+        iff = os.path.join(self.basedir, "introducer.furl")
+        ifurl = self.central_tub.registerReference(introducer, furlFile=iff)
+        tub2 = Tub()
+        tub2.setServiceParent(self.parent)
+        c = IntroducerClient(tub2, ifurl,
+                             u"nickname", "version", "oldest", {})
+        furl1 = "pb://onug64tu@127.0.0.1:123/short" # base32("short")
+        sk_s, vk_s = keyutil.make_keypair()
+        sk, _ignored = keyutil.parse_privkey(sk_s)
+
+        d = introducer.disownServiceParent()
+        def _offline(ign):
+            # now that the introducer server is offline, create a client and
+            # publish some messages
+            c.setServiceParent(self.parent) # this starts the reconnector
+            c.publish("storage", make_ann(furl1), sk)
+
+            introducer.setServiceParent(self.parent) # restart the server
+            # now wait for the messages to be delivered
+            def _got_announcement():
+                return bool(introducer.get_announcements())
+            return self.poll(_got_announcement)
+        d.addCallback(_offline)
+        def _done(ign):
+            v = list(introducer.get_announcements().values())[0]
+            (ign, ign, ann1_out, ign) = v
+            self.failUnlessEqual(ann1_out["anonymous-storage-FURL"], furl1)
+        d.addCallback(_done)
+
+        # now let the ack get back
+        def _wait_until_idle(ign):
+            def _idle():
+                if c._debug_outstanding:
+                    return False
+                if introducer._debug_outstanding:
+                    return False
+                return True
+            return self.poll(_idle)
+        d.addCallback(_wait_until_idle)
+        return d
 
-    def do_system_test(self, create_introducer):
+
+V1 = "v1"; V2 = "v2"
+class SystemTest(SystemTestMixin, unittest.TestCase):
+
+    def do_system_test(self, server_version):
         self.create_tub()
-        introducer = create_introducer()
+        if server_version == V1:
+            introducer = old.IntroducerService_v1()
+        else:
+            introducer = IntroducerService()
         introducer.setServiceParent(self.parent)
         iff = os.path.join(self.basedir, "introducer.furl")
         tub = self.central_tub
         ifurl = self.central_tub.registerReference(introducer, furlFile=iff)
         self.introducer_furl = ifurl
 
-        NUMCLIENTS = 5
-        # we have 5 clients who publish themselves, and an extra one does
-        # which not. When the connections are fully established, all six nodes
+        # we have 5 clients who publish themselves as storage servers, and a
+        # sixth which does which not. All 6 clients subscriber to hear about
+        # storage. When the connections are fully established, all six nodes
         # should have 5 connections each.
+        NUM_STORAGE = 5
+        NUM_CLIENTS = 6
 
         clients = []
         tubs = {}
         received_announcements = {}
-        NUM_SERVERS = NUMCLIENTS
         subscribing_clients = []
         publishing_clients = []
+        privkeys = {}
+        expected_announcements = [0 for c in range(NUM_CLIENTS)]
 
-        for i in range(NUMCLIENTS+1):
+        for i in range(NUM_CLIENTS):
             tub = Tub()
             #tub.setOption("logLocalFailures", True)
             #tub.setOption("logRemoteFailures", True)
@@ -128,62 +401,171 @@ class SystemTest(SystemTestMixin, unittest.TestCase):
             tub.setLocation("localhost:%d" % portnum)
 
             log.msg("creating client %d: %s" % (i, tub.getShortTubID()))
-            c = IntroducerClient(tub, self.introducer_furl, u"nickname-%d" % i,
-                                 "version", "oldest")
+            if i == 0:
+                c = old.IntroducerClient_v1(tub, self.introducer_furl,
+                                            u"nickname-%d" % i,
+                                            "version", "oldest")
+            else:
+                c = IntroducerClient(tub, self.introducer_furl,
+                                     u"nickname-%d" % i,
+                                     "version", "oldest",
+                                     {"component": "component-v1"})
             received_announcements[c] = {}
-            def got(serverid, ann_d, announcements):
-                announcements[serverid] = ann_d
-            c.subscribe_to("storage", got, received_announcements[c])
+            def got(key_s_or_tubid, ann, announcements, i):
+                if i == 0:
+                    index = get_tubid_string_from_ann(ann)
+                else:
+                    index = key_s_or_tubid or get_tubid_string_from_ann(ann)
+                announcements[index] = ann
+            c.subscribe_to("storage", got, received_announcements[c], i)
             subscribing_clients.append(c)
-
-            if i < NUMCLIENTS:
-                node_furl = tub.registerReference(Referenceable())
-                c.publish(node_furl, "storage", "ri_name")
+            expected_announcements[i] += 1 # all expect a 'storage' announcement
+
+            node_furl = tub.registerReference(Referenceable())
+            if i < NUM_STORAGE:
+                if i == 0:
+                    c.publish(node_furl, "storage", "ri_name")
+                elif i == 1:
+                    # sign the announcement
+                    privkey_s, pubkey_s = keyutil.make_keypair()
+                    privkey, _ignored = keyutil.parse_privkey(privkey_s)
+                    privkeys[c] = privkey
+                    c.publish("storage", make_ann(node_furl), privkey)
+                else:
+                    c.publish("storage", make_ann(node_furl))
                 publishing_clients.append(c)
-            # the last one does not publish anything
+            else:
+                # the last one does not publish anything
+                pass
+
+            if i == 0:
+                # users of the V1 client were required to publish a
+                # 'stub_client' record (somewhat after they published the
+                # 'storage' record), so the introducer could see their
+                # version. Match that behavior.
+                c.publish(node_furl, "stub_client", "stub_ri_name")
+
+            if i == 2:
+                # also publish something that nobody cares about
+                boring_furl = tub.registerReference(Referenceable())
+                c.publish("boring", make_ann(boring_furl))
 
             c.setServiceParent(self.parent)
             clients.append(c)
             tubs[c] = tub
 
-        def _wait_for_all_connections():
-            for c in subscribing_clients:
-                if len(received_announcements[c]) < NUM_SERVERS:
+
+        def _wait_for_connected(ign):
+            def _connected():
+                for c in clients:
+                    if not c.connected_to_introducer():
+                        return False
+                return True
+            return self.poll(_connected)
+
+        # we watch the clients to determine when the system has settled down.
+        # Then we can look inside the server to assert things about its
+        # state.
+
+        def _wait_for_expected_announcements(ign):
+            def _got_expected_announcements():
+                for i,c in enumerate(subscribing_clients):
+                    if len(received_announcements[c]) < expected_announcements[i]:
+                        return False
+                return True
+            return self.poll(_got_expected_announcements)
+
+        # before shutting down any Tub, we'd like to know that there are no
+        # messages outstanding
+
+        def _wait_until_idle(ign):
+            def _idle():
+                for c in subscribing_clients + publishing_clients:
+                    if c._debug_outstanding:
+                        return False
+                if introducer._debug_outstanding:
                     return False
-            return True
-        d = self.poll(_wait_for_all_connections)
+                return True
+            return self.poll(_idle)
+
+        d = defer.succeed(None)
+        d.addCallback(_wait_for_connected)
+        d.addCallback(_wait_for_expected_announcements)
+        d.addCallback(_wait_until_idle)
 
         def _check1(res):
             log.msg("doing _check1")
             dc = introducer._debug_counts
-            self.failUnlessEqual(dc["inbound_message"], NUM_SERVERS)
-            self.failUnlessEqual(dc["inbound_duplicate"], 0)
+            if server_version == V1:
+                # each storage server publishes a record, and (after its
+                # 'subscribe' has been ACKed) also publishes a "stub_client".
+                # The non-storage client (which subscribes) also publishes a
+                # stub_client. There is also one "boring" service. The number
+                # of messages is higher, because the stub_clients aren't
+                # published until after we get the 'subscribe' ack (since we
+                # don't realize that we're dealing with a v1 server [which
+                # needs stub_clients] until then), and the act of publishing
+                # the stub_client causes us to re-send all previous
+                # announcements.
+                self.failUnlessEqual(dc["inbound_message"] - dc["inbound_duplicate"],
+                                     NUM_STORAGE + NUM_CLIENTS + 1)
+            else:
+                # each storage server publishes a record. There is also one
+                # "stub_client" and one "boring"
+                self.failUnlessEqual(dc["inbound_message"], NUM_STORAGE+2)
+                self.failUnlessEqual(dc["inbound_duplicate"], 0)
             self.failUnlessEqual(dc["inbound_update"], 0)
-            self.failUnless(dc["outbound_message"])
+            self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS)
+            # the number of outbound messages is tricky.. I think it depends
+            # upon a race between the publish and the subscribe messages.
+            self.failUnless(dc["outbound_message"] > 0)
+            # each client subscribes to "storage", and each server publishes
+            self.failUnlessEqual(dc["outbound_announcements"],
+                                 NUM_STORAGE*NUM_CLIENTS)
 
-            for c in clients:
-                self.failUnless(c.connected_to_introducer())
             for c in subscribing_clients:
                 cdc = c._debug_counts
                 self.failUnless(cdc["inbound_message"])
                 self.failUnlessEqual(cdc["inbound_announcement"],
-                                     NUM_SERVERS)
+                                     NUM_STORAGE)
                 self.failUnlessEqual(cdc["wrong_service"], 0)
                 self.failUnlessEqual(cdc["duplicate_announcement"], 0)
                 self.failUnlessEqual(cdc["update"], 0)
                 self.failUnlessEqual(cdc["new_announcement"],
-                                     NUM_SERVERS)
+                                     NUM_STORAGE)
                 anns = received_announcements[c]
-                self.failUnlessEqual(len(anns), NUM_SERVERS)
+                self.failUnlessEqual(len(anns), NUM_STORAGE)
 
-                nodeid0 = b32decode(tubs[clients[0]].tubID.upper())
-                ann_d = anns[nodeid0]
-                nick = ann_d["nickname"]
+                nodeid0 = tubs[clients[0]].tubID
+                ann = anns[nodeid0]
+                nick = ann["nickname"]
                 self.failUnlessEqual(type(nick), unicode)
                 self.failUnlessEqual(nick, u"nickname-0")
-            for c in publishing_clients:
-                cdc = c._debug_counts
-                self.failUnlessEqual(cdc["outbound_message"], 1)
+            if server_version == V1:
+                for c in publishing_clients:
+                    cdc = c._debug_counts
+                    expected = 1 # storage
+                    if c is clients[2]:
+                        expected += 1 # boring
+                    if c is not clients[0]:
+                        # the v2 client tries to call publish_v2, which fails
+                        # because the server is v1. It then re-sends
+                        # everything it has so far, plus a stub_client record
+                        expected = 2*expected + 1
+                    if c is clients[0]:
+                        # we always tell v1 client to send stub_client
+                        expected += 1
+                    self.failUnlessEqual(cdc["outbound_message"], expected)
+            else:
+                for c in publishing_clients:
+                    cdc = c._debug_counts
+                    expected = 1
+                    if c in [clients[0], # stub_client
+                             clients[2], # boring
+                             ]:
+                        expected = 2
+                    self.failUnlessEqual(cdc["outbound_message"], expected)
+            log.msg("_check1 done")
         d.addCallback(_check1)
 
         # force an introducer reconnect, by shutting down the Tub it's using
@@ -196,67 +578,54 @@ class SystemTest(SystemTestMixin, unittest.TestCase):
         d.addCallback(lambda _ign: log.msg("shutting down introducer's Tub"))
         d.addCallback(lambda _ign: self.central_tub.disownServiceParent())
 
-        def _wait_for_introducer_loss():
-            for c in clients:
-                if c.connected_to_introducer():
-                    return False
-            return True
-        d.addCallback(lambda res: self.poll(_wait_for_introducer_loss))
+        def _wait_for_introducer_loss(ign):
+            def _introducer_lost():
+                for c in clients:
+                    if c.connected_to_introducer():
+                        return False
+                return True
+            return self.poll(_introducer_lost)
+        d.addCallback(_wait_for_introducer_loss)
 
         def _restart_introducer_tub(_ign):
             log.msg("restarting introducer's Tub")
-
-            dc = introducer._debug_counts
-            self.expected_count = dc["inbound_message"] + NUM_SERVERS
-            self.expected_subscribe_count = dc["inbound_subscribe"] + NUMCLIENTS+1
-            introducer._debug0 = dc["outbound_message"]
-            for c in subscribing_clients:
-                cdc = c._debug_counts
-                c._debug0 = cdc["inbound_message"]
-
+            # reset counters
+            for i in range(NUM_CLIENTS):
+                c = subscribing_clients[i]
+                for k in c._debug_counts:
+                    c._debug_counts[k] = 0
+            for k in introducer._debug_counts:
+                introducer._debug_counts[k] = 0
+            expected_announcements[i] += 1 # new 'storage' for everyone
             self.create_tub(self.central_portnum)
             newfurl = self.central_tub.registerReference(introducer,
                                                          furlFile=iff)
             assert newfurl == self.introducer_furl
         d.addCallback(_restart_introducer_tub)
 
-        def _wait_for_introducer_reconnect():
-            # wait until:
-            #  all clients are connected
-            #  the introducer has received publish messages from all of them
-            #  the introducer has received subscribe messages from all of them
-            #  the introducer has sent (duplicate) announcements to all of them
-            #  all clients have received (duplicate) announcements
-            dc = introducer._debug_counts
-            for c in clients:
-                if not c.connected_to_introducer():
-                    return False
-            if dc["inbound_message"] < self.expected_count:
-                return False
-            if dc["inbound_subscribe"] < self.expected_subscribe_count:
-                return False
-            for c in subscribing_clients:
-                cdc = c._debug_counts
-                if cdc["inbound_message"] < c._debug0+1:
-                    return False
-            return True
-        d.addCallback(lambda res: self.poll(_wait_for_introducer_reconnect))
+        d.addCallback(_wait_for_connected)
+        d.addCallback(_wait_for_expected_announcements)
+        d.addCallback(_wait_until_idle)
+        d.addCallback(lambda _ign: log.msg(" reconnected"))
 
+        # TODO: publish something while the introducer is offline, then
+        # confirm it gets delivered when the connection is reestablished
         def _check2(res):
             log.msg("doing _check2")
             # assert that the introducer sent out new messages, one per
             # subscriber
             dc = introducer._debug_counts
-            self.failUnlessEqual(dc["inbound_message"], 2*NUM_SERVERS)
-            self.failUnlessEqual(dc["inbound_duplicate"], NUM_SERVERS)
-            self.failUnlessEqual(dc["inbound_update"], 0)
-            self.failUnlessEqual(dc["outbound_message"],
-                                 introducer._debug0 + len(subscribing_clients))
-            for c in clients:
-                self.failUnless(c.connected_to_introducer())
+            self.failUnlessEqual(dc["outbound_announcements"],
+                                 NUM_STORAGE*NUM_CLIENTS)
+            self.failUnless(dc["outbound_message"] > 0)
+            self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS)
             for c in subscribing_clients:
                 cdc = c._debug_counts
-                self.failUnlessEqual(cdc["duplicate_announcement"], NUM_SERVERS)
+                self.failUnlessEqual(cdc["inbound_message"], 1)
+                self.failUnlessEqual(cdc["inbound_announcement"], NUM_STORAGE)
+                self.failUnlessEqual(cdc["new_announcement"], 0)
+                self.failUnlessEqual(cdc["wrong_service"], 0)
+                self.failUnlessEqual(cdc["duplicate_announcement"], NUM_STORAGE)
         d.addCallback(_check2)
 
         # Then force an introducer restart, by shutting down the Tub,
@@ -267,71 +636,211 @@ class SystemTest(SystemTestMixin, unittest.TestCase):
 
         d.addCallback(lambda _ign: log.msg("shutting down introducer"))
         d.addCallback(lambda _ign: self.central_tub.disownServiceParent())
-        d.addCallback(lambda res: self.poll(_wait_for_introducer_loss))
+        d.addCallback(_wait_for_introducer_loss)
+        d.addCallback(lambda _ign: log.msg("introducer lost"))
 
         def _restart_introducer(_ign):
             log.msg("restarting introducer")
             self.create_tub(self.central_portnum)
-
-            for c in subscribing_clients:
-                # record some counters for later comparison. Stash the values
-                # on the client itself, because I'm lazy.
-                cdc = c._debug_counts
-                c._debug1 = cdc["inbound_announcement"]
-                c._debug2 = cdc["inbound_message"]
-                c._debug3 = cdc["new_announcement"]
-            newintroducer = create_introducer()
-            self.expected_message_count = NUM_SERVERS
-            self.expected_announcement_count = NUM_SERVERS*len(subscribing_clients)
-            self.expected_subscribe_count = len(subscribing_clients)
-            newfurl = self.central_tub.registerReference(newintroducer,
+            # reset counters
+            for i in range(NUM_CLIENTS):
+                c = subscribing_clients[i]
+                for k in c._debug_counts:
+                    c._debug_counts[k] = 0
+            expected_announcements[i] += 1 # new 'storage' for everyone
+            if server_version == V1:
+                introducer = old.IntroducerService_v1()
+            else:
+                introducer = IntroducerService()
+            newfurl = self.central_tub.registerReference(introducer,
                                                          furlFile=iff)
             assert newfurl == self.introducer_furl
         d.addCallback(_restart_introducer)
-        def _wait_for_introducer_reconnect2():
-            # wait until:
-            #  all clients are connected
-            #  the introducer has received publish messages from all of them
-            #  the introducer has received subscribe messages from all of them
-            #  the introducer has sent announcements for everybody to everybody
-            #  all clients have received all the (duplicate) announcements
-            # at that point, the system should be quiescent
-            dc = introducer._debug_counts
-            for c in clients:
-                if not c.connected_to_introducer():
-                    return False
-            if dc["inbound_message"] < self.expected_message_count:
-                return False
-            if dc["outbound_announcements"] < self.expected_announcement_count:
-                return False
-            if dc["inbound_subscribe"] < self.expected_subscribe_count:
-                return False
-            for c in subscribing_clients:
-                cdc = c._debug_counts
-                if cdc["inbound_announcement"] < c._debug1+NUM_SERVERS:
-                    return False
-            return True
-        d.addCallback(lambda res: self.poll(_wait_for_introducer_reconnect2))
+
+        d.addCallback(_wait_for_connected)
+        d.addCallback(_wait_for_expected_announcements)
+        d.addCallback(_wait_until_idle)
 
         def _check3(res):
             log.msg("doing _check3")
-            for c in clients:
-                self.failUnless(c.connected_to_introducer())
+            dc = introducer._debug_counts
+            self.failUnlessEqual(dc["outbound_announcements"],
+                                 NUM_STORAGE*NUM_CLIENTS)
+            self.failUnless(dc["outbound_message"] > 0)
+            self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS)
             for c in subscribing_clients:
                 cdc = c._debug_counts
-                self.failUnless(cdc["inbound_announcement"] > c._debug1)
-                self.failUnless(cdc["inbound_message"] > c._debug2)
-                # there should have been no new announcements
-                self.failUnlessEqual(cdc["new_announcement"], c._debug3)
-                # and the right number of duplicate ones. There were
-                # NUM_SERVERS from the servertub restart, and there should be
-                # another NUM_SERVERS now
-                self.failUnlessEqual(cdc["duplicate_announcement"],
-                                     2*NUM_SERVERS)
+                self.failUnless(cdc["inbound_message"] > 0)
+                self.failUnlessEqual(cdc["inbound_announcement"], NUM_STORAGE)
+                self.failUnlessEqual(cdc["new_announcement"], 0)
+                self.failUnlessEqual(cdc["wrong_service"], 0)
+                self.failUnlessEqual(cdc["duplicate_announcement"], NUM_STORAGE)
 
         d.addCallback(_check3)
         return d
 
+
+    def test_system_v2_server(self):
+        self.basedir = "introducer/SystemTest/system_v2_server"
+        os.makedirs(self.basedir)
+        return self.do_system_test(V2)
+    test_system_v2_server.timeout = 480
+    # occasionally takes longer than 350s on "draco"
+
+    def test_system_v1_server(self):
+        self.basedir = "introducer/SystemTest/system_v1_server"
+        os.makedirs(self.basedir)
+        return self.do_system_test(V1)
+    test_system_v1_server.timeout = 480
+    # occasionally takes longer than 350s on "draco"
+
+class FakeRemoteReference:
+    def notifyOnDisconnect(self, *args, **kwargs): pass
+    def getRemoteTubID(self): return "62ubehyunnyhzs7r6vdonnm2hpi52w6y"
+
+class ClientInfo(unittest.TestCase):
+    def test_client_v2(self):
+        introducer = IntroducerService()
+        tub = introducer_furl = None
+        app_versions = {"whizzy": "fizzy"}
+        client_v2 = IntroducerClient(tub, introducer_furl, u"nick-v2",
+                                     "my_version", "oldest", app_versions)
+        #furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
+        #ann_s = make_ann_t(client_v2, furl1, None)
+        #introducer.remote_publish_v2(ann_s, Referenceable())
+        subscriber = FakeRemoteReference()
+        introducer.remote_subscribe_v2(subscriber, "storage",
+                                       client_v2._my_subscriber_info)
+        s = introducer.get_subscribers()
+        self.failUnlessEqual(len(s), 1)
+        sn, when, si, rref = s[0]
+        self.failUnlessIdentical(rref, subscriber)
+        self.failUnlessEqual(sn, "storage")
+        self.failUnlessEqual(si["version"], 0)
+        self.failUnlessEqual(si["oldest-supported"], "oldest")
+        self.failUnlessEqual(si["app-versions"], app_versions)
+        self.failUnlessEqual(si["nickname"], u"nick-v2")
+        self.failUnlessEqual(si["my-version"], "my_version")
+
+    def test_client_v1(self):
+        introducer = IntroducerService()
+        subscriber = FakeRemoteReference()
+        introducer.remote_subscribe(subscriber, "storage")
+        # the v1 subscribe interface had no subscriber_info: that was usually
+        # sent in a separate stub_client pseudo-announcement
+        s = introducer.get_subscribers()
+        self.failUnlessEqual(len(s), 1)
+        sn, when, si, rref = s[0]
+        # rref will be a WrapV1SubscriberInV2Interface around the real
+        # subscriber
+        self.failUnlessIdentical(rref.original, subscriber)
+        self.failUnlessEqual(si, None) # not known yet
+        self.failUnlessEqual(sn, "storage")
+
+        # now submit the stub_client announcement
+        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
+        ann = (furl1, "stub_client", "RIStubClient",
+               u"nick-v1".encode("utf-8"), "my_version", "oldest")
+        introducer.remote_publish(ann)
+        # the server should correlate the two
+        s = introducer.get_subscribers()
+        self.failUnlessEqual(len(s), 1)
+        sn, when, si, rref = s[0]
+        self.failUnlessIdentical(rref.original, subscriber)
+        self.failUnlessEqual(sn, "storage")
+
+        self.failUnlessEqual(si["version"], 0)
+        self.failUnlessEqual(si["oldest-supported"], "oldest")
+        # v1 announcements do not contain app-versions
+        self.failUnlessEqual(si["app-versions"], {})
+        self.failUnlessEqual(si["nickname"], u"nick-v1")
+        self.failUnlessEqual(si["my-version"], "my_version")
+
+        # a subscription that arrives after the stub_client announcement
+        # should be correlated too
+        subscriber2 = FakeRemoteReference()
+        introducer.remote_subscribe(subscriber2, "thing2")
+
+        s = introducer.get_subscribers()
+        subs = dict([(sn, (si,rref)) for sn, when, si, rref in s])
+        self.failUnlessEqual(len(subs), 2)
+        (si,rref) = subs["thing2"]
+        self.failUnlessIdentical(rref.original, subscriber2)
+        self.failUnlessEqual(si["version"], 0)
+        self.failUnlessEqual(si["oldest-supported"], "oldest")
+        # v1 announcements do not contain app-versions
+        self.failUnlessEqual(si["app-versions"], {})
+        self.failUnlessEqual(si["nickname"], u"nick-v1")
+        self.failUnlessEqual(si["my-version"], "my_version")
+
+class Announcements(unittest.TestCase):
+    def test_client_v2_unsigned(self):
+        introducer = IntroducerService()
+        tub = introducer_furl = None
+        app_versions = {"whizzy": "fizzy"}
+        client_v2 = IntroducerClient(tub, introducer_furl, u"nick-v2",
+                                     "my_version", "oldest", app_versions)
+        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
+        tubid = "62ubehyunnyhzs7r6vdonnm2hpi52w6y"
+        ann_s0 = make_ann_t(client_v2, furl1, None)
+        canary0 = Referenceable()
+        introducer.remote_publish_v2(ann_s0, canary0)
+        a = introducer.get_announcements()
+        self.failUnlessEqual(len(a), 1)
+        (index, (ann_s, canary, ann, when)) = a.items()[0]
+        self.failUnlessIdentical(canary, canary0)
+        self.failUnlessEqual(index, ("storage", None, tubid))
+        self.failUnlessEqual(ann["app-versions"], app_versions)
+        self.failUnlessEqual(ann["nickname"], u"nick-v2")
+        self.failUnlessEqual(ann["service-name"], "storage")
+        self.failUnlessEqual(ann["my-version"], "my_version")
+        self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1)
+
+    def test_client_v2_signed(self):
+        introducer = IntroducerService()
+        tub = introducer_furl = None
+        app_versions = {"whizzy": "fizzy"}
+        client_v2 = IntroducerClient(tub, introducer_furl, u"nick-v2",
+                                     "my_version", "oldest", app_versions)
+        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
+        sk_s, vk_s = keyutil.make_keypair()
+        sk, _ignored = keyutil.parse_privkey(sk_s)
+        pks = keyutil.remove_prefix(vk_s, "pub-")
+        ann_t0 = make_ann_t(client_v2, furl1, sk)
+        canary0 = Referenceable()
+        introducer.remote_publish_v2(ann_t0, canary0)
+        a = introducer.get_announcements()
+        self.failUnlessEqual(len(a), 1)
+        (index, (ann_s, canary, ann, when)) = a.items()[0]
+        self.failUnlessIdentical(canary, canary0)
+        self.failUnlessEqual(index, ("storage", pks, None))
+        self.failUnlessEqual(ann["app-versions"], app_versions)
+        self.failUnlessEqual(ann["nickname"], u"nick-v2")
+        self.failUnlessEqual(ann["service-name"], "storage")
+        self.failUnlessEqual(ann["my-version"], "my_version")
+        self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1)
+
+    def test_client_v1(self):
+        introducer = IntroducerService()
+
+        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
+        tubid = "62ubehyunnyhzs7r6vdonnm2hpi52w6y"
+        ann = (furl1, "storage", "RIStorage",
+               u"nick-v1".encode("utf-8"), "my_version", "oldest")
+        introducer.remote_publish(ann)
+
+        a = introducer.get_announcements()
+        self.failUnlessEqual(len(a), 1)
+        (index, (ann_s, canary, ann, when)) = a.items()[0]
+        self.failUnlessEqual(canary, None)
+        self.failUnlessEqual(index, ("storage", None, tubid))
+        self.failUnlessEqual(ann["app-versions"], {})
+        self.failUnlessEqual(ann["nickname"], u"nick-v1".encode("utf-8"))
+        self.failUnlessEqual(ann["service-name"], "storage")
+        self.failUnlessEqual(ann["my-version"], "my_version")
+        self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1)
+
+
 class TooNewServer(IntroducerService):
     VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v999":
                  { },
@@ -359,10 +868,10 @@ class NonV1Server(SystemTestMixin, unittest.TestCase):
         tub.setLocation("localhost:%d" % portnum)
 
         c = IntroducerClient(tub, self.introducer_furl,
-                             u"nickname-client", "version", "oldest")
+                             u"nickname-client", "version", "oldest", {})
         announcements = {}
-        def got(serverid, ann_d):
-            announcements[serverid] = ann_d
+        def got(key_s, ann):
+            announcements[key_s] = ann
         c.subscribe_to("storage", got)
 
         c.setServiceParent(self.parent)
@@ -374,7 +883,8 @@ class NonV1Server(SystemTestMixin, unittest.TestCase):
         d = self.poll(_got_bad)
         def _done(res):
             self.failUnless(c._introducer_error)
-            self.failUnless(c._introducer_error.check(InsufficientVersionError))
+            self.failUnless(c._introducer_error.check(InsufficientVersionError),
+                            c._introducer_error)
         d.addCallback(_done)
         return d
 
@@ -388,3 +898,44 @@ class DecodeFurl(unittest.TestCase):
         nodeid = b32decode(m.group(1).upper())
         self.failUnlessEqual(nodeid, "\x9fM\xf2\x19\xcckU0\xbf\x03\r\x10\x99\xfb&\x9b-\xc7A\x1d")
 
+class Signatures(unittest.TestCase):
+    def test_sign(self):
+        ann = {"key1": "value1"}
+        sk_s,vk_s = keyutil.make_keypair()
+        sk,ignored = keyutil.parse_privkey(sk_s)
+        ann_t = sign_to_foolscap(ann, sk)
+        (msg, sig, key) = ann_t
+        self.failUnlessEqual(type(msg), type("".encode("utf-8"))) # bytes
+        self.failUnlessEqual(simplejson.loads(msg.decode("utf-8")), ann)
+        self.failUnless(sig.startswith("v0-"))
+        self.failUnless(key.startswith("v0-"))
+        (ann2,key2) = unsign_from_foolscap(ann_t)
+        self.failUnlessEqual(ann2, ann)
+        self.failUnlessEqual("pub-"+key2, vk_s)
+
+        # bad signature
+        bad_ann = {"key1": "value2"}
+        bad_msg = simplejson.dumps(bad_ann).encode("utf-8")
+        self.failUnlessRaises(keyutil.BadSignatureError,
+                              unsign_from_foolscap, (bad_msg,sig,key))
+        # sneaky bad signature should be ignored
+        (ann2,key2) = unsign_from_foolscap( (bad_msg,None,key) )
+        self.failUnlessEqual(key2, None)
+        self.failUnlessEqual(ann2, bad_ann)
+
+        # unrecognized signatures
+        self.failUnlessRaises(UnknownKeyError,
+                              unsign_from_foolscap, (bad_msg,"v999-sig",key))
+        self.failUnlessRaises(UnknownKeyError,
+                              unsign_from_foolscap, (bad_msg,sig,"v999-key"))
+
+
+# add tests of StorageFarmBroker: if it receives duplicate announcements, it
+# should leave the Reconnector in place, also if it receives
+# same-FURL-different-misc, but if it receives same-nodeid-different-FURL, it
+# should tear down the Reconnector and make a new one. This behavior used to
+# live in the IntroducerClient, and thus used to be tested by test_introducer
+
+# copying more tests from old branch:
+
+#  then also add Upgrade test
index 007aa8812a374d2018edd2e14d116574c5ba08b9..c8781178ea87fb86c6e41895571bfa17cf9b6246 100644 (file)
@@ -228,7 +228,9 @@ def make_storagebroker(s=None, num_peers=10):
     storage_broker = StorageFarmBroker(None, True)
     for peerid in peerids:
         fss = FakeStorageServer(peerid, s)
-        storage_broker.test_add_rref(peerid, fss)
+        ann = {"anonymous-storage-FURL": "pb://%s@nowhere/fake" % base32.b2a(peerid),
+               "permutation-seed-base32": base32.b2a(peerid) }
+        storage_broker.test_add_rref(peerid, fss, ann)
     return storage_broker
 
 def make_nodemaker(s=None, num_peers=10):
index 3cbaa7c9cbfecfbb712cae60bdf4dbb540275d58..23bf1aa28b64f6138dd59d86578ebd3d70bdf6f0 100644 (file)
@@ -783,7 +783,7 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase):
                 newappverstr = "%s: %s" % (allmydata.__appname__, altverstr)
 
                 self.failUnless((appverstr in res) or (newappverstr in res), (appverstr, newappverstr, res))
-                self.failUnless("Announcement Summary: storage: 5, stub_client: 5" in res)
+                self.failUnless("Announcement Summary: storage: 5" in res)
                 self.failUnless("Subscription Summary: storage: 5" in res)
                 self.failUnless("tahoe.css" in res)
             except unittest.FailTest:
@@ -804,9 +804,9 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase):
                 self.failUnlessEqual(data["subscription_summary"],
                                      {"storage": 5})
                 self.failUnlessEqual(data["announcement_summary"],
-                                     {"storage": 5, "stub_client": 5})
+                                     {"storage": 5})
                 self.failUnlessEqual(data["announcement_distinct_hosts"],
-                                     {"storage": 1, "stub_client": 1})
+                                     {"storage": 1})
             except unittest.FailTest:
                 print
                 print "GET %s?t=json output was:" % self.introweb_url
index 224b8bd5fb8139adbb78f5c57f5f3dba1513eb2f..cf050b7850ed85f39c5517191a26ecd8c26405b1 100644 (file)
@@ -11,7 +11,7 @@ import allmydata # for __full_version__
 from allmydata import uri, monitor, client
 from allmydata.immutable import upload, encode
 from allmydata.interfaces import FileTooLargeError, UploadUnhappinessError
-from allmydata.util import log
+from allmydata.util import log, base32
 from allmydata.util.assertutil import precondition
 from allmydata.util.deferredutil import DeferredListShouldSucceed
 from allmydata.test.no_network import GridTestMixin
@@ -197,7 +197,9 @@ class FakeClient:
                     for fakeid in range(self.num_servers) ]
         self.storage_broker = StorageFarmBroker(None, permute_peers=True)
         for (serverid, rref) in servers:
-            self.storage_broker.test_add_rref(serverid, rref)
+            ann = {"anonymous-storage-FURL": "pb://%s@nowhere/fake" % base32.b2a(serverid),
+                   "permutation-seed-base32": base32.b2a(serverid) }
+            self.storage_broker.test_add_rref(serverid, rref, ann)
         self.last_servers = [s[1] for s in servers]
 
     def log(self, *args, **kwargs):
index 0e25e20a043e6a047cb4af6f591c3d765d51d42e..783b2beec8fc8089604aeb38e1cedab42bb85175 100644 (file)
@@ -34,15 +34,20 @@ class IntroducerRoot(rend.Page):
 
     def render_JSON(self, ctx):
         res = {}
-        clients = self.introducer_service.get_subscribers()
-        subscription_summary = dict([ (name, len(clients[name]))
-                                      for name in clients ])
-        res["subscription_summary"] = subscription_summary
+
+        counts = {}
+        subscribers = self.introducer_service.get_subscribers()
+        for (service_name, ign, ign, ign) in subscribers:
+            if service_name not in counts:
+                counts[service_name] = 0
+            counts[service_name] += 1
+        res["subscription_summary"] = counts
 
         announcement_summary = {}
         service_hosts = {}
-        for (ann,when) in self.introducer_service.get_announcements().values():
-            (furl, service_name, ri_name, nickname, ver, oldest) = ann
+        for a in self.introducer_service.get_announcements().values():
+            (_, _, ann, when) = a
+            service_name = ann["service-name"]
             if service_name not in announcement_summary:
                 announcement_summary[service_name] = 0
             announcement_summary[service_name] += 1
@@ -55,6 +60,7 @@ class IntroducerRoot(rend.Page):
             # enough: when multiple services are run on a single host,
             # they're usually either configured with the same addresses,
             # or setLocationAutomatically picks up the same interfaces.
+            furl = ann["anonymous-storage-FURL"]
             locations = SturdyRef(furl).getTubRef().getLocations()
             # list of tuples, ("ipv4", host, port)
             host = frozenset([hint[1]
@@ -79,8 +85,9 @@ class IntroducerRoot(rend.Page):
 
     def render_announcement_summary(self, ctx, data):
         services = {}
-        for (ann,when) in self.introducer_service.get_announcements().values():
-            (furl, service_name, ri_name, nickname, ver, oldest) = ann
+        for a in self.introducer_service.get_announcements().values():
+            (_, _, ann, when) = a
+            service_name = ann["service-name"]
             if service_name not in services:
                 services[service_name] = 0
             services[service_name] += 1
@@ -90,65 +97,52 @@ class IntroducerRoot(rend.Page):
                           for service_name in service_names])
 
     def render_client_summary(self, ctx, data):
+        counts = {}
         clients = self.introducer_service.get_subscribers()
-        service_names = clients.keys()
-        service_names.sort()
-        return ", ".join(["%s: %d" % (service_name, len(clients[service_name]))
-                          for service_name in service_names])
+        for (service_name, ign, ign, ign) in clients:
+            if service_name not in counts:
+                counts[service_name] = 0
+            counts[service_name] += 1
+        return ", ".join([ "%s: %d" % (name, counts[name])
+                           for name in sorted(counts.keys()) ] )
 
     def data_services(self, ctx, data):
         introsvc = self.introducer_service
-        ann = [(since,a)
-               for (a,since) in introsvc.get_announcements().values()
-               if a[1] != "stub_client"]
-        ann.sort(lambda a,b: cmp( (a[1][1], a), (b[1][1], b) ) )
-        return ann
-
-    def render_service_row(self, ctx, (since,announcement)):
-        (furl, service_name, ri_name, nickname, ver, oldest) = announcement
-        sr = SturdyRef(furl)
+        services = []
+        for a in introsvc.get_announcements().values():
+            (_, _, ann, when) = a
+            if ann["service-name"] == "stub_client":
+                continue
+            services.append( (when, ann) )
+        services.sort(key=lambda x: (x[1]["service-name"], x[1]["nickname"]))
+        # this used to be:
+        #services.sort(lambda a,b: cmp( (a[1][1], a), (b[1][1], b) ) )
+        # service_name was the primary key, then the whole tuple (starting
+        # with the furl) was the secondary key
+        return services
+
+    def render_service_row(self, ctx, (since,ann)):
+        sr = SturdyRef(ann["anonymous-storage-FURL"])
         nodeid = sr.tubID
         advertised = self.show_location_hints(sr)
         ctx.fillSlots("peerid", nodeid)
-        ctx.fillSlots("nickname", nickname)
+        ctx.fillSlots("nickname", ann["nickname"])
         ctx.fillSlots("advertised", " ".join(advertised))
         ctx.fillSlots("connected", "?")
         TIME_FORMAT = "%H:%M:%S %d-%b-%Y"
         ctx.fillSlots("announced",
                       time.strftime(TIME_FORMAT, time.localtime(since)))
-        ctx.fillSlots("version", ver)
-        ctx.fillSlots("service_name", service_name)
+        ctx.fillSlots("version", ann["my-version"])
+        ctx.fillSlots("service_name", ann["service-name"])
         return ctx.tag
 
     def data_subscribers(self, ctx, data):
-        # use the "stub_client" announcements to get information per nodeid
-        clients = {}
-        for (ann,when) in self.introducer_service.get_announcements().values():
-            if ann[1] != "stub_client":
-                continue
-            (furl, service_name, ri_name, nickname, ver, oldest) = ann
-            sr = SturdyRef(furl)
-            nodeid = sr.tubID
-            clients[nodeid] = ann
-
-        # then we actually provide information per subscriber
-        s = []
-        introsvc = self.introducer_service
-        for service_name, subscribers in introsvc.get_subscribers().items():
-            for (rref, timestamp) in subscribers.items():
-                sr = rref.getSturdyRef()
-                nodeid = sr.tubID
-                ann = clients.get(nodeid)
-                s.append( (service_name, rref, timestamp, ann) )
-        s.sort()
-        return s
+        return self.introducer_service.get_subscribers()
 
     def render_subscriber_row(self, ctx, s):
-        (service_name, rref, since, ann) = s
-        nickname = "?"
-        version = "?"
-        if ann:
-            (furl, service_name_2, ri_name, nickname, version, oldest) = ann
+        (service_name, since, info, rref) = s
+        nickname = info.get("nickname", "?")
+        version = info.get("my-version", "?")
 
         sr = rref.getSturdyRef()
         # if the subscriber didn't do Tub.setLocation, nodeid will be None