From: Brian Warner Date: Tue, 27 Mar 2007 23:12:11 +0000 (-0700) Subject: complete the Introducer changes, separate out vdrive access, make everything work... X-Git-Url: https://git.rkrishnan.org/simplejson/components/%22file:/?a=commitdiff_plain;h=25ff9e1f97565977122204f2039fb63eaff48f05;p=tahoe-lafs%2Ftahoe-lafs.git complete the Introducer changes, separate out vdrive access, make everything work again --- diff --git a/src/allmydata/client.py b/src/allmydata/client.py index b063805e..72d96b24 100644 --- a/src/allmydata/client.py +++ b/src/allmydata/client.py @@ -1,21 +1,19 @@ import os, sha from foolscap import Referenceable -from twisted.application import service -from twisted.python import log from zope.interface import implements from allmydata.interfaces import RIClient from allmydata import node from twisted.internet import defer -from allmydata.util import idlib from allmydata.storageserver import StorageServer from allmydata.upload import Uploader from allmydata.download import Downloader from allmydata.vdrive import VDrive from allmydata.webish import WebishServer from allmydata.control import ControlServer +from allmydata.introducer import IntroducerClient class Client(node.Node, Referenceable): implements(RIClient) @@ -24,12 +22,14 @@ class Client(node.Node, Referenceable): STOREDIR = 'storage' NODETYPE = "client" WEBPORTFILE = "webport" - QUEEN_PBURL_FILE = "roster_pburl" + INTRODUCER_FURL_FILE = "introducer.furl" + GLOBAL_VDRIVE_FURL_FILE = "vdrive.furl" def __init__(self, basedir="."): node.Node.__init__(self, basedir) - self.queen = None # self.queen is either None or a RemoteReference + self.my_pburl = None self.introducer_client = None + self.connected_to_vdrive = False self.add_service(StorageServer(os.path.join(basedir, self.STOREDIR))) self.add_service(Uploader()) self.add_service(Downloader()) @@ -40,37 +40,31 @@ class Client(node.Node, Referenceable): webport = f.read() # strports string f.close() self.add_service(WebishServer(webport)) - self.queen_pburl = None - QUEEN_PBURL_FILE = os.path.join(self.basedir, self.QUEEN_PBURL_FILE) - if os.path.exists(QUEEN_PBURL_FILE): - f = open(QUEEN_PBURL_FILE, "r") - self.queen_pburl = f.read().strip() - f.close() - self.queen_connector = None + + INTRODUCER_FURL_FILE = os.path.join(self.basedir, + self.INTRODUCER_FURL_FILE) + f = open(INTRODUCER_FURL_FILE, "r") + self.introducer_furl = f.read().strip() + f.close() + + GLOBAL_VDRIVE_FURL_FILE = os.path.join(self.basedir, + self.GLOBAL_VDRIVE_FURL_FILE) + f = open(GLOBAL_VDRIVE_FURL_FILE, "r") + self.global_vdrive_furl = f.read().strip() + f.close() def tub_ready(self): + self.log("tub_ready") self.my_pburl = self.tub.registerReference(self) - if self.queen_pburl: - self.introducer_client = IntroducerClient(self.tub, self.queen_pburl, self.my_pburl) + + ic = IntroducerClient(self.tub, self.introducer_furl, self.my_pburl) + self.introducer_client = ic + ic.setServiceParent(self) + self.register_control() - self.maybe_connect_to_queen() - - def set_queen_pburl(self, queen_pburl): - self.queen_pburl = queen_pburl - self.maybe_connect_to_queen() - - def maybe_connect_to_queen(self): - if not self.running: - return - if not self.my_pburl: - return - if self.queen_connector: - return - if not self.queen_pburl: - self.log("no queen_pburl, cannot connect") - return - self.queen_connector = self.tub.connectTo(self.queen_pburl, - self._got_queen) + + self.vdrive_connector = self.tub.connectTo(self.global_vdrive_furl, + self._got_vdrive) def register_control(self): c = ControlServer() @@ -81,38 +75,37 @@ class Client(node.Node, Referenceable): f.close() os.chmod("control.pburl", 0600) - def stopService(self): - if self.introducer_client: - self.introducer_client.stop() - return service.MultiService.stopService(self) - - def _got_queen(self, queen): - self.log("connected to queen") - d.addCallback(lambda x: queen.callRemote("get_global_vdrive")) - d.addCallback(self._got_vdrive_root) - - def _got_vdrive_root(self, root): - self.getServiceNamed("vdrive").set_root(root) + def _got_vdrive(self, vdrive_root): + # vdrive_root implements RIMutableDirectoryNode + self.log("connected to vdrive") + self.connected_to_vdrive = True + self.getServiceNamed("vdrive").set_root(vdrive_root) if "webish" in self.namedServices: - self.getServiceNamed("webish").set_root_dirnode(root) + self.getServiceNamed("webish").set_root_dirnode(vdrive_root) + def _disconnected(): + self.connected_to_vdrive = False + vdrive_root.notifyOnDisconnect(_disconnected) def remote_get_service(self, name): # TODO: 'vdrive' should not be public in the medium term return self.getServiceNamed(name) def get_remote_service(self, nodeid, servicename): - if nodeid not in self.connections: + if nodeid not in self.introducer_client.connections: return defer.fail(IndexError("no connection to that peer")) - peer = self.connections[nodeid] + peer = self.introducer_client.connections[nodeid] d = peer.callRemote("get_service", name=servicename) return d + def get_all_peerids(self): + return self.introducer_client.connections.iterkeys() + def permute_peerids(self, key, max_count=None): # TODO: eventually reduce memory consumption by doing an insertion # sort of at most max_count elements results = [] - for nodeid in self.all_peers: + for nodeid in self.get_all_peerids(): assert isinstance(nodeid, str) permuted = sha.new(key + nodeid).digest() results.append((permuted, nodeid)) diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py index 67bf189e..d6bd9942 100644 --- a/src/allmydata/interfaces.py +++ b/src/allmydata/interfaces.py @@ -1,6 +1,6 @@ from zope.interface import Interface -from foolscap.schema import StringConstraint, ListOf, SetOf, TupleOf, Any +from foolscap.schema import StringConstraint, ListOf, TupleOf, Any from foolscap import RemoteInterface Nodeid = StringConstraint(20) # binary format 20-byte SHA1 hash @@ -16,6 +16,8 @@ RIBucketWriter_ = Any() RIBucketReader_ = Any() RIMutableDirectoryNode_ = Any() RIMutableFileNode_ = Any() +def SetOf(*args, **kwargs): return Any() +def DictOf(*args, **kwargs): return Any() class RIIntroducerClient(RemoteInterface): def new_peers(pburls=SetOf(PBURL)): @@ -25,11 +27,7 @@ class RIIntroducer(RemoteInterface): def hello(node=RIIntroducerClient, pburl=PBURL): return None -class RIQueenRoster(RemoteInterface): - def get_global_vdrive(): - return RIMutableDirectoryNode_ # the virtual drive root - -class RIClient(RIIntroducerClient): +class RIClient(RemoteInterface): def get_service(name=str): return Referenceable_ def get_nodeid(): diff --git a/src/allmydata/introducer.py b/src/allmydata/introducer.py index a0afef19..08520d85 100644 --- a/src/allmydata/introducer.py +++ b/src/allmydata/introducer.py @@ -1,16 +1,11 @@ -from foolscap import Referenceable, DeadReferenceError + +import re +from zope.interface import implements from twisted.application import service from twisted.python import log -from twisted.internet.error import ConnectionLost, ConnectionDone -from zope.interface import implements -from allmydata.interfaces import RIIntroducer - - -def sendOnly(call, methname, *args, **kwargs): - d = call(methname, *args, **kwargs) - def _trap(f): - f.trap(DeadReferenceError, ConnectionLost, ConnectionDone) - d.addErrback(_trap) +from foolscap import Referenceable +from allmydata.interfaces import RIIntroducer, RIIntroducerClient +from allmydata.util import idlib, observer class Introducer(service.MultiService, Referenceable): implements(RIIntroducer) @@ -21,9 +16,9 @@ class Introducer(service.MultiService, Referenceable): self.pburls = set() def remote_hello(self, node, pburl): - log.msg("roster: new contact at %s, node is %s" % (pburl, node)) + log.msg("introducer: new contact at %s, node is %s" % (pburl, node)) def _remove(): - log.msg(" roster: removing %s %s" % (node, pburl)) + log.msg(" introducer: removing %s %s" % (node, pburl)) self.nodes.remove(node) self.pburls.remove(pburl) node.notifyOnDisconnect(_remove) @@ -32,3 +27,64 @@ class Introducer(service.MultiService, Referenceable): for othernode in self.nodes: othernode.callRemote("new_peers", set([pburl])) self.nodes.add(node) + + +class IntroducerClient(service.Service, Referenceable): + implements(RIIntroducerClient) + + def __init__(self, tub, introducer_pburl, my_pburl): + self.tub = tub + self.introducer_pburl = introducer_pburl + self.my_pburl = my_pburl + + self.connections = {} # k: nodeid, v: ref + self.reconnectors = {} # k: PBURL, v: reconnector + + self.connection_observers = observer.ObserverList() + + def startService(self): + self.introducer_reconnector = self.tub.connectTo(self.introducer_pburl, + self._got_introducer) + + def log(self, msg): + self.parent.log(msg) + + def remote_new_peers(self, pburls): + for pburl in pburls: + self._new_peer(pburl) + + def stopService(self): + service.Service.stopService(self) + self.introducer_reconnector.stopConnecting() + for reconnector in self.reconnectors.itervalues(): + reconnector.stopConnecting() + + def _new_peer(self, pburl): + if pburl in self.reconnectors: + return + m = re.match(r'pb://(\w+)@', pburl) + assert m + nodeid = idlib.a2b(m.group(1)) + def _got_peer(rref): + self.log(" connected to(%s)" % idlib.b2a(nodeid)) + self.connection_observers.notify(nodeid, rref) + self.connections[nodeid] = rref + def _lost(): + # TODO: notifyOnDisconnect uses eventually(), but connects do + # not. Could this cause a problem? + del self.connections[nodeid] + rref.notifyOnDisconnect(_lost) + self.log(" connecting to(%s)" % pburl) + self.reconnectors[pburl] = self.tub.connectTo(pburl, _got_peer) + + def _got_introducer(self, introducer): + self.log(" introducing ourselves: %s, %s" % (self, self.my_pburl)) + d = introducer.callRemote("hello", + node=self, + pburl=self.my_pburl) + + def notify_on_new_connection(self, cb): + """Register a callback that will be fired (with nodeid, rref) when + a new connection is established.""" + self.connection_observers.subscribe(cb) + diff --git a/src/allmydata/introducerclient.py b/src/allmydata/introducerclient.py deleted file mode 100644 index 4c193540..00000000 --- a/src/allmydata/introducerclient.py +++ /dev/null @@ -1,45 +0,0 @@ -class IntroducerClient(Referenceable): - implements(RIIntroducerClient) - - def __init__(self, tub, introducer_pburl, my_pburl): - self.introducer_reconnector = self.tub.connectTo(introducer_pburl, - self._got_introducer) - - self.tub = tub - self.my_pburl = my_pburl - - self.connections = {} # k: nodeid, v: ref - self.reconnectors = {} # k: PBURL, v: reconnector - - def remote_get_nodeid(self): - return self.nodeid - - def remote_new_peers(self, pburls): - for pburl in pburls: - self._new_peer(pburl) - - def stop(self): - self.introducer_reconnector.stopConnecting() - for reconnector in self.reconnectors.itervalues(): - reconnector.stopConnecting() - - def _new_peer(self, pburl): - if pburl in self.reconnectors: - return - def _got_peer(rref): - d2 = rref.callRemote("get_nodeid") - def _got_nodeid(nodeid): - self.connections[nodeid] = rref - def _lost(): - # TODO: notifyOnDisconnect uses eventually(), but connects do not. Could this cause a problem? - del self.connections[nodeid] - rref.notifyOnDisconnect(_lost) - d2.addCallback(_got_nodeid) - log.msg(" connecting to(%s)" % pburl) - self.reconnectors[pburl] = self.tub.connectTo(pburl, _got_peer) - - def _got_introducer(self, introducer): - log.msg(" introducing ourselves: %s, %s" % (self, self.my_pburl)) - d = introducer.callRemote("hello", - node=self, - pburl=self.my_pburl) diff --git a/src/allmydata/node.py b/src/allmydata/node.py index 04cabf9e..ab31f9cc 100644 --- a/src/allmydata/node.py +++ b/src/allmydata/node.py @@ -30,6 +30,9 @@ class Node(service.MultiService): f = open(certfile, "wb") f.write(self.tub.getCertData()) f.close() + if False: # TODO: once foolscap-0.1.1 is released, enable this + self.tub.setOption("logLocalFailures", True) + self.tub.setOption("logRemoteFailures", True) self.nodeid = idlib.a2b(self.tub.tubID) f = open(os.path.join(self.basedir, self.NODEIDFILE), "w") f.write(idlib.b2a(self.nodeid) + "\n") diff --git a/src/allmydata/queen.py b/src/allmydata/queen.py index 18192c44..dd10b352 100644 --- a/src/allmydata/queen.py +++ b/src/allmydata/queen.py @@ -1,35 +1,8 @@ import os.path -from foolscap import Referenceable, DeadReferenceError -from foolscap.eventual import eventually -from twisted.application import service -from twisted.python import log -from twisted.internet.error import ConnectionLost, ConnectionDone -from allmydata.util import idlib -from zope.interface import implements -from allmydata.interfaces import RIQueenRoster, RIIntroducer from allmydata import node from allmydata.filetable import GlobalVirtualDrive - - -def sendOnly(call, methname, *args, **kwargs): - d = call(methname, *args, **kwargs) - def _trap(f): - f.trap(DeadReferenceError, ConnectionLost, ConnectionDone) - d.addErrback(_trap) - -class Roster(service.MultiService, Referenceable): - implements(RIQueenRoster) - - def __init__(self): - self.gvd_root = None - - def set_gvd_root(self, root): - self.gvd_root = root - - def remote_get_global_vdrive(self): - return self.gvd_root - +from allmydata.introducer import Introducer class Queen(node.Node): @@ -39,15 +12,21 @@ class Queen(node.Node): def __init__(self, basedir="."): node.Node.__init__(self, basedir) - self.gvd = self.add_service(GlobalVirtualDrive(basedir)) self.urls = {} def tub_ready(self): - r = self.add_service(Roster()) - self.urls["roster"] = self.tub.registerReference(r, "roster") - self.log(" roster is at %s" % self.urls["roster"]) - f = open(os.path.join(self.basedir, "roster_pburl"), "w") - f.write(self.urls["roster"] + "\n") + r = self.add_service(Introducer()) + self.urls["introducer"] = self.tub.registerReference(r, "introducer") + self.log(" introducer is at %s" % self.urls["introducer"]) + f = open(os.path.join(self.basedir, "introducer.furl"), "w") + f.write(self.urls["introducer"] + "\n") + f.close() + + gvd = self.add_service(GlobalVirtualDrive(self.basedir)) + self.urls["vdrive"] = self.tub.registerReference(gvd.get_root(), + "vdrive") + self.log(" vdrive is at %s" % self.urls["vdrive"]) + f = open(os.path.join(self.basedir, "vdrive.furl"), "w") + f.write(self.urls["vdrive"] + "\n") f.close() - r.set_gvd_root(self.gvd.get_root()) diff --git a/src/allmydata/scripts/runner.py b/src/allmydata/scripts/runner.py index ecff3201..c4fe8edb 100644 --- a/src/allmydata/scripts/runner.py +++ b/src/allmydata/scripts/runner.py @@ -122,7 +122,8 @@ def create_client(config): f = open(os.path.join(basedir, "client.tac"), "w") f.write(client_tac) f.close() - print "client created in %s, please copy roster_pburl into the directory" % basedir + print "client created in %s" % basedir + print " please copy introducer.furl and vdrive.furl into the directory" def create_queen(config): basedir = config['basedir'] diff --git a/src/allmydata/test/check_memory.py b/src/allmydata/test/check_memory.py index 0e99ec4e..2a3d2452 100644 --- a/src/allmydata/test/check_memory.py +++ b/src/allmydata/test/check_memory.py @@ -65,13 +65,19 @@ class SystemFramework: def make_nodes(self): q = self.queen - self.queen_pburl = q.urls["roster"] + self.queen_pburl = q.urls["introducer"] + vdrive_furl = q.urls["vdrive"] self.nodes = [] for i in range(self.numnodes): nodedir = os.path.join(self.basedir, "node%d" % i) os.mkdir(nodedir) + f = open(os.path.join(nodedir, "introducer.furl"), "w") + f.write(self.queen_pburl) + f.close() + f = open(os.path.join(nodedir, "vdrive.furl"), "w") + f.write(vdrive_furl) + f.close() c = self.add_service(client.Client(basedir=nodedir)) - c.set_queen_pburl(self.queen_pburl) self.nodes.append(c) # the peers will start running, eventually they will connect to each # other and the queen @@ -81,7 +87,7 @@ class SystemFramework: f.write("If the node notices this file at startup, it will poll and\n") f.write("terminate as soon as the file goes away. This prevents\n") f.write("leaving processes around if the test harness has an\n") - f.write("internal failure and neglects to kil off the node\n") + f.write("internal failure and neglects to kill off the node\n") f.write("itself. The contents of this file are ignored.\n") f.close() @@ -91,7 +97,7 @@ class SystemFramework: config = {'basedir': clientdir} runner.create_client(config) log.msg("DONE MAKING CLIENT") - f = open(os.path.join(clientdir, "roster_pburl"), "w") + f = open(os.path.join(clientdir, "introducer.furl"), "w") f.write(self.queen_pburl + "\n") f.close() self.keepalive_file = os.path.join(clientdir, "suicide_prevention_hotline") diff --git a/src/allmydata/test/test_client.py b/src/allmydata/test/test_client.py index b3dbd7f7..a0ce5f2a 100644 --- a/src/allmydata/test/test_client.py +++ b/src/allmydata/test/test_client.py @@ -1,17 +1,31 @@ +import os from twisted.trial import unittest from allmydata import client +class MyClient(client.Client): + def __init__(self, basedir): + self.connections = {} + client.Client.__init__(self, basedir) + + def get_all_peerids(self): + return self.connections + class Basic(unittest.TestCase): def test_loadable(self): - c = client.Client("") - d = c.startService() - d.addCallback(lambda res: c.stopService()) - return d + basedir = "test_client.Basic.test_loadable" + os.mkdir(basedir) + open(os.path.join(basedir, "introducer.furl"), "w").write("") + open(os.path.join(basedir, "vdrive.furl"), "w").write("") + c = client.Client(basedir) def test_permute(self): - c = client.Client("") + basedir = "test_client.Basic.test_permute" + os.mkdir(basedir) + open(os.path.join(basedir, "introducer.furl"), "w").write("") + open(os.path.join(basedir, "vdrive.furl"), "w").write("") + c = MyClient(basedir) for k in ["%d" % i for i in range(5)]: c.connections[k] = None self.failUnlessEqual(c.permute_peerids("one"), ['3','1','0','4','2']) @@ -20,7 +34,7 @@ class Basic(unittest.TestCase): c.connections.clear() self.failUnlessEqual(c.permute_peerids("one"), []) - c2 = client.Client("") + c2 = MyClient(basedir) for k in ["%d" % i for i in range(5)]: c2.connections[k] = None self.failUnlessEqual(c2.permute_peerids("one"), ['3','1','0','4','2']) diff --git a/src/allmydata/test/test_introducer.py b/src/allmydata/test/test_introducer.py new file mode 100644 index 00000000..7d7ce0e6 --- /dev/null +++ b/src/allmydata/test/test_introducer.py @@ -0,0 +1,234 @@ + +from twisted.trial import unittest +from twisted.internet import defer, reactor +from twisted.python import log +defer.setDebugging(True) + +from foolscap import Tub, Referenceable +from twisted.application import service +from allmydata.introducer import IntroducerClient, Introducer +from allmydata.util import idlib + +class MyNode(Referenceable): + pass + +class LoggingMultiService(service.MultiService): + def log(self, msg): + pass + +class TestIntroducer(unittest.TestCase): + def setUp(self): + self.parent = LoggingMultiService() + self.parent.startService() + def tearDown(self): + log.msg("TestIntroducer.tearDown") + d = defer.Deferred() + reactor.callLater(1.1, d.callback, None) + d.addCallback(lambda res: self.parent.stopService()) + return d + + + + def poll(self, check_f, pollinterval=0.01): + # Return a Deferred, then call check_f periodically until it returns + # True, at which point the Deferred will fire.. If check_f raises an + # exception, the Deferred will errback. + d = defer.maybeDeferred(self._poll, None, check_f, pollinterval) + return d + + def _poll(self, res, check_f, pollinterval): + if check_f(): + return True + d = defer.Deferred() + d.addCallback(self._poll, check_f, pollinterval) + reactor.callLater(pollinterval, d.callback, None) + return d + + + def test_create(self): + ic = IntroducerClient(None, "introducer", "mypburl") + def _ignore(nodeid, rref): + pass + ic.notify_on_new_connection(_ignore) + + def test_listen(self): + i = Introducer() + i.setServiceParent(self.parent) + + def test_system(self): + + self.central_tub = tub = Tub() + #tub.setOption("logLocalFailures", True) + #tub.setOption("logRemoteFailures", True) + tub.setServiceParent(self.parent) + l = tub.listenOn("tcp:0") + portnum = l.getPortnum() + tub.setLocation("localhost:%d" % portnum) + + i = Introducer() + i.setServiceParent(self.parent) + iurl = tub.registerReference(i) + NUMCLIENTS = 5 + + self.waiting_for_connections = NUMCLIENTS*NUMCLIENTS + d = self._done_counting = defer.Deferred() + def _count(nodeid, rref): + log.msg("NEW CONNECTION! %s %s" % (idlib.b2a(nodeid), rref)) + self.waiting_for_connections -= 1 + if self.waiting_for_connections == 0: + self._done_counting.callback("done!") + + clients = [] + tubs = {} + for i in range(NUMCLIENTS): + tub = Tub() + #tub.setOption("logLocalFailures", True) + #tub.setOption("logRemoteFailures", True) + tub.setServiceParent(self.parent) + l = tub.listenOn("tcp:0") + portnum = l.getPortnum() + tub.setLocation("localhost:%d" % portnum) + + n = MyNode() + node_pburl = tub.registerReference(n) + c = IntroducerClient(tub, iurl, node_pburl) + c.notify_on_new_connection(_count) + c.setServiceParent(self.parent) + clients.append(c) + tubs[c] = tub + + # d will fire once everybody is connected + + def _check(res): + log.msg("doing _check") + for c in clients: + self.failUnlessEqual(len(c.connections), NUMCLIENTS) + # now disconnect somebody's connection to someone else + self.waiting_for_connections = 2 + d2 = self._done_counting = defer.Deferred() + origin_c = clients[0] + # find a target that is not themselves + for nodeid,rref in origin_c.connections.items(): + if idlib.b2a(nodeid) != tubs[origin_c].tubID: + victim = rref + break + log.msg(" disconnecting %s->%s" % (tubs[origin_c].tubID, victim)) + victim.tracker.broker.transport.loseConnection() + log.msg(" did disconnect") + return d2 + d.addCallback(_check) + def _check_again(res): + log.msg("doing _check_again") + for c in clients: + self.failUnlessEqual(len(c.connections), NUMCLIENTS) + # now disconnect somebody's connection to themselves. This will + # only result in one new connection, since it is a loopback. + self.waiting_for_connections = 1 + d2 = self._done_counting = defer.Deferred() + origin_c = clients[0] + # find a target that *is* themselves + for nodeid,rref in origin_c.connections.items(): + if idlib.b2a(nodeid) == tubs[origin_c].tubID: + victim = rref + break + log.msg(" disconnecting %s->%s" % (tubs[origin_c].tubID, victim)) + victim.tracker.broker.transport.loseConnection() + log.msg(" did disconnect") + return d2 + d.addCallback(_check_again) + def _check_again2(res): + log.msg("doing _check_again2") + for c in clients: + self.failUnlessEqual(len(c.connections), NUMCLIENTS) + # now disconnect somebody's connection to themselves + d.addCallback(_check_again2) + return d + + def stall(self, res, timeout): + d = defer.Deferred() + reactor.callLater(timeout, d.callback, res) + return d + + def test_system_this_one_breaks(self): + # this uses a single Tub, which has a strong effect on the + # failingness + tub = Tub() + tub.setOption("logLocalFailures", True) + tub.setOption("logRemoteFailures", True) + tub.setServiceParent(self.parent) + l = tub.listenOn("tcp:0") + portnum = l.getPortnum() + tub.setLocation("localhost:%d" % portnum) + + i = Introducer() + i.setServiceParent(self.parent) + iurl = tub.registerReference(i) + + clients = [] + for i in range(5): + n = MyNode() + node_pburl = tub.registerReference(n) + c = IntroducerClient(tub, iurl, node_pburl) + c.setServiceParent(self.parent) + clients.append(c) + + # time passes.. + d = defer.Deferred() + def _check(res): + log.msg("doing _check") + self.failUnlessEqual(len(clients[0].connections), 5) + d.addCallback(_check) + reactor.callLater(2, d.callback, None) + return d + del test_system_this_one_breaks + + + def test_system_this_one_breaks_too(self): + # this one shuts down so quickly that it fails in a different way + self.central_tub = tub = Tub() + tub.setOption("logLocalFailures", True) + tub.setOption("logRemoteFailures", True) + tub.setServiceParent(self.parent) + l = tub.listenOn("tcp:0") + portnum = l.getPortnum() + tub.setLocation("localhost:%d" % portnum) + + i = Introducer() + i.setServiceParent(self.parent) + iurl = tub.registerReference(i) + + clients = [] + for i in range(5): + tub = Tub() + tub.setOption("logLocalFailures", True) + tub.setOption("logRemoteFailures", True) + tub.setServiceParent(self.parent) + l = tub.listenOn("tcp:0") + portnum = l.getPortnum() + tub.setLocation("localhost:%d" % portnum) + + n = MyNode() + node_pburl = tub.registerReference(n) + c = IntroducerClient(tub, iurl, node_pburl) + c.setServiceParent(self.parent) + clients.append(c) + + # time passes.. + d = defer.Deferred() + reactor.callLater(0.01, d.callback, None) + def _check(res): + log.msg("doing _check") + self.fail("BOOM") + for c in clients: + self.failUnlessEqual(len(c.connections), 5) + c.connections.values()[0].tracker.broker.transport.loseConnection() + return self.stall(None, 2) + d.addCallback(_check) + def _check_again(res): + log.msg("doing _check_again") + for c in clients: + self.failUnlessEqual(len(c.connections), 5) + d.addCallback(_check_again) + return d + del test_system_this_one_breaks_too + diff --git a/src/allmydata/test/test_system.py b/src/allmydata/test/test_system.py index 62111b9c..709390ed 100644 --- a/src/allmydata/test/test_system.py +++ b/src/allmydata/test/test_system.py @@ -47,18 +47,18 @@ class SystemTest(unittest.TestCase): def _set_up_nodes_2(self, res): q = self.queen - self.queen_pburl = q.urls["roster"] + self.queen_furl = q.urls["introducer"] + self.vdrive_furl = q.urls["vdrive"] self.clients = [] for i in range(self.numclients): basedir = "client%d" % i if not os.path.isdir(basedir): os.mkdir(basedir) if i == 0: - f = open(os.path.join(basedir, "webport"), "w") - f.write("tcp:0:interface=127.0.0.1") - f.close() + open(os.path.join(basedir, "webport"), "w").write("tcp:0:interface=127.0.0.1") + open(os.path.join(basedir, "introducer.furl"), "w").write(self.queen_furl) + open(os.path.join(basedir, "vdrive.furl"), "w").write(self.vdrive_furl) c = self.add_service(client.Client(basedir=basedir)) - c.set_queen_pburl(self.queen_pburl) self.clients.append(c) log.msg("STARTING") d = self.wait_for_connections() @@ -76,9 +76,11 @@ class SystemTest(unittest.TestCase): basedir = "client%d" % client_num if not os.path.isdir(basedir): os.mkdir(basedir) + open(os.path.join(basedir, "introducer.furl"), "w").write(self.queen_furl) + open(os.path.join(basedir, "vdrive.furl"), "w").write(self.vdrive_furl) + c = client.Client(basedir=basedir) self.clients.append(c) - c.set_queen_pburl(self.queen_pburl) self.numclients += 1 c.startService() d = self.wait_for_connections() @@ -87,7 +89,7 @@ class SystemTest(unittest.TestCase): def wait_for_connections(self, ignored=None): for c in self.clients: - if len(c.connections) != self.numclients: + if not c.introducer_client or len(c.get_all_peerids()) != self.numclients: d = defer.Deferred() d.addCallback(self.wait_for_connections) reactor.callLater(0.05, d.callback, None) @@ -96,18 +98,21 @@ class SystemTest(unittest.TestCase): def test_connections(self): d = self.set_up_nodes() + self.extra_node = None d.addCallback(lambda res: self.add_extra_node(5)) def _check(extra_node): self.extra_node = extra_node for c in self.clients: - self.failUnlessEqual(len(c.connections), 6) + self.failUnlessEqual(len(c.get_all_peerids()), 6) d.addCallback(_check) def _shutdown_extra_node(res): - d1 = self.extra_node.stopService() - d2 = defer.Deferred() - reactor.callLater(self.DISCONNECT_DELAY, d2.callback, res) - d1.addCallback(lambda ignored: d2) - return d1 + if self.extra_node: + d1 = self.extra_node.stopService() + d2 = defer.Deferred() + reactor.callLater(self.DISCONNECT_DELAY, d2.callback, res) + d1.addCallback(lambda ignored: d2) + return d1 + return res d.addBoth(_shutdown_extra_node) return d diff --git a/src/allmydata/web/welcome.xhtml b/src/allmydata/web/welcome.xhtml index d162a01f..5d8ad3b8 100644 --- a/src/allmydata/web/welcome.xhtml +++ b/src/allmydata/web/welcome.xhtml @@ -23,13 +23,9 @@ - - - -
PeerIDConnected?PBURL
no peers!
diff --git a/src/allmydata/webish.py b/src/allmydata/webish.py index 98f37161..ca86f61e 100644 --- a/src/allmydata/webish.py +++ b/src/allmydata/webish.py @@ -28,31 +28,29 @@ class Welcome(rend.Page): docFactory = getxmlfile("welcome.xhtml") def data_queen_pburl(self, ctx, data): - return IClient(ctx).queen_pburl + return IClient(ctx).introducer_furl def data_connected_to_queen(self, ctx, data): - if IClient(ctx).queen: + if IClient(ctx).connected_to_vdrive: return "yes" return "no" def data_num_peers(self, ctx, data): #client = inevow.ISite(ctx)._client client = IClient(ctx) - return len(client.connections) + return len(client.get_all_peerids()) def data_num_connected_peers(self, ctx, data): - return len(IClient(ctx).connections) + return len(IClient(ctx).get_all_peerids()) def data_peers(self, ctx, data): d = [] client = IClient(ctx) - for nodeid in sorted(client.connections.keys()): - row = (idlib.b2a(nodeid), "yes", "?") + for nodeid in sorted(client.get_all_peerids()): + row = (idlib.b2a(nodeid),) d.append(row) return d def render_row(self, ctx, data): - nodeid_a, connected, pburl = data + (nodeid_a,) = data ctx.fillSlots("peerid", nodeid_a) - ctx.fillSlots("connected", connected) - ctx.fillSlots("pburl", pburl) return ctx.tag # this is a form where users can download files by URI