git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
forget about old peers (closes #26)
author: Brian Warner <warner@allmydata.com>
Tue, 8 May 2007 02:10:24 +0000 (19:10 -0700)
committer: Brian Warner <warner@allmydata.com>
Tue, 8 May 2007 02:10:24 +0000 (19:10 -0700)
Add a new method ('lost_peers') to RIIntroducerClient, to allow the central
introducer node to tell clients to remove peers from the active set after
they've gone away. Without this, client nodes accumulate stale peer FURLs
forever. This introduces a compatibility break, as old clients won't know
about the 'lost_peers' message that a new introducer sends them, although
the errors produced are probably harmless.

src/allmydata/interfaces.py
src/allmydata/introducer.py
src/allmydata/test/test_introducer.py

index 440bd7e6df27e41f0c438459caaa2c89de987951..1058a04930ee2a7aff44937806689ea615831e03 100644 (file)
@@ -19,6 +19,8 @@ ShareData = StringConstraint(100000)
 class RIIntroducerClient(RemoteInterface):
     def new_peers(pburls=SetOf(PBURL)):
         return None
+    def lost_peers(pburls=SetOf(PBURL)):
+        return None
 
 class RIIntroducer(RemoteInterface):
     def hello(node=RIIntroducerClient, pburl=PBURL):
index d2fe6c3c6669b2ef655bdd7441d29ba68cfb7d4d..7060cc467b9d8b01182249ee9f8b51173e667097 100644 (file)
@@ -7,6 +7,15 @@ from foolscap import Referenceable
 from allmydata.interfaces import RIIntroducer, RIIntroducerClient
 from allmydata.util import idlib, observer
 
+def ignoreDeadRef(target, *args, **kwargs):
+    from twisted.internet import error
+    from foolscap import DeadReferenceError
+    d = target.callRemote(*args, **kwargs)
+    def _ignore(f):
+        f.trap(error.ConnectionDone, error.ConnectError,
+               error.ConnectionLost, DeadReferenceError)
+    d.addErrback(_ignore)
+
 class Introducer(service.MultiService, Referenceable):
     implements(RIIntroducer)
 
@@ -21,6 +30,11 @@ class Introducer(service.MultiService, Referenceable):
             log.msg(" introducer: removing %s %s" % (node, pburl))
             self.nodes.remove(node)
             self.pburls.remove(pburl)
+            for othernode in self.nodes:
+                #othernode.callRemote("lost_peers", set([pburl]))
+                #othernode.callRemoteOnly("lost_peers", set([pburl]))
+                ignoreDeadRef(othernode, "lost_peers", set([pburl]))
+
         node.notifyOnDisconnect(_remove)
         self.pburls.add(pburl)
         node.callRemote("new_peers", self.pburls)
@@ -40,7 +54,7 @@ class IntroducerClient(service.Service, Referenceable):
         self.connections = {} # k: nodeid, v: ref
         self.reconnectors = {} # k: PBURL, v: reconnector
 
-        self.connection_observers = observer.ObserverList()
+        self.change_observers = observer.ObserverList()
 
     def startService(self):
         self.introducer_reconnector = self.tub.connectTo(self.introducer_pburl,
@@ -53,11 +67,16 @@ class IntroducerClient(service.Service, Referenceable):
         for pburl in pburls:
             self._new_peer(pburl)
 
+    def remote_lost_peers(self, pburls):
+        for pburl in pburls:
+            self._lost_peer(pburl)
+
     def stopService(self):
         service.Service.stopService(self)
         self.introducer_reconnector.stopConnecting()
         for reconnector in self.reconnectors.itervalues():
             reconnector.stopConnecting()
+        self.reconnectors = {}
 
     def _new_peer(self, pburl):
         if pburl in self.reconnectors:
@@ -76,7 +95,7 @@ class IntroducerClient(service.Service, Referenceable):
         nodeid = idlib.a2b(m.group(1))
         def _got_peer(rref):
             self.log(" connected to(%s)" % idlib.b2a(nodeid))
-            self.connection_observers.notify(nodeid, rref)
+            self.change_observers.notify("add", nodeid, rref)
             self.connections[nodeid] = rref
             def _lost():
                 # TODO: notifyOnDisconnect uses eventually(), but connects do
@@ -92,8 +111,19 @@ class IntroducerClient(service.Service, Referenceable):
                              node=self,
                              pburl=self.my_pburl)
 
-    def notify_on_new_connection(self, cb):
-        """Register a callback that will be fired (with nodeid, rref) when
-        a new connection is established."""
-        self.connection_observers.subscribe(cb)
+    def notify_on_change(self, cb):
+        """Register a callback that will be fired (with ('add',nodeid,rref)
+        or ('remove',pburl) ) when a new connection is established or a peer
+        is lost. This is used by the unit tests."""
+        self.change_observers.subscribe(cb)
 
+    def _lost_peer(self, pburl):
+        if pburl in self.reconnectors:
+            self.reconnectors[pburl].stopConnecting()
+            del self.reconnectors[pburl]
+            self.change_observers.notify("remove", pburl)
+        # TODO: we don't currently bother to terminate any connections we
+        # might have to this peer. The assumption is that, since the
+        # introducer lost their connection to this peer, we'll probably lose
+        # our connection too. Also, foolscap doesn't currently provide a
+        # clean way to terminate a given connection.
index 057926a603d6b82af3537873680e079f94482f85..40d8527fbb78f2b5431ffe5bedd05ca561eb9f42 100644 (file)
@@ -48,7 +48,7 @@ class TestIntroducer(unittest.TestCase):
         ic = IntroducerClient(None, "introducer", "mypburl")
         def _ignore(nodeid, rref):
             pass
-        ic.notify_on_new_connection(_ignore)
+        ic.notify_on_change(_ignore)
 
     def test_listen(self):
         i = Introducer()
@@ -71,11 +71,23 @@ class TestIntroducer(unittest.TestCase):
 
         self.waiting_for_connections = NUMCLIENTS*NUMCLIENTS
         d = self._done_counting = defer.Deferred()
-        def _count(nodeid, rref):
-            log.msg("NEW CONNECTION! %s %s" % (idlib.b2a(nodeid), rref))
-            self.waiting_for_connections -= 1
-            if self.waiting_for_connections == 0:
-                self._done_counting.callback("done!")
+        self._done_counting_down = defer.Deferred()
+        self.waiting_for_disconnections = None
+
+        def _count(changetype, *args):
+            if changetype == "add":
+                nodeid, rref = args
+                log.msg("NEW CONNECTION! %s %s" % (idlib.b2a(nodeid), rref))
+                self.waiting_for_connections -= 1
+                if self.waiting_for_connections == 0:
+                    self._done_counting.callback("done!")
+            else:
+                pburl = args[0]
+                log.msg("LOST PEER! %s" % (pburl,))
+                if self.waiting_for_disconnections is not None:
+                    self.waiting_for_disconnections -= 1
+                    if self.waiting_for_disconnections == 0:
+                        self._done_counting_down.callback("done")
 
         clients = []
         tubs = {}
@@ -91,7 +103,7 @@ class TestIntroducer(unittest.TestCase):
             n = MyNode()
             node_pburl = tub.registerReference(n)
             c = IntroducerClient(tub, iurl, node_pburl)
-            c.notify_on_new_connection(_count)
+            c.notify_on_change(_count)
             c.setServiceParent(self.parent)
             clients.append(c)
             tubs[c] = tub
@@ -102,7 +114,8 @@ class TestIntroducer(unittest.TestCase):
             log.msg("doing _check")
             for c in clients:
                 self.failUnlessEqual(len(c.connections), NUMCLIENTS)
-            # now disconnect somebody's connection to someone else
+            # now disconnect somebody's connection to someone else, and check
+            # to see that the connection is reestablished
             self.waiting_for_connections = 2
             d2 = self._done_counting = defer.Deferred()
             origin_c = clients[0]
@@ -116,12 +129,14 @@ class TestIntroducer(unittest.TestCase):
             log.msg(" did disconnect")
             return d2
         d.addCallback(_check)
+
         def _check_again(res):
             log.msg("doing _check_again")
             for c in clients:
                 self.failUnlessEqual(len(c.connections), NUMCLIENTS)
-            # now disconnect somebody's connection to themselves. This will
-            # only result in one new connection, since it is a loopback.
+            # now disconnect somebody's connection to themselves, and make
+            # sure it reconnects. This will only result in one new
+            # connection, since it is a loopback.
             self.waiting_for_connections = 1
             d2 = self._done_counting = defer.Deferred()
             origin_c = clients[0]
@@ -135,12 +150,40 @@ class TestIntroducer(unittest.TestCase):
             log.msg(" did disconnect")
             return d2
         d.addCallback(_check_again)
+
         def _check_again2(res):
             log.msg("doing _check_again2")
             for c in clients:
                 self.failUnlessEqual(len(c.connections), NUMCLIENTS)
-            # now disconnect somebody's connection to themselves
         d.addCallback(_check_again2)
+
+        def _shutdown_one_client(res):
+            log.msg("_shutdown_one_client, waiting for %d shutdowns" %
+                    (NUMCLIENTS-1,))
+            # shutdown a single client, make sure everyone else notices
+            self.waiting_for_disconnections = NUMCLIENTS-1
+            victim_client = clients[0]
+            victim_tub = tubs[victim_client]
+            # disownServiceParent will stop the service too
+            d1 = defer.maybeDeferred(victim_client.disownServiceParent)
+            def _stoptub(res):
+                log.msg("_stoptub")
+                return victim_tub.disownServiceParent()
+            d1.addCallback(_stoptub)
+            def _wait_for_counting_down(res):
+                log.msg("_wait_for_counting_down")
+                return self._done_counting_down
+            d1.addCallback(_wait_for_counting_down)
+            return d1
+        d.addCallback(_shutdown_one_client)
+
+        def _check_shutdown(res):
+            log.msg("_check_shutdown")
+            c = clients[1]
+            self.failUnlessEqual(len(c.connections), NUMCLIENTS-1)
+            self.failUnlessEqual(len(c.reconnectors), NUMCLIENTS-1)
+        d.addCallback(_check_shutdown)
+
         return d
     test_system.timeout = 2400