From 04af4a48b551f2a8ab7fc508c39e9d9283fc72f8 Mon Sep 17 00:00:00 2001 From: Brian Warner Date: Mon, 7 May 2007 19:10:24 -0700 Subject: [PATCH] forget about old peers (closes #26) Add a new method to RIIntroducer, to allow the central introducer node to remove peers from the active set after they've gone away. Without this, client nodes accumulate stale peer FURLs forever. This introduces a compatibility break, as old introducers won't know about the 'lost_peers' message, although the errors produced are probably harmless. --- src/allmydata/interfaces.py | 2 + src/allmydata/introducer.py | 42 ++++++++++++++--- src/allmydata/test/test_introducer.py | 65 ++++++++++++++++++++++----- 3 files changed, 92 insertions(+), 17 deletions(-) diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py index 440bd7e6..1058a049 100644 --- a/src/allmydata/interfaces.py +++ b/src/allmydata/interfaces.py @@ -19,6 +19,8 @@ ShareData = StringConstraint(100000) class RIIntroducerClient(RemoteInterface): def new_peers(pburls=SetOf(PBURL)): return None + def lost_peers(pburls=SetOf(PBURL)): + return None class RIIntroducer(RemoteInterface): def hello(node=RIIntroducerClient, pburl=PBURL): diff --git a/src/allmydata/introducer.py b/src/allmydata/introducer.py index d2fe6c3c..7060cc46 100644 --- a/src/allmydata/introducer.py +++ b/src/allmydata/introducer.py @@ -7,6 +7,15 @@ from foolscap import Referenceable from allmydata.interfaces import RIIntroducer, RIIntroducerClient from allmydata.util import idlib, observer +def ignoreDeadRef(target, *args, **kwargs): + from twisted.internet import error + from foolscap import DeadReferenceError + d = target.callRemote(*args, **kwargs) + def _ignore(f): + f.trap(error.ConnectionDone, error.ConnectError, + error.ConnectionLost, DeadReferenceError) + d.addErrback(_ignore) + class Introducer(service.MultiService, Referenceable): implements(RIIntroducer) @@ -21,6 +30,11 @@ class Introducer(service.MultiService, Referenceable): log.msg(" introducer: removing %s %s" % (node, pburl)) self.nodes.remove(node) self.pburls.remove(pburl) + for othernode in self.nodes: + #othernode.callRemote("lost_peers", set([pburl])) + #othernode.callRemoteOnly("lost_peers", set([pburl])) + ignoreDeadRef(othernode, "lost_peers", set([pburl])) + node.notifyOnDisconnect(_remove) self.pburls.add(pburl) node.callRemote("new_peers", self.pburls) @@ -40,7 +54,7 @@ class IntroducerClient(service.Service, Referenceable): self.connections = {} # k: nodeid, v: ref self.reconnectors = {} # k: PBURL, v: reconnector - self.connection_observers = observer.ObserverList() + self.change_observers = observer.ObserverList() def startService(self): self.introducer_reconnector = self.tub.connectTo(self.introducer_pburl, @@ -53,11 +67,16 @@ class IntroducerClient(service.Service, Referenceable): for pburl in pburls: self._new_peer(pburl) + def remote_lost_peers(self, pburls): + for pburl in pburls: + self._lost_peer(pburl) + def stopService(self): service.Service.stopService(self) self.introducer_reconnector.stopConnecting() for reconnector in self.reconnectors.itervalues(): reconnector.stopConnecting() + self.reconnectors = {} def _new_peer(self, pburl): if pburl in self.reconnectors: @@ -76,7 +95,7 @@ class IntroducerClient(service.Service, Referenceable): nodeid = idlib.a2b(m.group(1)) def _got_peer(rref): self.log(" connected to(%s)" % idlib.b2a(nodeid)) - self.connection_observers.notify(nodeid, rref) + self.change_observers.notify("add", nodeid, rref) self.connections[nodeid] = rref def _lost(): # TODO: notifyOnDisconnect uses eventually(), but connects do @@ -92,8 +111,19 @@ class IntroducerClient(service.Service, Referenceable): node=self, pburl=self.my_pburl) - def notify_on_new_connection(self, cb): - """Register a callback that will be fired (with nodeid, rref) when - a new connection is established.""" - self.connection_observers.subscribe(cb) + def notify_on_change(self, cb): + """Register a callback that will be fired (with ('add',nodeid,rref) + or ('remove',pburl) ) when a new connection is established or a peer + is lost. This is used by the unit tests.""" + self.change_observers.subscribe(cb) + def _lost_peer(self, pburl): + if pburl in self.reconnectors: + self.reconnectors[pburl].stopConnecting() + del self.reconnectors[pburl] + self.change_observers.notify("remove", pburl) + # TODO: we don't currently bother to terminate any connections we + # might have to this peer. The assumption is that, since the + # introducer lost their connection to this peer, we'll probably lose + # our connection too. Also, foolscap doesn't currently provide a + # clean way to terminate a given connection. diff --git a/src/allmydata/test/test_introducer.py b/src/allmydata/test/test_introducer.py index 057926a6..40d8527f 100644 --- a/src/allmydata/test/test_introducer.py +++ b/src/allmydata/test/test_introducer.py @@ -48,7 +48,7 @@ class TestIntroducer(unittest.TestCase): ic = IntroducerClient(None, "introducer", "mypburl") def _ignore(nodeid, rref): pass - ic.notify_on_new_connection(_ignore) + ic.notify_on_change(_ignore) def test_listen(self): i = Introducer() @@ -71,11 +71,23 @@ class TestIntroducer(unittest.TestCase): self.waiting_for_connections = NUMCLIENTS*NUMCLIENTS d = self._done_counting = defer.Deferred() - def _count(nodeid, rref): - log.msg("NEW CONNECTION! %s %s" % (idlib.b2a(nodeid), rref)) - self.waiting_for_connections -= 1 - if self.waiting_for_connections == 0: - self._done_counting.callback("done!") + self._done_counting_down = defer.Deferred() + self.waiting_for_disconnections = None + + def _count(changetype, *args): + if changetype == "add": + nodeid, rref = args + log.msg("NEW CONNECTION! %s %s" % (idlib.b2a(nodeid), rref)) + self.waiting_for_connections -= 1 + if self.waiting_for_connections == 0: + self._done_counting.callback("done!") + else: + pburl = args[0] + log.msg("LOST PEER! %s" % (pburl,)) + if self.waiting_for_disconnections is not None: + self.waiting_for_disconnections -= 1 + if self.waiting_for_disconnections == 0: + self._done_counting_down.callback("done") clients = [] tubs = {} @@ -91,7 +103,7 @@ class TestIntroducer(unittest.TestCase): n = MyNode() node_pburl = tub.registerReference(n) c = IntroducerClient(tub, iurl, node_pburl) - c.notify_on_new_connection(_count) + c.notify_on_change(_count) c.setServiceParent(self.parent) clients.append(c) tubs[c] = tub @@ -102,7 +114,8 @@ class TestIntroducer(unittest.TestCase): log.msg("doing _check") for c in clients: self.failUnlessEqual(len(c.connections), NUMCLIENTS) - # now disconnect somebody's connection to someone else + # now disconnect somebody's connection to someone else, and check + # to see that the connection is reestablished self.waiting_for_connections = 2 d2 = self._done_counting = defer.Deferred() origin_c = clients[0] @@ -116,12 +129,14 @@ class TestIntroducer(unittest.TestCase): log.msg(" did disconnect") return d2 d.addCallback(_check) + def _check_again(res): log.msg("doing _check_again") for c in clients: self.failUnlessEqual(len(c.connections), NUMCLIENTS) - # now disconnect somebody's connection to themselves. This will - # only result in one new connection, since it is a loopback. + # now disconnect somebody's connection to themselves, and make + # sure it reconnects. This will only result in one new + # connection, since it is a loopback. self.waiting_for_connections = 1 d2 = self._done_counting = defer.Deferred() origin_c = clients[0] @@ -135,12 +150,40 @@ class TestIntroducer(unittest.TestCase): log.msg(" did disconnect") return d2 d.addCallback(_check_again) + def _check_again2(res): log.msg("doing _check_again2") for c in clients: self.failUnlessEqual(len(c.connections), NUMCLIENTS) - # now disconnect somebody's connection to themselves d.addCallback(_check_again2) + + def _shutdown_one_client(res): + log.msg("_shutdown_one_client, waiting for %d shutdowns" % + (NUMCLIENTS-1,)) + # shutdown a single client, make sure everyone else notices + self.waiting_for_disconnections = NUMCLIENTS-1 + victim_client = clients[0] + victim_tub = tubs[victim_client] + # disownServiceParent will stop the service too + d1 = defer.maybeDeferred(victim_client.disownServiceParent) + def _stoptub(res): + log.msg("_stoptub") + return victim_tub.disownServiceParent() + d1.addCallback(_stoptub) + def _wait_for_counting_down(res): + log.msg("_wait_for_counting_down") + return self._done_counting_down + d1.addCallback(_wait_for_counting_down) + return d1 + d.addCallback(_shutdown_one_client) + + def _check_shutdown(res): + log.msg("_check_shutdown") + c = clients[1] + self.failUnlessEqual(len(c.connections), NUMCLIENTS-1) + self.failUnlessEqual(len(c.reconnectors), NUMCLIENTS-1) + d.addCallback(_check_shutdown) + return d test_system.timeout = 2400 -- 2.45.2