def __init__(self, basedir="."):
node.Node.__init__(self, basedir)
self.queen = None # self.queen is either None or a RemoteReference
- self.my_pburl = None
self.all_peers = set()
self.peer_pburls = {}
self.connections = {}
def tub_ready(self):
self.my_pburl = self.tub.registerReference(self)
- self.register_control()
self.maybe_connect_to_queen()
def set_queen_pburl(self, queen_pburl):
self.queen_connector = self.tub.connectTo(self.queen_pburl,
self._got_queen)
- def register_control(self):
- c = ControlServer()
- c.setServiceParent(self)
- control_url = self.tub.registerReference(c)
- f = open("control.pburl", "w")
- f.write(control_url + "\n")
- f.close()
- os.chmod("control.pburl", 0600)
-
def stopService(self):
- if self.queen_connector:
- self.queen_connector.stopConnecting()
- self.queen_connector = None
+ if self.introducer_client:
+ self.introducer_client.stop()
return service.MultiService.stopService(self)
def _got_queen(self, queen):
self.log("connected to queen")
- self.queen = queen
- queen.notifyOnDisconnect(self._lost_queen)
- d = queen.callRemote("hello",
- nodeid=self.nodeid,
- node=self,
- pburl=self.my_pburl)
d.addCallback(lambda x: queen.callRemote("get_global_vdrive"))
d.addCallback(self._got_vdrive_root)
if "webish" in self.namedServices:
self.getServiceNamed("webish").set_root_dirnode(root)
- def _lost_queen(self):
- self.log("lost connection to queen")
- self.queen = None
-
def remote_get_service(self, name):
# TODO: 'vdrive' should not be public in the medium term
return self.getServiceNamed(name)
- def remote_add_peers(self, new_peers):
- for nodeid, pburl in new_peers:
- self.log("adding peer %s" % idlib.b2a(nodeid))
- if nodeid in self.all_peers:
- self.log("weird, I already had an entry for them")
- return
- self.all_peers.add(nodeid)
- self.peer_pburls[nodeid] = pburl
- if nodeid not in self.connections:
- d = self.tub.getReference(pburl)
- def _got_reference(ref, which_nodeid):
- self.log("connected to %s" % idlib.b2a(which_nodeid))
- if which_nodeid in self.all_peers:
- self.connections[which_nodeid] = ref
- else:
- log.msg(" ignoring it because we no longer want to talk to them")
- d.addCallback(_got_reference, nodeid)
-
- def remote_lost_peers(self, lost_peers):
- for nodeid in lost_peers:
- self.log("lost peer %s" % idlib.b2a(nodeid))
- if nodeid in self.all_peers:
- self.all_peers.remove(nodeid)
- else:
- self.log("weird, I didn't have an entry for them")
- if nodeid in self.peer_pburls:
- del self.peer_pburls[nodeid]
- if nodeid in self.connections:
- del self.connections[nodeid]
-
def get_remote_service(self, nodeid, servicename):
if nodeid not in self.connections:
return defer.fail(IndexError("no connection to that peer"))
from zope.interface import Interface
-from foolscap.schema import StringConstraint, ListOf, TupleOf, Any, DictOf
+from foolscap.schema import StringConstraint, ListOf, TupleOf, Any
from foolscap import RemoteInterface
Nodeid = StringConstraint(20) # binary format 20-byte SHA1 hash
Verifierid = StringConstraint(20)
URI = StringConstraint(100) # kind of arbitrary
ShareData = StringConstraint(100000)
-# these four are here because Foolscap does not yet support the kind of
+# these six are here because Foolscap does not yet support the kind of
# restriction I really want to apply to these.
RIClient_ = Any()
Referenceable_ = Any()
RIMutableDirectoryNode_ = Any()
RIMutableFileNode_ = Any()
-class RIQueenRoster(RemoteInterface):
- def hello(nodeid=Nodeid, node=RIClient_, pburl=PBURL):
+class RIIntroducerClient(RemoteInterface):
+ def new_peers(pburls=SetOf(PBURL)):
+ return None
+
+class RIIntroducer(RemoteInterface):
+ def hello(node=RIIntroducerClient, pburl=PBURL):
return None
+class RIQueenRoster(RemoteInterface):
def get_global_vdrive():
return RIMutableDirectoryNode_ # the virtual drive root
-
-class RIClient(RemoteInterface):
+class RIClient(RIIntroducerClient):
def get_service(name=str):
return Referenceable_
- def add_peers(new_peers=ListOf(TupleOf(Nodeid, PBURL), maxLength=100)):
- return None
- def lost_peers(lost_peers=ListOf(Nodeid)):
- return None
+ def get_nodeid():
+ return Nodeid
class RIStorageServer(RemoteInterface):
def allocate_bucket(verifierid=Verifierid, bucket_num=int, size=int,
--- /dev/null
+from foolscap import Referenceable, DeadReferenceError
+from twisted.application import service
+from twisted.python import log
+from twisted.internet.error import ConnectionLost, ConnectionDone
+from zope.interface import implements
+from allmydata.interfaces import RIIntroducer
+
+
+def sendOnly(call, methname, *args, **kwargs):
+ d = call(methname, *args, **kwargs)
+ def _trap(f):
+ f.trap(DeadReferenceError, ConnectionLost, ConnectionDone)
+ d.addErrback(_trap)
+
+class Introducer(service.MultiService, Referenceable):
+ implements(RIIntroducer)
+
+ def __init__(self):
+ service.MultiService.__init__(self)
+ self.nodes = set()
+ self.pburls = set()
+
+ def remote_hello(self, node, pburl):
+ log.msg("roster: new contact at %s, node is %s" % (pburl, node))
+ def _remove():
+ log.msg(" roster: removing %s %s" % (node, pburl))
+ self.nodes.remove(node)
+ self.pburls.remove(pburl)
+ node.notifyOnDisconnect(_remove)
+ self.pburls.add(pburl)
+ node.callRemote("new_peers", self.pburls)
+ for othernode in self.nodes:
+ othernode.callRemote("new_peers", set([pburl]))
+ self.nodes.add(node)
--- /dev/null
+class IntroducerClient(Referenceable):
+ implements(RIIntroducerClient)
+
+ def __init__(self, tub, introducer_pburl, my_pburl):
+ self.introducer_reconnector = self.tub.connectTo(introducer_pburl,
+ self._got_introducer)
+
+ self.tub = tub
+ self.my_pburl = my_pburl
+
+ self.connections = {} # k: nodeid, v: ref
+ self.reconnectors = {} # k: PBURL, v: reconnector
+
+ def remote_get_nodeid(self):
+ return self.nodeid
+
+ def remote_new_peers(self, pburls):
+ for pburl in pburls:
+ self._new_peer(pburl)
+
+ def stop(self):
+ self.introducer_reconnector.stopConnecting()
+ for reconnector in self.reconnectors.itervalues():
+ reconnector.stopConnecting()
+
+ def _new_peer(self, pburl):
+ if pburl in self.reconnectors:
+ return
+ def _got_peer(rref):
+ d2 = rref.callRemote("get_nodeid")
+ def _got_nodeid(nodeid):
+ self.connections[nodeid] = rref
+ def _lost():
+ # TODO: notifyOnDisconnect uses eventually(), but connects do not. Could this cause a problem?
+ del self.connections[nodeid]
+ rref.notifyOnDisconnect(_lost)
+ d2.addCallback(_got_nodeid)
+ log.msg(" connecting to(%s)" % pburl)
+ self.reconnectors[pburl] = self.tub.connectTo(pburl, _got_peer)
+
+ def _got_introducer(self, introducer):
+ log.msg(" introducing ourselves: %s, %s" % (self, self.my_pburl))
+ d = introducer.callRemote("hello",
+ node=self,
+ pburl=self.my_pburl)
from twisted.internet.error import ConnectionLost, ConnectionDone
from allmydata.util import idlib
from zope.interface import implements
-from allmydata.interfaces import RIQueenRoster
+from allmydata.interfaces import RIQueenRoster, RIIntroducer
from allmydata import node
from allmydata.filetable import GlobalVirtualDrive
implements(RIQueenRoster)
def __init__(self):
- service.MultiService.__init__(self)
- self.phonebook = {}
- self.connections = {}
self.gvd_root = None
def set_gvd_root(self, root):
self.gvd_root = root
- def remote_hello(self, nodeid, node, pburl):
- log.msg("roster: contact from %s" % idlib.b2a(nodeid))
- self.phonebook[nodeid] = pburl
- self.connections[nodeid] = node
- eventually(self._educate_the_new_peer,
- nodeid, node, list(self.phonebook.items()))
- eventually(self._announce_new_peer,
- nodeid, pburl, list(self.connections.values()))
- node.notifyOnDisconnect(self._lost_node, nodeid)
-
def remote_get_global_vdrive(self):
return self.gvd_root
- def _educate_the_new_peer(self, nodeid, node, new_peers):
- log.msg("roster: educating %s (%d)" % (idlib.b2a(nodeid)[:4], len(new_peers)))
- node.callRemote("add_peers", new_peers=new_peers)
-
- def _announce_new_peer(self, new_nodeid, new_node_pburl, peers):
- log.msg("roster: announcing %s to everybody (%d)" % (idlib.b2a(new_nodeid)[:4], len(peers)))
- for targetnode in peers:
- targetnode.callRemote("add_peers",
- new_peers=[(new_nodeid, new_node_pburl)])
-
- def _lost_node(self, nodeid):
- log.msg("roster: lost contact with %s" % idlib.b2a(nodeid))
- del self.phonebook[nodeid]
- del self.connections[nodeid]
- eventually(self._announce_lost_peer, nodeid)
-
- def _announce_lost_peer(self, lost_nodeid):
- for targetnode in self.connections.values():
- # use sendOnly, because if they go away then we assume it's
- # because they crashed and they've lost all their peer
- # connections anyways.
- sendOnly(targetnode.callRemote, "lost_peers",
- lost_peers=[lost_nodeid])
-
class Queen(node.Node):
def test_permute(self):
c = client.Client("")
- c.all_peers = ["%d" % i for i in range(5)]
+ for k in ["%d" % i for i in range(5)]:
+ c.connections[k] = None
self.failUnlessEqual(c.permute_peerids("one"), ['3','1','0','4','2'])
self.failUnlessEqual(c.permute_peerids("one", 3), ['3','1','0'])
self.failUnlessEqual(c.permute_peerids("two"), ['0','4','2','1','3'])
- c.all_peers = []
+ c.connections.clear()
self.failUnlessEqual(c.permute_peerids("one"), [])
c2 = client.Client("")
- c2.all_peers = ["%d" % i for i in range(5)]
+ for k in ["%d" % i for i in range(5)]:
+ c2.connections[k] = None
self.failUnlessEqual(c2.permute_peerids("one"), ['3','1','0','4','2'])
def data_num_peers(self, ctx, data):
#client = inevow.ISite(ctx)._client
client = IClient(ctx)
- return len(client.all_peers)
+ return len(client.connections)
def data_num_connected_peers(self, ctx, data):
return len(IClient(ctx).connections)
def data_peers(self, ctx, data):
d = []
client = IClient(ctx)
- for nodeid in sorted(client.all_peers):
- if nodeid in client.connections:
- connected = "yes"
- else:
- connected = "no"
- pburl = client.peer_pburls[nodeid]
- row = (idlib.b2a(nodeid), connected, pburl)
+ for nodeid in sorted(client.connections.keys()):
+ row = (idlib.b2a(nodeid), "yes", "?")
d.append(row)
return d