From 04af4a48b551f2a8ab7fc508c39e9d9283fc72f8 Mon Sep 17 00:00:00 2001
From: Brian Warner <>
Date: Mon, 7 May 2007 19:10:24 -0700
Subject: [PATCH] forget about old peers (closes #26). Add a new method to
 RIIntroducerClient, to allow the central introducer node to tell clients to
 remove peers from the active set after they've gone away. Without this,
 client nodes accumulate stale peer FURLs forever. This introduces a
 compatibility break, as old clients won't know about the 'lost_peers'
 message, although the errors produced are probably harmless.

 src/allmydata/           |  2 +
 src/allmydata/           | 42 ++++++++++++++---
 src/allmydata/test/ | 65 ++++++++++++++++++++++-----
 3 files changed, 92 insertions(+), 17 deletions(-)

diff --git a/src/allmydata/ b/src/allmydata/
index 440bd7e6..1058a049 100644
--- a/src/allmydata/
+++ b/src/allmydata/
@@ -19,6 +19,8 @@ ShareData = StringConstraint(100000)
 class RIIntroducerClient(RemoteInterface):
     def new_peers(pburls=SetOf(PBURL)):
         return None
+    def lost_peers(pburls=SetOf(PBURL)):
+        return None
 class RIIntroducer(RemoteInterface):
     def hello(node=RIIntroducerClient, pburl=PBURL):
diff --git a/src/allmydata/ b/src/allmydata/
index d2fe6c3c..7060cc46 100644
--- a/src/allmydata/
+++ b/src/allmydata/
@@ -7,6 +7,15 @@ from foolscap import Referenceable
 from allmydata.interfaces import RIIntroducer, RIIntroducerClient
 from allmydata.util import idlib, observer
+def ignoreDeadRef(target, *args, **kwargs):
+    from twisted.internet import error
+    from foolscap import DeadReferenceError
+    d = target.callRemote(*args, **kwargs)
+    def _ignore(f):
+        f.trap(error.ConnectionDone, error.ConnectError,
+               error.ConnectionLost, DeadReferenceError)
+    d.addErrback(_ignore)
 class Introducer(service.MultiService, Referenceable):
@@ -21,6 +30,11 @@ class Introducer(service.MultiService, Referenceable):
             log.msg(" introducer: removing %s %s" % (node, pburl))
+            for othernode in self.nodes:
+                #othernode.callRemote("lost_peers", set([pburl]))
+                #othernode.callRemoteOnly("lost_peers", set([pburl]))
+                ignoreDeadRef(othernode, "lost_peers", set([pburl]))
         node.callRemote("new_peers", self.pburls)
@@ -40,7 +54,7 @@ class IntroducerClient(service.Service, Referenceable):
         self.connections = {} # k: nodeid, v: ref
         self.reconnectors = {} # k: PBURL, v: reconnector
-        self.connection_observers = observer.ObserverList()
+        self.change_observers = observer.ObserverList()
     def startService(self):
         self.introducer_reconnector = self.tub.connectTo(self.introducer_pburl,
@@ -53,11 +67,16 @@ class IntroducerClient(service.Service, Referenceable):
         for pburl in pburls:
+    def remote_lost_peers(self, pburls):
+        for pburl in pburls:
+            self._lost_peer(pburl)
     def stopService(self):
         for reconnector in self.reconnectors.itervalues():
+        self.reconnectors = {}
     def _new_peer(self, pburl):
         if pburl in self.reconnectors:
@@ -76,7 +95,7 @@ class IntroducerClient(service.Service, Referenceable):
         nodeid = idlib.a2b(
         def _got_peer(rref):
             self.log(" connected to(%s)" % idlib.b2a(nodeid))
-            self.connection_observers.notify(nodeid, rref)
+            self.change_observers.notify("add", nodeid, rref)
             self.connections[nodeid] = rref
             def _lost():
                 # TODO: notifyOnDisconnect uses eventually(), but connects do
@@ -92,8 +111,19 @@ class IntroducerClient(service.Service, Referenceable):
-    def notify_on_new_connection(self, cb):
-        """Register a callback that will be fired (with nodeid, rref) when
-        a new connection is established."""
-        self.connection_observers.subscribe(cb)
+    def notify_on_change(self, cb):
+        """Register a callback that will be fired (with ('add',nodeid,rref)
+        or ('remove',pburl) ) when a new connection is established or a peer
+        is lost. This is used by the unit tests."""
+        self.change_observers.subscribe(cb)
+    def _lost_peer(self, pburl):
+        if pburl in self.reconnectors:
+            self.reconnectors[pburl].stopConnecting()
+            del self.reconnectors[pburl]
+            self.change_observers.notify("remove", pburl)
+        # TODO: we don't currently bother to terminate any connections we
+        # might have to this peer. The assumption is that, since the
+        # introducer lost their connection to this peer, we'll probably lose
+        # our connection too. Also, foolscap doesn't currently provide a
+        # clean way to terminate a given connection.
diff --git a/src/allmydata/test/ b/src/allmydata/test/
index 057926a6..40d8527f 100644
--- a/src/allmydata/test/
+++ b/src/allmydata/test/
@@ -48,7 +48,7 @@ class TestIntroducer(unittest.TestCase):
         ic = IntroducerClient(None, "introducer", "mypburl")
         def _ignore(nodeid, rref):
-        ic.notify_on_new_connection(_ignore)
+        ic.notify_on_change(_ignore)
     def test_listen(self):
         i = Introducer()
@@ -71,11 +71,23 @@ class TestIntroducer(unittest.TestCase):
         self.waiting_for_connections = NUMCLIENTS*NUMCLIENTS
         d = self._done_counting = defer.Deferred()
-        def _count(nodeid, rref):
-            log.msg("NEW CONNECTION! %s %s" % (idlib.b2a(nodeid), rref))
-            self.waiting_for_connections -= 1
-            if self.waiting_for_connections == 0:
-                self._done_counting.callback("done!")
+        self._done_counting_down = defer.Deferred()
+        self.waiting_for_disconnections = None
+        def _count(changetype, *args):
+            if changetype == "add":
+                nodeid, rref = args
+                log.msg("NEW CONNECTION! %s %s" % (idlib.b2a(nodeid), rref))
+                self.waiting_for_connections -= 1
+                if self.waiting_for_connections == 0:
+                    self._done_counting.callback("done!")
+            else:
+                pburl = args[0]
+                log.msg("LOST PEER! %s" % (pburl,))
+                if self.waiting_for_disconnections is not None:
+                    self.waiting_for_disconnections -= 1
+                    if self.waiting_for_disconnections == 0:
+                        self._done_counting_down.callback("done")
         clients = []
         tubs = {}
@@ -91,7 +103,7 @@ class TestIntroducer(unittest.TestCase):
             n = MyNode()
             node_pburl = tub.registerReference(n)
             c = IntroducerClient(tub, iurl, node_pburl)
-            c.notify_on_new_connection(_count)
+            c.notify_on_change(_count)
             tubs[c] = tub
@@ -102,7 +114,8 @@ class TestIntroducer(unittest.TestCase):
             log.msg("doing _check")
             for c in clients:
                 self.failUnlessEqual(len(c.connections), NUMCLIENTS)
-            # now disconnect somebody's connection to someone else
+            # now disconnect somebody's connection to someone else, and check
+            # to see that the connection is reestablished
             self.waiting_for_connections = 2
             d2 = self._done_counting = defer.Deferred()
             origin_c = clients[0]
@@ -116,12 +129,14 @@ class TestIntroducer(unittest.TestCase):
             log.msg(" did disconnect")
             return d2
         def _check_again(res):
             log.msg("doing _check_again")
             for c in clients:
                 self.failUnlessEqual(len(c.connections), NUMCLIENTS)
-            # now disconnect somebody's connection to themselves. This will
-            # only result in one new connection, since it is a loopback.
+            # now disconnect somebody's connection to themselves, and make
+            # sure it reconnects. This will only result in one new
+            # connection, since it is a loopback.
             self.waiting_for_connections = 1
             d2 = self._done_counting = defer.Deferred()
             origin_c = clients[0]
@@ -135,12 +150,40 @@ class TestIntroducer(unittest.TestCase):
             log.msg(" did disconnect")
             return d2
         def _check_again2(res):
             log.msg("doing _check_again2")
             for c in clients:
                 self.failUnlessEqual(len(c.connections), NUMCLIENTS)
-            # now disconnect somebody's connection to themselves
+        def _shutdown_one_client(res):
+            log.msg("_shutdown_one_client, waiting for %d shutdowns" %
+                    (NUMCLIENTS-1,))
+            # shutdown a single client, make sure everyone else notices
+            self.waiting_for_disconnections = NUMCLIENTS-1
+            victim_client = clients[0]
+            victim_tub = tubs[victim_client]
+            # disownServiceParent will stop the service too
+            d1 = defer.maybeDeferred(victim_client.disownServiceParent)
+            def _stoptub(res):
+                log.msg("_stoptub")
+                return victim_tub.disownServiceParent()
+            d1.addCallback(_stoptub)
+            def _wait_for_counting_down(res):
+                log.msg("_wait_for_counting_down")
+                return self._done_counting_down
+            d1.addCallback(_wait_for_counting_down)
+            return d1
+        d.addCallback(_shutdown_one_client)
+        def _check_shutdown(res):
+            log.msg("_check_shutdown")
+            c = clients[1]
+            self.failUnlessEqual(len(c.connections), NUMCLIENTS-1)
+            self.failUnlessEqual(len(c.reconnectors), NUMCLIENTS-1)
+        d.addCallback(_check_shutdown)
         return d
     test_system.timeout = 2400