from allmydata.interfaces import RIIntroducer, RIIntroducerClient
from allmydata.util import idlib, observer
+def ignoreDeadRef(target, *args, **kwargs):
+ from twisted.internet import error
+ from foolscap import DeadReferenceError
+ d = target.callRemote(*args, **kwargs)
+ def _ignore(f):
+ f.trap(error.ConnectionDone, error.ConnectError,
+ error.ConnectionLost, DeadReferenceError)
+ d.addErrback(_ignore)
+
class Introducer(service.MultiService, Referenceable):
implements(RIIntroducer)
log.msg(" introducer: removing %s %s" % (node, pburl))
self.nodes.remove(node)
self.pburls.remove(pburl)
+ for othernode in self.nodes:
+ #othernode.callRemote("lost_peers", set([pburl]))
+ #othernode.callRemoteOnly("lost_peers", set([pburl]))
+ ignoreDeadRef(othernode, "lost_peers", set([pburl]))
+
node.notifyOnDisconnect(_remove)
self.pburls.add(pburl)
node.callRemote("new_peers", self.pburls)
self.connections = {} # k: nodeid, v: ref
self.reconnectors = {} # k: PBURL, v: reconnector
- self.connection_observers = observer.ObserverList()
+ self.change_observers = observer.ObserverList()
def startService(self):
self.introducer_reconnector = self.tub.connectTo(self.introducer_pburl,
for pburl in pburls:
self._new_peer(pburl)
+ def remote_lost_peers(self, pburls):
+ for pburl in pburls:
+ self._lost_peer(pburl)
+
def stopService(self):
service.Service.stopService(self)
self.introducer_reconnector.stopConnecting()
for reconnector in self.reconnectors.itervalues():
reconnector.stopConnecting()
+ self.reconnectors = {}
def _new_peer(self, pburl):
if pburl in self.reconnectors:
nodeid = idlib.a2b(m.group(1))
def _got_peer(rref):
self.log(" connected to(%s)" % idlib.b2a(nodeid))
- self.connection_observers.notify(nodeid, rref)
+ self.change_observers.notify("add", nodeid, rref)
self.connections[nodeid] = rref
def _lost():
# TODO: notifyOnDisconnect uses eventually(), but connects do
node=self,
pburl=self.my_pburl)
- def notify_on_new_connection(self, cb):
- """Register a callback that will be fired (with nodeid, rref) when
- a new connection is established."""
- self.connection_observers.subscribe(cb)
+ def notify_on_change(self, cb):
+ """Register a callback that will be fired (with ('add',nodeid,rref)
+ or ('remove',pburl) ) when a new connection is established or a peer
+ is lost. This is used by the unit tests."""
+ self.change_observers.subscribe(cb)
+ def _lost_peer(self, pburl):
+ if pburl in self.reconnectors:
+ self.reconnectors[pburl].stopConnecting()
+ del self.reconnectors[pburl]
+ self.change_observers.notify("remove", pburl)
+ # TODO: we don't currently bother to terminate any connections we
+ # might have to this peer. The assumption is that, since the
+ # introducer lost their connection to this peer, we'll probably lose
+ # our connection too. Also, foolscap doesn't currently provide a
+ # clean way to terminate a given connection.
ic = IntroducerClient(None, "introducer", "mypburl")
def _ignore(nodeid, rref):
pass
- ic.notify_on_new_connection(_ignore)
+ ic.notify_on_change(_ignore)
def test_listen(self):
i = Introducer()
self.waiting_for_connections = NUMCLIENTS*NUMCLIENTS
d = self._done_counting = defer.Deferred()
- def _count(nodeid, rref):
- log.msg("NEW CONNECTION! %s %s" % (idlib.b2a(nodeid), rref))
- self.waiting_for_connections -= 1
- if self.waiting_for_connections == 0:
- self._done_counting.callback("done!")
+ self._done_counting_down = defer.Deferred()
+ self.waiting_for_disconnections = None
+
+ def _count(changetype, *args):
+ if changetype == "add":
+ nodeid, rref = args
+ log.msg("NEW CONNECTION! %s %s" % (idlib.b2a(nodeid), rref))
+ self.waiting_for_connections -= 1
+ if self.waiting_for_connections == 0:
+ self._done_counting.callback("done!")
+ else:
+ pburl = args[0]
+ log.msg("LOST PEER! %s" % (pburl,))
+ if self.waiting_for_disconnections is not None:
+ self.waiting_for_disconnections -= 1
+ if self.waiting_for_disconnections == 0:
+ self._done_counting_down.callback("done")
clients = []
tubs = {}
n = MyNode()
node_pburl = tub.registerReference(n)
c = IntroducerClient(tub, iurl, node_pburl)
- c.notify_on_new_connection(_count)
+ c.notify_on_change(_count)
c.setServiceParent(self.parent)
clients.append(c)
tubs[c] = tub
log.msg("doing _check")
for c in clients:
self.failUnlessEqual(len(c.connections), NUMCLIENTS)
- # now disconnect somebody's connection to someone else
+ # now disconnect somebody's connection to someone else, and check
+ # to see that the connection is reestablished
self.waiting_for_connections = 2
d2 = self._done_counting = defer.Deferred()
origin_c = clients[0]
log.msg(" did disconnect")
return d2
d.addCallback(_check)
+
def _check_again(res):
log.msg("doing _check_again")
for c in clients:
self.failUnlessEqual(len(c.connections), NUMCLIENTS)
- # now disconnect somebody's connection to themselves. This will
- # only result in one new connection, since it is a loopback.
+ # now disconnect somebody's connection to themselves, and make
+ # sure it reconnects. This will only result in one new
+ # connection, since it is a loopback.
self.waiting_for_connections = 1
d2 = self._done_counting = defer.Deferred()
origin_c = clients[0]
log.msg(" did disconnect")
return d2
d.addCallback(_check_again)
+
def _check_again2(res):
log.msg("doing _check_again2")
for c in clients:
self.failUnlessEqual(len(c.connections), NUMCLIENTS)
- # now disconnect somebody's connection to themselves
d.addCallback(_check_again2)
+
+ def _shutdown_one_client(res):
+ log.msg("_shutdown_one_client, waiting for %d shutdowns" %
+ (NUMCLIENTS-1,))
+ # shutdown a single client, make sure everyone else notices
+ self.waiting_for_disconnections = NUMCLIENTS-1
+ victim_client = clients[0]
+ victim_tub = tubs[victim_client]
+ # disownServiceParent will stop the service too
+ d1 = defer.maybeDeferred(victim_client.disownServiceParent)
+ def _stoptub(res):
+ log.msg("_stoptub")
+ return victim_tub.disownServiceParent()
+ d1.addCallback(_stoptub)
+ def _wait_for_counting_down(res):
+ log.msg("_wait_for_counting_down")
+ return self._done_counting_down
+ d1.addCallback(_wait_for_counting_down)
+ return d1
+ d.addCallback(_shutdown_one_client)
+
+ def _check_shutdown(res):
+ log.msg("_check_shutdown")
+ c = clients[1]
+ self.failUnlessEqual(len(c.connections), NUMCLIENTS-1)
+ self.failUnlessEqual(len(c.reconnectors), NUMCLIENTS-1)
+ d.addCallback(_check_shutdown)
+
return d
test_system.timeout = 2400