from allmydata.interfaces import RIIntroducer, RIIntroducerClient
from allmydata.util import idlib, observer
-def ignoreDeadRef(target, *args, **kwargs):
- from twisted.internet import error
- from foolscap import DeadReferenceError
- d = target.callRemote(*args, **kwargs)
- def _ignore(f):
- f.trap(error.ConnectionDone, error.ConnectError,
- error.ConnectionLost, DeadReferenceError)
- d.addErrback(_ignore)
-
class Introducer(service.MultiService, Referenceable):
implements(RIIntroducer)
log.msg(" introducer: removing %s %s" % (node, pburl))
self.nodes.remove(node)
self.pburls.remove(pburl)
- for othernode in self.nodes:
- #othernode.callRemote("lost_peers", set([pburl]))
- #othernode.callRemoteOnly("lost_peers", set([pburl]))
- ignoreDeadRef(othernode, "lost_peers", set([pburl]))
-
node.notifyOnDisconnect(_remove)
self.pburls.add(pburl)
node.callRemote("new_peers", self.pburls)
self.connections = {} # k: nodeid, v: ref
self.reconnectors = {} # k: PBURL, v: reconnector
- self.change_observers = observer.ObserverList()
+ self.connection_observers = observer.ObserverList()
def startService(self):
self.introducer_reconnector = self.tub.connectTo(self.introducer_pburl,
for pburl in pburls:
self._new_peer(pburl)
- def remote_lost_peers(self, pburls):
- for pburl in pburls:
- self._lost_peer(pburl)
-
def stopService(self):
service.Service.stopService(self)
self.introducer_reconnector.stopConnecting()
for reconnector in self.reconnectors.itervalues():
reconnector.stopConnecting()
- self.reconnectors = {}
def _new_peer(self, pburl):
if pburl in self.reconnectors:
nodeid = idlib.a2b(m.group(1))
def _got_peer(rref):
self.log(" connected to(%s)" % idlib.b2a(nodeid))
- self.change_observers.notify("add", nodeid, rref)
+ self.connection_observers.notify(nodeid, rref)
self.connections[nodeid] = rref
def _lost():
# TODO: notifyOnDisconnect uses eventually(), but connects do
node=self,
pburl=self.my_pburl)
- def notify_on_change(self, cb):
- """Register a callback that will be fired (with ('add',nodeid,rref)
- or ('remove',pburl) ) when a new connection is established or a peer
- is lost. This is used by the unit tests."""
- self.change_observers.subscribe(cb)
+ def notify_on_new_connection(self, cb):
+ """Register a callback that will be fired (with nodeid, rref) when
+ a new connection is established."""
+ self.connection_observers.subscribe(cb)
- def _lost_peer(self, pburl):
- if pburl in self.reconnectors:
- self.reconnectors[pburl].stopConnecting()
- del self.reconnectors[pburl]
- self.change_observers.notify("remove", pburl)
- # TODO: we don't currently bother to terminate any connections we
- # might have to this peer. The assumption is that, since the
- # introducer lost their connection to this peer, we'll probably lose
- # our connection too. Also, foolscap doesn't currently provide a
- # clean way to terminate a given connection.
ic = IntroducerClient(None, "introducer", "mypburl")
def _ignore(nodeid, rref):
pass
- ic.notify_on_change(_ignore)
+ ic.notify_on_new_connection(_ignore)
def test_listen(self):
i = Introducer()
self.waiting_for_connections = NUMCLIENTS*NUMCLIENTS
d = self._done_counting = defer.Deferred()
- self._done_counting_down = defer.Deferred()
- self.waiting_for_disconnections = None
-
- def _count(changetype, *args):
- if changetype == "add":
- nodeid, rref = args
- log.msg("NEW CONNECTION! %s %s" % (idlib.b2a(nodeid), rref))
- self.waiting_for_connections -= 1
- if self.waiting_for_connections == 0:
- self._done_counting.callback("done!")
- else:
- pburl = args[0]
- log.msg("LOST PEER! %s" % (pburl,))
- if self.waiting_for_disconnections is not None:
- self.waiting_for_disconnections -= 1
- if self.waiting_for_disconnections == 0:
- self._done_counting_down.callback("done")
+ def _count(nodeid, rref):
+ log.msg("NEW CONNECTION! %s %s" % (idlib.b2a(nodeid), rref))
+ self.waiting_for_connections -= 1
+ if self.waiting_for_connections == 0:
+ self._done_counting.callback("done!")
clients = []
tubs = {}
n = MyNode()
node_pburl = tub.registerReference(n)
c = IntroducerClient(tub, iurl, node_pburl)
- c.notify_on_change(_count)
+ c.notify_on_new_connection(_count)
c.setServiceParent(self.parent)
clients.append(c)
tubs[c] = tub
log.msg("doing _check")
for c in clients:
self.failUnlessEqual(len(c.connections), NUMCLIENTS)
- # now disconnect somebody's connection to someone else, and check
- # to see that the connection is reestablished
+ # now disconnect somebody's connection to someone else
self.waiting_for_connections = 2
d2 = self._done_counting = defer.Deferred()
origin_c = clients[0]
log.msg(" did disconnect")
return d2
d.addCallback(_check)
-
def _check_again(res):
log.msg("doing _check_again")
for c in clients:
self.failUnlessEqual(len(c.connections), NUMCLIENTS)
- # now disconnect somebody's connection to themselves, and make
- # sure it reconnects. This will only result in one new
- # connection, since it is a loopback.
+ # now disconnect somebody's connection to themselves. This will
+ # only result in one new connection, since it is a loopback.
self.waiting_for_connections = 1
d2 = self._done_counting = defer.Deferred()
origin_c = clients[0]
log.msg(" did disconnect")
return d2
d.addCallback(_check_again)
-
def _check_again2(res):
log.msg("doing _check_again2")
for c in clients:
self.failUnlessEqual(len(c.connections), NUMCLIENTS)
+ # now disconnect somebody's connection to themselves
d.addCallback(_check_again2)
-
- def _shutdown_one_client(res):
- log.msg("_shutdown_one_client, waiting for %d shutdowns" %
- (NUMCLIENTS-1,))
- # shutdown a single client, make sure everyone else notices
- self.waiting_for_disconnections = NUMCLIENTS-1
- victim_client = clients[0]
- victim_tub = tubs[victim_client]
- # disownServiceParent will stop the service too
- d1 = defer.maybeDeferred(victim_client.disownServiceParent)
- def _stoptub(res):
- log.msg("_stoptub")
- return victim_tub.disownServiceParent()
- d1.addCallback(_stoptub)
- def _wait_for_counting_down(res):
- log.msg("_wait_for_counting_down")
- return self._done_counting_down
- d1.addCallback(_wait_for_counting_down)
- return d1
- d.addCallback(_shutdown_one_client)
-
- def _check_shutdown(res):
- log.msg("_check_shutdown")
- c = clients[1]
- self.failUnlessEqual(len(c.connections), NUMCLIENTS-1)
- self.failUnlessEqual(len(c.reconnectors), NUMCLIENTS-1)
- d.addCallback(_check_shutdown)
-
return d
test_system.timeout = 2400