From: Brian Warner Date: Fri, 1 Dec 2006 01:09:57 +0000 (-0700) Subject: implement more Roster stuff: add_peer, lost_peer. Changed Client service registration... X-Git-Tag: tahoe_v0.1.0-0-UNSTABLE~530 X-Git-Url: https://git.rkrishnan.org/vdrive//%22file:/%22?a=commitdiff_plain;h=646f888bbd351b8c09069436019d2ed2b34fdf2c;p=tahoe-lafs%2Ftahoe-lafs.git implement more Roster stuff: add_peer, lost_peer. Changed Client service registration scheme. --- diff --git a/allmydata/client.py b/allmydata/client.py index c60042a1..908a2e31 100644 --- a/allmydata/client.py +++ b/allmydata/client.py @@ -10,6 +10,7 @@ from twisted.internet.base import BlockingResolver reactor.installResolver(BlockingResolver()) class Storage(service.MultiService, Referenceable): + name = "storage" pass class Client(service.MultiService, Referenceable): @@ -28,27 +29,30 @@ class Client(service.MultiService, Referenceable): f.close() self.tub.setServiceParent(self) self.queen = None # self.queen is either None or a RemoteReference - self.urls = {} + self.all_peers = set() + self.connections = {} + s = Storage() + s.setServiceParent(self) + if os.path.exists(self.AUTHKEYSFILE): from allmydata import manhole m = manhole.AuthorizedKeysManhole(8022, self.AUTHKEYSFILE) m.setServiceParent(self) log.msg("AuthorizedKeysManhole listening on 8022") - def _setup_services(self, local_ip): + def _setup_tub(self, local_ip): portnum = 0 l = self.tub.listenOn("tcp:%d" % portnum) self.tub.setLocation("%s:%d" % (local_ip, l.getPortnum())) - s = Storage() - s.setServiceParent(self) - self.urls["storage"] = self.tub.registerReference(s, "storage") + self.my_pburl = self.tub.registerReference(self) def startService(self): # note: this class can only be started and stopped once. service.MultiService.startService(self) d = get_local_ip_for() - d.addCallback(self._setup_services) + d.addCallback(self._setup_tub) if self.queen_pburl: + # TODO: maybe this should wait for tub.setLocation ? self.connector = self.tub.connectTo(self.queen_pburl, self._got_queen) else: @@ -63,8 +67,37 @@ class Client(service.MultiService, Referenceable): log.msg("connected to queen") self.queen = queen queen.notifyOnDisconnect(self._lost_queen) - queen.callRemote("hello", nodeid=self.tub.tubID, self=self, urls=self.urls) + queen.callRemote("hello", + nodeid=self.tub.tubID, node=self, pburl=self.my_pburl) def _lost_queen(self): log.msg("lost connection to queen") self.queen = None + + def remote_get_service(self, name): + return self.getServiceNamed(name) + + def remote_add_peers(self, new_peers): + for nodeid, pburl in new_peers: + if nodeid == self.tub.tubID: + continue + log.msg("adding peer %s" % nodeid) + if nodeid in self.all_peers: + log.msg("weird, I already had an entry for them") + self.all_peers.add(nodeid) + if nodeid not in self.connections: + d = self.tub.getReference(pburl) + def _got_reference(ref): + if nodeid in self.all_peers: + self.connections[nodeid] = ref + d.addCallback(_got_reference) + + def remote_lost_peers(self, lost_peers): + for nodeid in lost_peers: + log.msg("lost peer %s" % nodeid) + if nodeid in self.all_peers: + del self.all_peers[nodeid] + else: + log.msg("weird, I didn't have an entry for them") + if nodeid in self.connections: + del self.connections[nodeid] diff --git a/allmydata/queen.py b/allmydata/queen.py index 1c7158a0..849425d5 100644 --- a/allmydata/queen.py +++ b/allmydata/queen.py @@ -1,5 +1,6 @@ from foolscap import Tub, Referenceable +from foolscap.eventual import eventually from twisted.application import service from twisted.python import log import os.path @@ -8,16 +9,35 @@ from allmydata.util.iputil import get_local_ip_for class Roster(service.MultiService, Referenceable): def __init__(self): service.MultiService.__init__(self) - self.active_peers = {} + self.phonebook = {} + self.connections = {} - def remote_hello(self, nodeid, node, urls): + def remote_hello(self, nodeid, node, pburl): log.msg("contact from %s" % nodeid) - self.active_peers[nodeid] = urls + eventually(self._educate_the_new_peer, node) + eventually(self._announce_new_peer, nodeid, pburl) + self.phonebook[nodeid] = pburl + self.connections[nodeid] = node node.notifyOnDisconnect(self._lost_node, nodeid) + def _educate_the_new_peer(self, node): + node.callRemote("add_peers", new_peers=list(self.phonebook.items())) + + def _announce_new_peer(self, new_nodeid, new_node_pburl): + for targetnode in self.connections.values(): + targetnode.callRemote("add_peers", + new_peers=[(new_nodeid, new_node_pburl)]) + def _lost_node(self, nodeid): log.msg("lost contact with %s" % nodeid) - del self.active_peers[nodeid] + del self.phonebook[nodeid] + del self.connections[nodeid] + eventually(self._announce_lost_peer, nodeid) + + def _announce_lost_peer(self, lost_nodeid): + for targetnode in self.connections.values(): + targetnode.callRemote("lost_peers", lost_peers=[lost_nodeid]) + class Queen(service.MultiService):