import os, sha
from foolscap import Referenceable
-from twisted.application import service
-from twisted.python import log
from zope.interface import implements
from allmydata.interfaces import RIClient
from allmydata import node
from twisted.internet import defer
-from allmydata.util import idlib
from allmydata.storageserver import StorageServer
from allmydata.upload import Uploader
from allmydata.download import Downloader
from allmydata.vdrive import VDrive
from allmydata.webish import WebishServer
from allmydata.control import ControlServer
+from allmydata.introducer import IntroducerClient
class Client(node.Node, Referenceable):
implements(RIClient)
STOREDIR = 'storage'
NODETYPE = "client"
WEBPORTFILE = "webport"
- QUEEN_PBURL_FILE = "roster_pburl"
+ INTRODUCER_FURL_FILE = "introducer.furl"
+ GLOBAL_VDRIVE_FURL_FILE = "vdrive.furl"
def __init__(self, basedir="."):
node.Node.__init__(self, basedir)
- self.queen = None # self.queen is either None or a RemoteReference
+ self.my_pburl = None
self.introducer_client = None
+ self.connected_to_vdrive = False
self.add_service(StorageServer(os.path.join(basedir, self.STOREDIR)))
self.add_service(Uploader())
self.add_service(Downloader())
webport = f.read() # strports string
f.close()
self.add_service(WebishServer(webport))
- self.queen_pburl = None
- QUEEN_PBURL_FILE = os.path.join(self.basedir, self.QUEEN_PBURL_FILE)
- if os.path.exists(QUEEN_PBURL_FILE):
- f = open(QUEEN_PBURL_FILE, "r")
- self.queen_pburl = f.read().strip()
- f.close()
- self.queen_connector = None
+
+ INTRODUCER_FURL_FILE = os.path.join(self.basedir,
+ self.INTRODUCER_FURL_FILE)
+ f = open(INTRODUCER_FURL_FILE, "r")
+ self.introducer_furl = f.read().strip()
+ f.close()
+
+ GLOBAL_VDRIVE_FURL_FILE = os.path.join(self.basedir,
+ self.GLOBAL_VDRIVE_FURL_FILE)
+ f = open(GLOBAL_VDRIVE_FURL_FILE, "r")
+ self.global_vdrive_furl = f.read().strip()
+ f.close()
def tub_ready(self):
+ self.log("tub_ready")
self.my_pburl = self.tub.registerReference(self)
- if self.queen_pburl:
- self.introducer_client = IntroducerClient(self.tub, self.queen_pburl, self.my_pburl)
+
+ ic = IntroducerClient(self.tub, self.introducer_furl, self.my_pburl)
+ self.introducer_client = ic
+ ic.setServiceParent(self)
+
self.register_control()
- self.maybe_connect_to_queen()
-
- def set_queen_pburl(self, queen_pburl):
- self.queen_pburl = queen_pburl
- self.maybe_connect_to_queen()
-
- def maybe_connect_to_queen(self):
- if not self.running:
- return
- if not self.my_pburl:
- return
- if self.queen_connector:
- return
- if not self.queen_pburl:
- self.log("no queen_pburl, cannot connect")
- return
- self.queen_connector = self.tub.connectTo(self.queen_pburl,
- self._got_queen)
+
+ self.vdrive_connector = self.tub.connectTo(self.global_vdrive_furl,
+ self._got_vdrive)
def register_control(self):
c = ControlServer()
f.close()
os.chmod("control.pburl", 0600)
- def stopService(self):
- if self.introducer_client:
- self.introducer_client.stop()
- return service.MultiService.stopService(self)
-
- def _got_queen(self, queen):
- self.log("connected to queen")
- d.addCallback(lambda x: queen.callRemote("get_global_vdrive"))
- d.addCallback(self._got_vdrive_root)
-
- def _got_vdrive_root(self, root):
- self.getServiceNamed("vdrive").set_root(root)
+ def _got_vdrive(self, vdrive_root):
+ # vdrive_root implements RIMutableDirectoryNode
+ self.log("connected to vdrive")
+ self.connected_to_vdrive = True
+ self.getServiceNamed("vdrive").set_root(vdrive_root)
if "webish" in self.namedServices:
- self.getServiceNamed("webish").set_root_dirnode(root)
+ self.getServiceNamed("webish").set_root_dirnode(vdrive_root)
+ def _disconnected():
+ self.connected_to_vdrive = False
+ vdrive_root.notifyOnDisconnect(_disconnected)
def remote_get_service(self, name):
# TODO: 'vdrive' should not be public in the medium term
return self.getServiceNamed(name)
def get_remote_service(self, nodeid, servicename):
- if nodeid not in self.connections:
+ if nodeid not in self.introducer_client.connections:
return defer.fail(IndexError("no connection to that peer"))
- peer = self.connections[nodeid]
+ peer = self.introducer_client.connections[nodeid]
d = peer.callRemote("get_service", name=servicename)
return d
+ def get_all_peerids(self):
+ return self.introducer_client.connections.iterkeys()
+
def permute_peerids(self, key, max_count=None):
# TODO: eventually reduce memory consumption by doing an insertion
# sort of at most max_count elements
results = []
- for nodeid in self.all_peers:
+ for nodeid in self.get_all_peerids():
assert isinstance(nodeid, str)
permuted = sha.new(key + nodeid).digest()
results.append((permuted, nodeid))
from zope.interface import Interface
-from foolscap.schema import StringConstraint, ListOf, SetOf, TupleOf, Any
+from foolscap.schema import StringConstraint, ListOf, TupleOf, Any
from foolscap import RemoteInterface
Nodeid = StringConstraint(20) # binary format 20-byte SHA1 hash
RIBucketReader_ = Any()
RIMutableDirectoryNode_ = Any()
RIMutableFileNode_ = Any()
+def SetOf(*args, **kwargs): return Any()
+def DictOf(*args, **kwargs): return Any()
class RIIntroducerClient(RemoteInterface):
def new_peers(pburls=SetOf(PBURL)):
def hello(node=RIIntroducerClient, pburl=PBURL):
return None
-class RIQueenRoster(RemoteInterface):
- def get_global_vdrive():
- return RIMutableDirectoryNode_ # the virtual drive root
-
-class RIClient(RIIntroducerClient):
+class RIClient(RemoteInterface):
def get_service(name=str):
return Referenceable_
def get_nodeid():
-from foolscap import Referenceable, DeadReferenceError
+
+import re
+from zope.interface import implements
from twisted.application import service
from twisted.python import log
-from twisted.internet.error import ConnectionLost, ConnectionDone
-from zope.interface import implements
-from allmydata.interfaces import RIIntroducer
-
-
-def sendOnly(call, methname, *args, **kwargs):
- d = call(methname, *args, **kwargs)
- def _trap(f):
- f.trap(DeadReferenceError, ConnectionLost, ConnectionDone)
- d.addErrback(_trap)
+from foolscap import Referenceable
+from allmydata.interfaces import RIIntroducer, RIIntroducerClient
+from allmydata.util import idlib, observer
class Introducer(service.MultiService, Referenceable):
implements(RIIntroducer)
self.pburls = set()
def remote_hello(self, node, pburl):
- log.msg("roster: new contact at %s, node is %s" % (pburl, node))
+ log.msg("introducer: new contact at %s, node is %s" % (pburl, node))
def _remove():
- log.msg(" roster: removing %s %s" % (node, pburl))
+ log.msg(" introducer: removing %s %s" % (node, pburl))
self.nodes.remove(node)
self.pburls.remove(pburl)
node.notifyOnDisconnect(_remove)
for othernode in self.nodes:
othernode.callRemote("new_peers", set([pburl]))
self.nodes.add(node)
+
+
+class IntroducerClient(service.Service, Referenceable):
+ implements(RIIntroducerClient)
+
+ def __init__(self, tub, introducer_pburl, my_pburl):
+ self.tub = tub
+ self.introducer_pburl = introducer_pburl
+ self.my_pburl = my_pburl
+
+ self.connections = {} # k: nodeid, v: ref
+ self.reconnectors = {} # k: PBURL, v: reconnector
+
+ self.connection_observers = observer.ObserverList()
+
+ def startService(self):
+ self.introducer_reconnector = self.tub.connectTo(self.introducer_pburl,
+ self._got_introducer)
+
+ def log(self, msg):
+ self.parent.log(msg)
+
+ def remote_new_peers(self, pburls):
+ for pburl in pburls:
+ self._new_peer(pburl)
+
+ def stopService(self):
+ service.Service.stopService(self)
+ self.introducer_reconnector.stopConnecting()
+ for reconnector in self.reconnectors.itervalues():
+ reconnector.stopConnecting()
+
+ def _new_peer(self, pburl):
+ if pburl in self.reconnectors:
+ return
+ m = re.match(r'pb://(\w+)@', pburl)
+ assert m
+ nodeid = idlib.a2b(m.group(1))
+ def _got_peer(rref):
+ self.log(" connected to(%s)" % idlib.b2a(nodeid))
+ self.connection_observers.notify(nodeid, rref)
+ self.connections[nodeid] = rref
+ def _lost():
+ # TODO: notifyOnDisconnect uses eventually(), but connects do
+ # not. Could this cause a problem?
+ del self.connections[nodeid]
+ rref.notifyOnDisconnect(_lost)
+ self.log(" connecting to(%s)" % pburl)
+ self.reconnectors[pburl] = self.tub.connectTo(pburl, _got_peer)
+
+ def _got_introducer(self, introducer):
+ self.log(" introducing ourselves: %s, %s" % (self, self.my_pburl))
+ d = introducer.callRemote("hello",
+ node=self,
+ pburl=self.my_pburl)
+
+ def notify_on_new_connection(self, cb):
+ """Register a callback that will be fired (with nodeid, rref) when
+ a new connection is established."""
+ self.connection_observers.subscribe(cb)
+
+++ /dev/null
-class IntroducerClient(Referenceable):
- implements(RIIntroducerClient)
-
- def __init__(self, tub, introducer_pburl, my_pburl):
- self.introducer_reconnector = self.tub.connectTo(introducer_pburl,
- self._got_introducer)
-
- self.tub = tub
- self.my_pburl = my_pburl
-
- self.connections = {} # k: nodeid, v: ref
- self.reconnectors = {} # k: PBURL, v: reconnector
-
- def remote_get_nodeid(self):
- return self.nodeid
-
- def remote_new_peers(self, pburls):
- for pburl in pburls:
- self._new_peer(pburl)
-
- def stop(self):
- self.introducer_reconnector.stopConnecting()
- for reconnector in self.reconnectors.itervalues():
- reconnector.stopConnecting()
-
- def _new_peer(self, pburl):
- if pburl in self.reconnectors:
- return
- def _got_peer(rref):
- d2 = rref.callRemote("get_nodeid")
- def _got_nodeid(nodeid):
- self.connections[nodeid] = rref
- def _lost():
- # TODO: notifyOnDisconnect uses eventually(), but connects do not. Could this cause a problem?
- del self.connections[nodeid]
- rref.notifyOnDisconnect(_lost)
- d2.addCallback(_got_nodeid)
- log.msg(" connecting to(%s)" % pburl)
- self.reconnectors[pburl] = self.tub.connectTo(pburl, _got_peer)
-
- def _got_introducer(self, introducer):
- log.msg(" introducing ourselves: %s, %s" % (self, self.my_pburl))
- d = introducer.callRemote("hello",
- node=self,
- pburl=self.my_pburl)
f = open(certfile, "wb")
f.write(self.tub.getCertData())
f.close()
+ if False: # TODO: once foolscap-0.1.1 is released, enable this
+ self.tub.setOption("logLocalFailures", True)
+ self.tub.setOption("logRemoteFailures", True)
self.nodeid = idlib.a2b(self.tub.tubID)
f = open(os.path.join(self.basedir, self.NODEIDFILE), "w")
f.write(idlib.b2a(self.nodeid) + "\n")
import os.path
-from foolscap import Referenceable, DeadReferenceError
-from foolscap.eventual import eventually
-from twisted.application import service
-from twisted.python import log
-from twisted.internet.error import ConnectionLost, ConnectionDone
-from allmydata.util import idlib
-from zope.interface import implements
-from allmydata.interfaces import RIQueenRoster, RIIntroducer
from allmydata import node
from allmydata.filetable import GlobalVirtualDrive
-
-
-def sendOnly(call, methname, *args, **kwargs):
- d = call(methname, *args, **kwargs)
- def _trap(f):
- f.trap(DeadReferenceError, ConnectionLost, ConnectionDone)
- d.addErrback(_trap)
-
-class Roster(service.MultiService, Referenceable):
- implements(RIQueenRoster)
-
- def __init__(self):
- self.gvd_root = None
-
- def set_gvd_root(self, root):
- self.gvd_root = root
-
- def remote_get_global_vdrive(self):
- return self.gvd_root
-
+from allmydata.introducer import Introducer
class Queen(node.Node):
def __init__(self, basedir="."):
node.Node.__init__(self, basedir)
- self.gvd = self.add_service(GlobalVirtualDrive(basedir))
self.urls = {}
def tub_ready(self):
- r = self.add_service(Roster())
- self.urls["roster"] = self.tub.registerReference(r, "roster")
- self.log(" roster is at %s" % self.urls["roster"])
- f = open(os.path.join(self.basedir, "roster_pburl"), "w")
- f.write(self.urls["roster"] + "\n")
+ r = self.add_service(Introducer())
+ self.urls["introducer"] = self.tub.registerReference(r, "introducer")
+ self.log(" introducer is at %s" % self.urls["introducer"])
+ f = open(os.path.join(self.basedir, "introducer.furl"), "w")
+ f.write(self.urls["introducer"] + "\n")
+ f.close()
+
+ gvd = self.add_service(GlobalVirtualDrive(self.basedir))
+ self.urls["vdrive"] = self.tub.registerReference(gvd.get_root(),
+ "vdrive")
+ self.log(" vdrive is at %s" % self.urls["vdrive"])
+ f = open(os.path.join(self.basedir, "vdrive.furl"), "w")
+ f.write(self.urls["vdrive"] + "\n")
f.close()
- r.set_gvd_root(self.gvd.get_root())
f = open(os.path.join(basedir, "client.tac"), "w")
f.write(client_tac)
f.close()
- print "client created in %s, please copy roster_pburl into the directory" % basedir
+ print "client created in %s" % basedir
+ print " please copy introducer.furl and vdrive.furl into the directory"
def create_queen(config):
basedir = config['basedir']
def make_nodes(self):
q = self.queen
- self.queen_pburl = q.urls["roster"]
+ self.queen_pburl = q.urls["introducer"]
+ vdrive_furl = q.urls["vdrive"]
self.nodes = []
for i in range(self.numnodes):
nodedir = os.path.join(self.basedir, "node%d" % i)
os.mkdir(nodedir)
+ f = open(os.path.join(nodedir, "introducer.furl"), "w")
+ f.write(self.queen_pburl)
+ f.close()
+ f = open(os.path.join(nodedir, "vdrive.furl"), "w")
+ f.write(vdrive_furl)
+ f.close()
c = self.add_service(client.Client(basedir=nodedir))
- c.set_queen_pburl(self.queen_pburl)
self.nodes.append(c)
# the peers will start running, eventually they will connect to each
# other and the queen
f.write("If the node notices this file at startup, it will poll and\n")
f.write("terminate as soon as the file goes away. This prevents\n")
f.write("leaving processes around if the test harness has an\n")
- f.write("internal failure and neglects to kil off the node\n")
+ f.write("internal failure and neglects to kill off the node\n")
f.write("itself. The contents of this file are ignored.\n")
f.close()
config = {'basedir': clientdir}
runner.create_client(config)
log.msg("DONE MAKING CLIENT")
- f = open(os.path.join(clientdir, "roster_pburl"), "w")
+ f = open(os.path.join(clientdir, "introducer.furl"), "w")
f.write(self.queen_pburl + "\n")
f.close()
self.keepalive_file = os.path.join(clientdir, "suicide_prevention_hotline")
+import os
from twisted.trial import unittest
from allmydata import client
+class MyClient(client.Client):
+ def __init__(self, basedir):
+ self.connections = {}
+ client.Client.__init__(self, basedir)
+
+ def get_all_peerids(self):
+ return self.connections
+
class Basic(unittest.TestCase):
def test_loadable(self):
- c = client.Client("")
- d = c.startService()
- d.addCallback(lambda res: c.stopService())
- return d
+ basedir = "test_client.Basic.test_loadable"
+ os.mkdir(basedir)
+ open(os.path.join(basedir, "introducer.furl"), "w").write("")
+ open(os.path.join(basedir, "vdrive.furl"), "w").write("")
+ c = client.Client(basedir)
def test_permute(self):
- c = client.Client("")
+ basedir = "test_client.Basic.test_permute"
+ os.mkdir(basedir)
+ open(os.path.join(basedir, "introducer.furl"), "w").write("")
+ open(os.path.join(basedir, "vdrive.furl"), "w").write("")
+ c = MyClient(basedir)
for k in ["%d" % i for i in range(5)]:
c.connections[k] = None
self.failUnlessEqual(c.permute_peerids("one"), ['3','1','0','4','2'])
c.connections.clear()
self.failUnlessEqual(c.permute_peerids("one"), [])
- c2 = client.Client("")
+ c2 = MyClient(basedir)
for k in ["%d" % i for i in range(5)]:
c2.connections[k] = None
self.failUnlessEqual(c2.permute_peerids("one"), ['3','1','0','4','2'])
--- /dev/null
+
+from twisted.trial import unittest
+from twisted.internet import defer, reactor
+from twisted.python import log
+defer.setDebugging(True)
+
+from foolscap import Tub, Referenceable
+from twisted.application import service
+from allmydata.introducer import IntroducerClient, Introducer
+from allmydata.util import idlib
+
+class MyNode(Referenceable):
+ pass
+
+class LoggingMultiService(service.MultiService):
+ def log(self, msg):
+ pass
+
+class TestIntroducer(unittest.TestCase):
+ def setUp(self):
+ self.parent = LoggingMultiService()
+ self.parent.startService()
+ def tearDown(self):
+ log.msg("TestIntroducer.tearDown")
+ d = defer.Deferred()
+ reactor.callLater(1.1, d.callback, None)
+ d.addCallback(lambda res: self.parent.stopService())
+ return d
+
+
+
+ def poll(self, check_f, pollinterval=0.01):
+ # Return a Deferred, then call check_f periodically until it returns
+ # True, at which point the Deferred will fire.. If check_f raises an
+ # exception, the Deferred will errback.
+ d = defer.maybeDeferred(self._poll, None, check_f, pollinterval)
+ return d
+
+ def _poll(self, res, check_f, pollinterval):
+ if check_f():
+ return True
+ d = defer.Deferred()
+ d.addCallback(self._poll, check_f, pollinterval)
+ reactor.callLater(pollinterval, d.callback, None)
+ return d
+
+
+ def test_create(self):
+ ic = IntroducerClient(None, "introducer", "mypburl")
+ def _ignore(nodeid, rref):
+ pass
+ ic.notify_on_new_connection(_ignore)
+
+ def test_listen(self):
+ i = Introducer()
+ i.setServiceParent(self.parent)
+
+ def test_system(self):
+
+ self.central_tub = tub = Tub()
+ #tub.setOption("logLocalFailures", True)
+ #tub.setOption("logRemoteFailures", True)
+ tub.setServiceParent(self.parent)
+ l = tub.listenOn("tcp:0")
+ portnum = l.getPortnum()
+ tub.setLocation("localhost:%d" % portnum)
+
+ i = Introducer()
+ i.setServiceParent(self.parent)
+ iurl = tub.registerReference(i)
+ NUMCLIENTS = 5
+
+ self.waiting_for_connections = NUMCLIENTS*NUMCLIENTS
+ d = self._done_counting = defer.Deferred()
+ def _count(nodeid, rref):
+ log.msg("NEW CONNECTION! %s %s" % (idlib.b2a(nodeid), rref))
+ self.waiting_for_connections -= 1
+ if self.waiting_for_connections == 0:
+ self._done_counting.callback("done!")
+
+ clients = []
+ tubs = {}
+ for i in range(NUMCLIENTS):
+ tub = Tub()
+ #tub.setOption("logLocalFailures", True)
+ #tub.setOption("logRemoteFailures", True)
+ tub.setServiceParent(self.parent)
+ l = tub.listenOn("tcp:0")
+ portnum = l.getPortnum()
+ tub.setLocation("localhost:%d" % portnum)
+
+ n = MyNode()
+ node_pburl = tub.registerReference(n)
+ c = IntroducerClient(tub, iurl, node_pburl)
+ c.notify_on_new_connection(_count)
+ c.setServiceParent(self.parent)
+ clients.append(c)
+ tubs[c] = tub
+
+ # d will fire once everybody is connected
+
+ def _check(res):
+ log.msg("doing _check")
+ for c in clients:
+ self.failUnlessEqual(len(c.connections), NUMCLIENTS)
+ # now disconnect somebody's connection to someone else
+ self.waiting_for_connections = 2
+ d2 = self._done_counting = defer.Deferred()
+ origin_c = clients[0]
+ # find a target that is not themselves
+ for nodeid,rref in origin_c.connections.items():
+ if idlib.b2a(nodeid) != tubs[origin_c].tubID:
+ victim = rref
+ break
+ log.msg(" disconnecting %s->%s" % (tubs[origin_c].tubID, victim))
+ victim.tracker.broker.transport.loseConnection()
+ log.msg(" did disconnect")
+ return d2
+ d.addCallback(_check)
+ def _check_again(res):
+ log.msg("doing _check_again")
+ for c in clients:
+ self.failUnlessEqual(len(c.connections), NUMCLIENTS)
+ # now disconnect somebody's connection to themselves. This will
+ # only result in one new connection, since it is a loopback.
+ self.waiting_for_connections = 1
+ d2 = self._done_counting = defer.Deferred()
+ origin_c = clients[0]
+ # find a target that *is* themselves
+ for nodeid,rref in origin_c.connections.items():
+ if idlib.b2a(nodeid) == tubs[origin_c].tubID:
+ victim = rref
+ break
+ log.msg(" disconnecting %s->%s" % (tubs[origin_c].tubID, victim))
+ victim.tracker.broker.transport.loseConnection()
+ log.msg(" did disconnect")
+ return d2
+ d.addCallback(_check_again)
+ def _check_again2(res):
+ log.msg("doing _check_again2")
+ for c in clients:
+ self.failUnlessEqual(len(c.connections), NUMCLIENTS)
+ # now disconnect somebody's connection to themselves
+ d.addCallback(_check_again2)
+ return d
+
+ def stall(self, res, timeout):
+ d = defer.Deferred()
+ reactor.callLater(timeout, d.callback, res)
+ return d
+
+ def test_system_this_one_breaks(self):
+ # this uses a single Tub, which has a strong effect on the
+ # failingness
+ tub = Tub()
+ tub.setOption("logLocalFailures", True)
+ tub.setOption("logRemoteFailures", True)
+ tub.setServiceParent(self.parent)
+ l = tub.listenOn("tcp:0")
+ portnum = l.getPortnum()
+ tub.setLocation("localhost:%d" % portnum)
+
+ i = Introducer()
+ i.setServiceParent(self.parent)
+ iurl = tub.registerReference(i)
+
+ clients = []
+ for i in range(5):
+ n = MyNode()
+ node_pburl = tub.registerReference(n)
+ c = IntroducerClient(tub, iurl, node_pburl)
+ c.setServiceParent(self.parent)
+ clients.append(c)
+
+ # time passes..
+ d = defer.Deferred()
+ def _check(res):
+ log.msg("doing _check")
+ self.failUnlessEqual(len(clients[0].connections), 5)
+ d.addCallback(_check)
+ reactor.callLater(2, d.callback, None)
+ return d
+ del test_system_this_one_breaks
+
+
+ def test_system_this_one_breaks_too(self):
+ # this one shuts down so quickly that it fails in a different way
+ self.central_tub = tub = Tub()
+ tub.setOption("logLocalFailures", True)
+ tub.setOption("logRemoteFailures", True)
+ tub.setServiceParent(self.parent)
+ l = tub.listenOn("tcp:0")
+ portnum = l.getPortnum()
+ tub.setLocation("localhost:%d" % portnum)
+
+ i = Introducer()
+ i.setServiceParent(self.parent)
+ iurl = tub.registerReference(i)
+
+ clients = []
+ for i in range(5):
+ tub = Tub()
+ tub.setOption("logLocalFailures", True)
+ tub.setOption("logRemoteFailures", True)
+ tub.setServiceParent(self.parent)
+ l = tub.listenOn("tcp:0")
+ portnum = l.getPortnum()
+ tub.setLocation("localhost:%d" % portnum)
+
+ n = MyNode()
+ node_pburl = tub.registerReference(n)
+ c = IntroducerClient(tub, iurl, node_pburl)
+ c.setServiceParent(self.parent)
+ clients.append(c)
+
+ # time passes..
+ d = defer.Deferred()
+ reactor.callLater(0.01, d.callback, None)
+ def _check(res):
+ log.msg("doing _check")
+ self.fail("BOOM")
+ for c in clients:
+ self.failUnlessEqual(len(c.connections), 5)
+ c.connections.values()[0].tracker.broker.transport.loseConnection()
+ return self.stall(None, 2)
+ d.addCallback(_check)
+ def _check_again(res):
+ log.msg("doing _check_again")
+ for c in clients:
+ self.failUnlessEqual(len(c.connections), 5)
+ d.addCallback(_check_again)
+ return d
+ del test_system_this_one_breaks_too
+
def _set_up_nodes_2(self, res):
q = self.queen
- self.queen_pburl = q.urls["roster"]
+ self.queen_furl = q.urls["introducer"]
+ self.vdrive_furl = q.urls["vdrive"]
self.clients = []
for i in range(self.numclients):
basedir = "client%d" % i
if not os.path.isdir(basedir):
os.mkdir(basedir)
if i == 0:
- f = open(os.path.join(basedir, "webport"), "w")
- f.write("tcp:0:interface=127.0.0.1")
- f.close()
+ open(os.path.join(basedir, "webport"), "w").write("tcp:0:interface=127.0.0.1")
+ open(os.path.join(basedir, "introducer.furl"), "w").write(self.queen_furl)
+ open(os.path.join(basedir, "vdrive.furl"), "w").write(self.vdrive_furl)
c = self.add_service(client.Client(basedir=basedir))
- c.set_queen_pburl(self.queen_pburl)
self.clients.append(c)
log.msg("STARTING")
d = self.wait_for_connections()
basedir = "client%d" % client_num
if not os.path.isdir(basedir):
os.mkdir(basedir)
+ open(os.path.join(basedir, "introducer.furl"), "w").write(self.queen_furl)
+ open(os.path.join(basedir, "vdrive.furl"), "w").write(self.vdrive_furl)
+
c = client.Client(basedir=basedir)
self.clients.append(c)
- c.set_queen_pburl(self.queen_pburl)
self.numclients += 1
c.startService()
d = self.wait_for_connections()
def wait_for_connections(self, ignored=None):
for c in self.clients:
- if len(c.connections) != self.numclients:
+ if not c.introducer_client or len(c.get_all_peerids()) != self.numclients:
d = defer.Deferred()
d.addCallback(self.wait_for_connections)
reactor.callLater(0.05, d.callback, None)
def test_connections(self):
d = self.set_up_nodes()
+ self.extra_node = None
d.addCallback(lambda res: self.add_extra_node(5))
def _check(extra_node):
self.extra_node = extra_node
for c in self.clients:
- self.failUnlessEqual(len(c.connections), 6)
+ self.failUnlessEqual(len(c.get_all_peerids()), 6)
d.addCallback(_check)
def _shutdown_extra_node(res):
- d1 = self.extra_node.stopService()
- d2 = defer.Deferred()
- reactor.callLater(self.DISCONNECT_DELAY, d2.callback, res)
- d1.addCallback(lambda ignored: d2)
- return d1
+ if self.extra_node:
+ d1 = self.extra_node.stopService()
+ d2 = defer.Deferred()
+ reactor.callLater(self.DISCONNECT_DELAY, d2.callback, res)
+ d1.addCallback(lambda ignored: d2)
+ return d1
+ return res
d.addBoth(_shutdown_extra_node)
return d
<table n:render="sequence" n:data="peers" border="1">
<tr n:pattern="header">
<td>PeerID</td>
- <td>Connected?</td>
- <td>PBURL</td>
</tr>
<tr n:pattern="item" n:render="row">
<td><tt><n:slot name="peerid"/></tt></td>
- <td><n:slot name="connected"/></td>
- <td><n:slot name="pburl"/></td>
</tr>
<tr n:pattern="empty"><td>no peers!</td></tr>
</table>
docFactory = getxmlfile("welcome.xhtml")
def data_queen_pburl(self, ctx, data):
- return IClient(ctx).queen_pburl
+ return IClient(ctx).introducer_furl
def data_connected_to_queen(self, ctx, data):
- if IClient(ctx).queen:
+ if IClient(ctx).connected_to_vdrive:
return "yes"
return "no"
def data_num_peers(self, ctx, data):
#client = inevow.ISite(ctx)._client
client = IClient(ctx)
- return len(client.connections)
+ return len(client.get_all_peerids())
def data_num_connected_peers(self, ctx, data):
- return len(IClient(ctx).connections)
+ return len(IClient(ctx).get_all_peerids())
def data_peers(self, ctx, data):
d = []
client = IClient(ctx)
- for nodeid in sorted(client.connections.keys()):
- row = (idlib.b2a(nodeid), "yes", "?")
+ for nodeid in sorted(client.get_all_peerids()):
+ row = (idlib.b2a(nodeid),)
d.append(row)
return d
def render_row(self, ctx, data):
- nodeid_a, connected, pburl = data
+ (nodeid_a,) = data
ctx.fillSlots("peerid", nodeid_a)
- ctx.fillSlots("connected", connected)
- ctx.fillSlots("pburl", pburl)
return ctx.tag
# this is a form where users can download files by URI