Date: Wed, 18 Jun 2008 23:58:34 +0000 (-0700)
Subject: introducer: add old (V1) introducer code, add test framework for compatibility testing
introducer: add old (V1) introducer code, add test framework for compatibility testing

diff --git a/src/allmydata/introducer/ b/src/allmydata/introducer/
new file mode 100644
index 00000000..b586b0c1
--- /dev/null
+++ b/src/allmydata/introducer/
@@ -0,0 +1,343 @@
+# We keep a copy of the old introducer (both client and server) here to
+# support compatibility tests. The old client is supposed to handle the new
+# server, and new client is supposed to handle the old server.
+import re, time, sha
+from base64 import b32decode
+from zope.interface import implements
+from twisted.application import service
+from foolscap import Referenceable
+from allmydata.interfaces import RIIntroducerSubscriberClient, IIntroducerClient
+from allmydata.util import log, idlib
+from allmydata.introducer.common import make_index
+from allmydata.interfaces import RIIntroducerPublisherAndSubscriberService
+class RemoteServiceConnector:
+    """I hold information about a peer service that we want to connect to. If
+    we are connected, I hold the RemoteReference, the peer's address, and the
+    peer's version information. I remember information about when we were
+    last connected to the peer too, even if we aren't currently connected.
+    @ivar announcement_time: when we first heard about this service
+    @ivar last_connect_time: when we last established a connection
+    @ivar last_loss_time: when we last lost a connection
+    @ivar version: the peer's version, from the most recent announcement
+    @ivar oldest_supported: the peer's oldest supported version, same
+    @ivar nickname: the peer's self-reported nickname, same
+    @ivar rref: the RemoteReference, if connected, otherwise None
+    @ivar remote_host: the IAddress, if connected, otherwise None
+    """
+    def __init__(self, announcement, tub, ic):
+        self._tub = tub
+        self._announcement = announcement
+        self._ic = ic
+        (furl, service_name, ri_name, nickname, ver, oldest) = announcement
+        self._furl = furl
+        m = re.match(r'pb://(\w+)@', furl)
+        assert m
+        self._nodeid = b32decode(
+        self._nodeid_s = idlib.shortnodeid_b2a(self._nodeid)
+        self.service_name = service_name
+        self.log("attempting to connect to %s" % self._nodeid_s)
+        self.announcement_time = time.time()
+        self.last_loss_time = None
+        self.rref = None
+        self.remote_host = None
+        self.last_connect_time = None
+        self.version = ver
+        self.oldest_supported = oldest
+        self.nickname = nickname
+    def log(self, *args, **kwargs):
+        return self._ic.log(*args, **kwargs)
+    def startConnecting(self):
+        self._reconnector = self._tub.connectTo(self._furl, self._got_service)
+    def stopConnecting(self):
+        self._reconnector.stopConnecting()
+    def _got_service(self, rref):
+        self.last_connect_time = time.time()
+        self.remote_host =
+        self.rref = rref
+        self.log("connected to %s" % self._nodeid_s)
+        self._ic.add_connection(self._nodeid, self.service_name, rref)
+        rref.notifyOnDisconnect(self._lost, rref)
+    def _lost(self, rref):
+        self.log("lost connection to %s" % self._nodeid_s)
+        self.last_loss_time = time.time()
+        self.rref = None
+        self.remote_host = None
+        self._ic.remove_connection(self._nodeid, self.service_name, rref)
+    def reset(self):
+        self._reconnector.reset()
+class IntroducerClient_V1(service.Service, Referenceable):
+    implements(RIIntroducerSubscriberClient, IIntroducerClient)
+    def __init__(self, tub, introducer_furl,
+                 nickname, my_version, oldest_supported):
+        self._tub = tub
+        self.introducer_furl = introducer_furl
+        self._nickname = nickname
+        self._my_version = my_version
+        self._oldest_supported = oldest_supported
+        self._published_announcements = set()
+        self._publisher = None
+        self._connected = False
+        self._subscribed_service_names = set()
+        self._subscriptions = set() # requests we've actually sent
+        self._received_announcements = set()
+        # TODO: this set will grow without bound, until the node is restarted
+        # we only accept one announcement per (peerid+service_name) pair.
+        # This insures that an upgraded host replace their previous
+        # announcement. It also means that each peer must have their own Tub
+        # (no sharing), which is slightly weird but consistent with the rest
+        # of the Tahoe codebase.
+        self._connectors = {} # k: (peerid+svcname), v: RemoteServiceConnector
+        # self._connections is a set of (peerid, service_name, rref) tuples
+        self._connections = set()
+        self.counter = 0 # incremented each time we change state, for tests
+        self.encoding_parameters = None
+    def startService(self):
+        service.Service.startService(self)
+        rc = self._tub.connectTo(self.introducer_furl, self._got_introducer)
+        self._introducer_reconnector = rc
+        def connect_failed(failure):
+            self.log("Initial Introducer connection failed: perhaps it's down",
+                     level=log.WEIRD, failure=failure)
+        d = self._tub.getReference(self.introducer_furl)
+        d.addErrback(connect_failed)
+    def _got_introducer(self, publisher):
+        self.log("connected to introducer")
+        self._connected = True
+        self._publisher = publisher
+        publisher.notifyOnDisconnect(self._disconnected)
+        self._maybe_publish()
+        self._maybe_subscribe()
+    def _disconnected(self):
+        self.log("bummer, we've lost our connection to the introducer")
+        self._connected = False
+        self._publisher = None
+        self._subscriptions.clear()
+    def stopService(self):
+        service.Service.stopService(self)
+        self._introducer_reconnector.stopConnecting()
+        for rsc in self._connectors.itervalues():
+            rsc.stopConnecting()
+    def log(self, *args, **kwargs):
+        if "facility" not in kwargs:
+            kwargs["facility"] = "tahoe.introducer"
+        return log.msg(*args, **kwargs)
+    def publish(self, furl, service_name, remoteinterface_name):
+        ann = (furl, service_name, remoteinterface_name,
+               self._nickname, self._my_version, self._oldest_supported)
+        self._published_announcements.add(ann)
+        self._maybe_publish()
+    def subscribe_to(self, service_name):
+        self._subscribed_service_names.add(service_name)
+        self._maybe_subscribe()
+    def _maybe_subscribe(self):
+        if not self._publisher:
+            self.log("want to subscribe, but no introducer yet",
+                     level=log.NOISY)
+            return
+        for service_name in self._subscribed_service_names:
+            if service_name not in self._subscriptions:
+                # there is a race here, but the subscription desk ignores
+                # duplicate requests.
+                self._subscriptions.add(service_name)
+                d = self._publisher.callRemote("subscribe", self, service_name)
+                d.addErrback(log.err, facility="tahoe.introducer",
+                             level=log.WEIRD)
+    def _maybe_publish(self):
+        if not self._publisher:
+            self.log("want to publish, but no introducer yet", level=log.NOISY)
+            return
+        # this re-publishes everything. The Introducer ignores duplicates
+        for ann in self._published_announcements:
+            d = self._publisher.callRemote("publish", ann)
+            d.addErrback(log.err, facility="tahoe.introducer",
+                         level=log.WEIRD)
+    def remote_announce(self, announcements):
+        for ann in announcements:
+            self.log("received %d announcements" % len(announcements))
+            (furl, service_name, ri_name, nickname, ver, oldest) = ann
+            if service_name not in self._subscribed_service_names:
+                self.log("announcement for a service we don't care about [%s]"
+                         % (service_name,), level=log.WEIRD)
+                continue
+            if ann in self._received_announcements:
+                self.log("ignoring old announcement: %s" % (ann,),
+                         level=log.NOISY)
+                continue
+            self.log("new announcement[%s]: %s" % (service_name, ann))
+            self._received_announcements.add(ann)
+            self._new_announcement(ann)
+    def _new_announcement(self, announcement):
+        # this will only be called for new announcements
+        index = make_index(announcement)
+        if index in self._connectors:
+            self.log("replacing earlier announcement", level=log.NOISY)
+            self._connectors[index].stopConnecting()
+        rsc = RemoteServiceConnector(announcement, self._tub, self)
+        self._connectors[index] = rsc
+        rsc.startConnecting()
+    def add_connection(self, nodeid, service_name, rref):
+        self._connections.add( (nodeid, service_name, rref) )
+        self.counter += 1
+        # when one connection is established, reset the timers on all others,
+        # to trigger a reconnection attempt in one second. This is intended
+        # to accelerate server connections when we've been offline for a
+        # while. The goal is to avoid hanging out for a long time with
+        # connections to only a subset of the servers, which would increase
+        # the chances that we'll put shares in weird places (and not update
+        # existing shares of mutable files). See #374 for more details.
+        for rsc in self._connectors.values():
+            rsc.reset()
+    def remove_connection(self, nodeid, service_name, rref):
+        self._connections.discard( (nodeid, service_name, rref) )
+        self.counter += 1
+    def get_all_connections(self):
+        return frozenset(self._connections)
+    def get_all_connectors(self):
+        return self._connectors.copy()
+    def get_all_peerids(self):
+        return frozenset([peerid
+                          for (peerid, service_name, rref)
+                          in self._connections])
+    def get_all_connections_for(self, service_name):
+        return frozenset([c
+                          for c in self._connections
+                          if c[1] == service_name])
+    def get_permuted_peers(self, service_name, key):
+        """Return an ordered list of (peerid, rref) tuples."""
+        results = []
+        for (c_peerid, c_service_name, rref) in self._connections:
+            assert isinstance(c_peerid, str)
+            if c_service_name != service_name:
+                continue
+            permuted = + c_peerid).digest()
+            results.append((permuted, c_peerid, rref))
+        results.sort(lambda a,b: cmp(a[0], b[0]))
+        return [ (r[1], r[2]) for r in results ]
+    def remote_set_encoding_parameters(self, parameters):
+        self.encoding_parameters = parameters
+    def connected_to_introducer(self):
+        return self._connected
+    def debug_disconnect_from_peerid(self, victim_nodeid):
+        # for unit tests: locate and sever all connections to the given
+        # peerid.
+        for (nodeid, service_name, rref) in self._connections:
+            if nodeid == victim_nodeid:
+class IntroducerService_V1(service.MultiService, Referenceable):
+    implements(RIIntroducerPublisherAndSubscriberService)
+    name = "introducer"
+    def __init__(self, basedir="."):
+        service.MultiService.__init__(self)
+        self.introducer_url = None
+        # 'index' is (tubid, service_name)
+        self._announcements = {} # dict of index -> (announcement, timestamp)
+        self._subscribers = {} # dict of (rref->timestamp) dicts
+    def log(self, *args, **kwargs):
+        if "facility" not in kwargs:
+            kwargs["facility"] = "tahoe.introducer"
+        return log.msg(*args, **kwargs)
+    def get_announcements(self):
+        return self._announcements
+    def get_subscribers(self):
+        return self._subscribers
+    def remote_publish(self, announcement):
+        self.log("introducer: announcement published: %s" % (announcement,) )
+        index = make_index(announcement)
+        if index in self._announcements:
+            (old_announcement, timestamp) = self._announcements[index]
+            if old_announcement == announcement:
+                self.log("but we already knew it, ignoring", level=log.NOISY)
+                return
+            else:
+                self.log("old announcement being updated", level=log.NOISY)
+        self._announcements[index] = (announcement, time.time())
+        (furl, service_name, ri_name, nickname, ver, oldest) = announcement
+        for s in self._subscribers.get(service_name, []):
+            s.callRemote("announce", set([announcement]))
+    def remote_subscribe(self, subscriber, service_name):
+        self.log("introducer: subscription[%s] request at %s" % (service_name,
+                                                                 subscriber))
+        if service_name not in self._subscribers:
+            self._subscribers[service_name] = {}
+        subscribers = self._subscribers[service_name]
+        if subscriber in subscribers:
+            self.log("but they're already subscribed, ignoring",
+                     level=log.UNUSUAL)
+            return
+        subscribers[subscriber] = time.time()
+        def _remove():
+            self.log("introducer: unsubscribing[%s] %s" % (service_name,
+                                                           subscriber))
+            subscribers.pop(subscriber, None)
+        subscriber.notifyOnDisconnect(_remove)
+        announcements = set( [ ann
+                               for idx,(ann,when) in self._announcements.items()
+                               if idx[1] == service_name] )
+        d = subscriber.callRemote("announce", announcements)
+        d.addErrback(log.err, facility="tahoe.introducer", level=log.UNUSUAL)
diff --git a/src/allmydata/test/ b/src/allmydata/test/
index e0903bf3..a891fbfc 100644
--- a/src/allmydata/test/
+++ b/src/allmydata/test/
@@ -13,6 +13,7 @@ from allmydata.introducer.client import IntroducerClient
 from allmydata.introducer.server import IntroducerService
 # test compatibility with old introducer .tac files
 from allmydata.introducer import IntroducerNode
+from allmydata.introducer import old
 from allmydata.util import testutil, idlib
 class FakeNode(Referenceable):
@@ -22,7 +23,7 @@ class LoggingMultiService(service.MultiService):
     def log(self, msg, **kw):
         log.msg(msg, **kw)
-class TestIntroducerNode(testutil.SignalMixin, unittest.TestCase):
+class Node(testutil.SignalMixin, unittest.TestCase):
     def test_loadable(self):
         basedir = "introducer.IntroducerNode.test_loadable"
@@ -34,7 +35,7 @@ class TestIntroducerNode(testutil.SignalMixin, unittest.TestCase):
         return d
-class TestIntroducer(unittest.TestCase, testutil.PollMixin):
+class ServiceMixin:
     def setUp(self):
         self.parent = LoggingMultiService()
@@ -45,6 +46,7 @@ class TestIntroducer(unittest.TestCase, testutil.PollMixin):
         return d
+class Introducer(ServiceMixin, unittest.TestCase, testutil.PollMixin):
     def test_create(self):
         ic = IntroducerClient(None, "introducer.furl", "my_nickname",
@@ -73,9 +75,10 @@ class TestIntroducer(unittest.TestCase, testutil.PollMixin):
         self.failUnlessEqual(len(i.get_announcements()), 2)
         self.failUnlessEqual(len(i.get_subscribers()), 0)
+class SystemTestMixin(ServiceMixin, testutil.PollMixin):
-    def test_system(self):
+    def setUp(self):
+        ServiceMixin.setUp(self)
         self.central_tub = tub = Tub()
         #tub.setOption("logLocalFailures", True)
         #tub.setOption("logRemoteFailures", True)
@@ -84,12 +87,25 @@ class TestIntroducer(unittest.TestCase, testutil.PollMixin):
         portnum = l.getPortnum()
         tub.setLocation("localhost:%d" % portnum)
+class SystemTest(SystemTestMixin, unittest.TestCase):
+    def test_system(self):
         i = IntroducerService()
-        introducer_furl = tub.registerReference(i)
+        self.introducer_furl = self.central_tub.registerReference(i)
+        return self.do_system_test()
+    def test_system_oldserver(self):
+        i = old.IntroducerService_V1()
+        i.setServiceParent(self.parent)
+        self.introducer_furl = self.central_tub.registerReference(i)
+        return self.do_system_test()
+    def do_system_test(self):
         NUMCLIENTS = 5
-        # we have 5 clients who publish themselves, and an extra one which
-        # does not. When the connections are fully established, all six nodes
+        # we have 5 clients who publish themselves, and an extra one does
+        # which not. When the connections are fully established, all six nodes
         # should have 5 connections each.
         clients = []
@@ -105,8 +121,11 @@ class TestIntroducer(unittest.TestCase, testutil.PollMixin):
             n = FakeNode()
             log.msg("creating client %d: %s" % (i, tub.getShortTubID()))
-            c = IntroducerClient(tub, introducer_furl,
-                                 "nickname-%d" % i, "version", "oldest")
+            client_class = IntroducerClient
+            if i == 0:
+                client_class = old.IntroducerClient_V1
+            c = client_class(tub, self.introducer_furl,
+                             "nickname-%d" % i, "version", "oldest")
             if i < NUMCLIENTS:
                 node_furl = tub.registerReference(n)
                 c.publish(node_furl, "storage", "ri_name")
@@ -207,4 +226,3 @@ class TestIntroducer(unittest.TestCase, testutil.PollMixin):
         return d