]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
complete the Introducer changes, separate out vdrive access, make everything work...
authorBrian Warner <warner@allmydata.com>
Tue, 27 Mar 2007 23:12:11 +0000 (16:12 -0700)
committerBrian Warner <warner@allmydata.com>
Tue, 27 Mar 2007 23:12:11 +0000 (16:12 -0700)
13 files changed:
src/allmydata/client.py
src/allmydata/interfaces.py
src/allmydata/introducer.py
src/allmydata/introducerclient.py [deleted file]
src/allmydata/node.py
src/allmydata/queen.py
src/allmydata/scripts/runner.py
src/allmydata/test/check_memory.py
src/allmydata/test/test_client.py
src/allmydata/test/test_introducer.py [new file with mode: 0644]
src/allmydata/test/test_system.py
src/allmydata/web/welcome.xhtml
src/allmydata/webish.py

index b063805e55a7d29a4cc86e08d78c70ad03cab2b4..72d96b24e0400d842b00029dce88aa03bb706701 100644 (file)
@@ -1,21 +1,19 @@
 
 import os, sha
 from foolscap import Referenceable
-from twisted.application import service
-from twisted.python import log
 from zope.interface import implements
 from allmydata.interfaces import RIClient
 from allmydata import node
 
 from twisted.internet import defer
 
-from allmydata.util import idlib
 from allmydata.storageserver import StorageServer
 from allmydata.upload import Uploader
 from allmydata.download import Downloader
 from allmydata.vdrive import VDrive
 from allmydata.webish import WebishServer
 from allmydata.control import ControlServer
+from allmydata.introducer import IntroducerClient
 
 class Client(node.Node, Referenceable):
     implements(RIClient)
@@ -24,12 +22,14 @@ class Client(node.Node, Referenceable):
     STOREDIR = 'storage'
     NODETYPE = "client"
     WEBPORTFILE = "webport"
-    QUEEN_PBURL_FILE = "roster_pburl"
+    INTRODUCER_FURL_FILE = "introducer.furl"
+    GLOBAL_VDRIVE_FURL_FILE = "vdrive.furl"
 
     def __init__(self, basedir="."):
         node.Node.__init__(self, basedir)
-        self.queen = None # self.queen is either None or a RemoteReference
+        self.my_pburl = None
         self.introducer_client = None
+        self.connected_to_vdrive = False
         self.add_service(StorageServer(os.path.join(basedir, self.STOREDIR)))
         self.add_service(Uploader())
         self.add_service(Downloader())
@@ -40,37 +40,31 @@ class Client(node.Node, Referenceable):
             webport = f.read() # strports string
             f.close()
             self.add_service(WebishServer(webport))
-        self.queen_pburl = None
-        QUEEN_PBURL_FILE = os.path.join(self.basedir, self.QUEEN_PBURL_FILE)
-        if os.path.exists(QUEEN_PBURL_FILE):
-            f = open(QUEEN_PBURL_FILE, "r")
-            self.queen_pburl = f.read().strip()
-            f.close()
-        self.queen_connector = None
+
+        INTRODUCER_FURL_FILE = os.path.join(self.basedir,
+                                            self.INTRODUCER_FURL_FILE)
+        f = open(INTRODUCER_FURL_FILE, "r")
+        self.introducer_furl = f.read().strip()
+        f.close()
+
+        GLOBAL_VDRIVE_FURL_FILE = os.path.join(self.basedir,
+                                               self.GLOBAL_VDRIVE_FURL_FILE)
+        f = open(GLOBAL_VDRIVE_FURL_FILE, "r")
+        self.global_vdrive_furl = f.read().strip()
+        f.close()
 
     def tub_ready(self):
+        self.log("tub_ready")
         self.my_pburl = self.tub.registerReference(self)
-        if self.queen_pburl:
-            self.introducer_client = IntroducerClient(self.tub, self.queen_pburl, self.my_pburl)
+
+        ic = IntroducerClient(self.tub, self.introducer_furl, self.my_pburl)
+        self.introducer_client = ic
+        ic.setServiceParent(self)
+
         self.register_control()
-        self.maybe_connect_to_queen()
-
-    def set_queen_pburl(self, queen_pburl):
-        self.queen_pburl = queen_pburl
-        self.maybe_connect_to_queen()
-
-    def maybe_connect_to_queen(self):
-        if not self.running:
-            return
-        if not self.my_pburl:
-            return
-        if self.queen_connector:
-            return
-        if not self.queen_pburl:
-            self.log("no queen_pburl, cannot connect")
-            return
-        self.queen_connector = self.tub.connectTo(self.queen_pburl,
-                                                  self._got_queen)
+
+        self.vdrive_connector = self.tub.connectTo(self.global_vdrive_furl,
+                                                   self._got_vdrive)
 
     def register_control(self):
         c = ControlServer()
@@ -81,38 +75,37 @@ class Client(node.Node, Referenceable):
         f.close()
         os.chmod("control.pburl", 0600)
 
-    def stopService(self):
-        if self.introducer_client:
-            self.introducer_client.stop()
-        return service.MultiService.stopService(self)
-
-    def _got_queen(self, queen):
-        self.log("connected to queen")
-        d.addCallback(lambda x: queen.callRemote("get_global_vdrive"))
-        d.addCallback(self._got_vdrive_root)
-
-    def _got_vdrive_root(self, root):
-        self.getServiceNamed("vdrive").set_root(root)
+    def _got_vdrive(self, vdrive_root):
+        # vdrive_root implements RIMutableDirectoryNode
+        self.log("connected to vdrive")
+        self.connected_to_vdrive = True
+        self.getServiceNamed("vdrive").set_root(vdrive_root)
         if "webish" in self.namedServices:
-            self.getServiceNamed("webish").set_root_dirnode(root)
+            self.getServiceNamed("webish").set_root_dirnode(vdrive_root)
+        def _disconnected():
+            self.connected_to_vdrive = False
+        vdrive_root.notifyOnDisconnect(_disconnected)
 
     def remote_get_service(self, name):
         # TODO: 'vdrive' should not be public in the medium term
         return self.getServiceNamed(name)
 
     def get_remote_service(self, nodeid, servicename):
-        if nodeid not in self.connections:
+        if nodeid not in self.introducer_client.connections:
             return defer.fail(IndexError("no connection to that peer"))
-        peer = self.connections[nodeid]
+        peer = self.introducer_client.connections[nodeid]
         d = peer.callRemote("get_service", name=servicename)
         return d
 
 
+    def get_all_peerids(self):
+        return self.introducer_client.connections.iterkeys()
+
     def permute_peerids(self, key, max_count=None):
         # TODO: eventually reduce memory consumption by doing an insertion
         # sort of at most max_count elements
         results = []
-        for nodeid in self.all_peers:
+        for nodeid in self.get_all_peerids():
             assert isinstance(nodeid, str)
             permuted = sha.new(key + nodeid).digest()
             results.append((permuted, nodeid))
index 67bf189e47a5f8fe2ecd68729f69d6c011d1ca2d..d6bd99421248fcd9cf531e9530d3e375114ea2ad 100644 (file)
@@ -1,6 +1,6 @@
 
 from zope.interface import Interface
-from foolscap.schema import StringConstraint, ListOf, SetOf, TupleOf, Any
+from foolscap.schema import StringConstraint, ListOf, TupleOf, Any
 from foolscap import RemoteInterface
 
 Nodeid = StringConstraint(20) # binary format 20-byte SHA1 hash
@@ -16,6 +16,8 @@ RIBucketWriter_ = Any()
 RIBucketReader_ = Any()
 RIMutableDirectoryNode_ = Any()
 RIMutableFileNode_ = Any()
+def SetOf(*args, **kwargs): return Any()
+def DictOf(*args, **kwargs): return Any()
 
 class RIIntroducerClient(RemoteInterface):
     def new_peers(pburls=SetOf(PBURL)):
@@ -25,11 +27,7 @@ class RIIntroducer(RemoteInterface):
     def hello(node=RIIntroducerClient, pburl=PBURL):
         return None
 
-class RIQueenRoster(RemoteInterface):
-    def get_global_vdrive():
-        return RIMutableDirectoryNode_ # the virtual drive root
-
-class RIClient(RIIntroducerClient):
+class RIClient(RemoteInterface):
     def get_service(name=str):
         return Referenceable_
     def get_nodeid():
index a0afef1921bfc3dde316f3bb9d172284c5c1ab2f..08520d855bfe6f13985407bdb7a212b856ab53c2 100644 (file)
@@ -1,16 +1,11 @@
-from foolscap import Referenceable, DeadReferenceError
+
+import re
+from zope.interface import implements
 from twisted.application import service
 from twisted.python import log
-from twisted.internet.error import ConnectionLost, ConnectionDone
-from zope.interface import implements
-from allmydata.interfaces import RIIntroducer
-
-
-def sendOnly(call, methname, *args, **kwargs):
-    d = call(methname, *args, **kwargs)
-    def _trap(f):
-        f.trap(DeadReferenceError, ConnectionLost, ConnectionDone)
-    d.addErrback(_trap)
+from foolscap import Referenceable
+from allmydata.interfaces import RIIntroducer, RIIntroducerClient
+from allmydata.util import idlib, observer
 
 class Introducer(service.MultiService, Referenceable):
     implements(RIIntroducer)
@@ -21,9 +16,9 @@ class Introducer(service.MultiService, Referenceable):
         self.pburls = set()
 
     def remote_hello(self, node, pburl):
-        log.msg("roster: new contact at %s, node is %s" % (pburl, node))
+        log.msg("introducer: new contact at %s, node is %s" % (pburl, node))
         def _remove():
-            log.msg(" roster: removing %s %s" % (node, pburl))
+            log.msg(" introducer: removing %s %s" % (node, pburl))
             self.nodes.remove(node)
             self.pburls.remove(pburl)
         node.notifyOnDisconnect(_remove)
@@ -32,3 +27,64 @@ class Introducer(service.MultiService, Referenceable):
         for othernode in self.nodes:
             othernode.callRemote("new_peers", set([pburl]))
         self.nodes.add(node)
+
+
+class IntroducerClient(service.Service, Referenceable):
+    implements(RIIntroducerClient)
+
+    def __init__(self, tub, introducer_pburl, my_pburl):
+        self.tub = tub
+        self.introducer_pburl = introducer_pburl
+        self.my_pburl = my_pburl
+
+        self.connections = {} # k: nodeid, v: ref
+        self.reconnectors = {} # k: PBURL, v: reconnector
+
+        self.connection_observers = observer.ObserverList()
+
+    def startService(self):
+        self.introducer_reconnector = self.tub.connectTo(self.introducer_pburl,
+                                                         self._got_introducer)
+
+    def log(self, msg):
+        self.parent.log(msg)
+
+    def remote_new_peers(self, pburls):
+        for pburl in pburls:
+            self._new_peer(pburl)
+
+    def stopService(self):
+        service.Service.stopService(self)
+        self.introducer_reconnector.stopConnecting()
+        for reconnector in self.reconnectors.itervalues():
+            reconnector.stopConnecting()
+
+    def _new_peer(self, pburl):
+        if pburl in self.reconnectors:
+            return
+        m = re.match(r'pb://(\w+)@', pburl)
+        assert m
+        nodeid = idlib.a2b(m.group(1))
+        def _got_peer(rref):
+            self.log(" connected to(%s)" % idlib.b2a(nodeid))
+            self.connection_observers.notify(nodeid, rref)
+            self.connections[nodeid] = rref
+            def _lost():
+                # TODO: notifyOnDisconnect uses eventually(), but connects do
+                # not. Could this cause a problem?
+                del self.connections[nodeid]
+            rref.notifyOnDisconnect(_lost)
+        self.log(" connecting to(%s)" % pburl)
+        self.reconnectors[pburl] = self.tub.connectTo(pburl, _got_peer)
+
+    def _got_introducer(self, introducer):
+        self.log(" introducing ourselves: %s, %s" % (self, self.my_pburl))
+        d = introducer.callRemote("hello",
+                             node=self,
+                             pburl=self.my_pburl)
+
+    def notify_on_new_connection(self, cb):
+        """Register a callback that will be fired (with nodeid, rref) when
+        a new connection is established."""
+        self.connection_observers.subscribe(cb)
+
diff --git a/src/allmydata/introducerclient.py b/src/allmydata/introducerclient.py
deleted file mode 100644 (file)
index 4c19354..0000000
+++ /dev/null
@@ -1,45 +0,0 @@
-class IntroducerClient(Referenceable):
-    implements(RIIntroducerClient)
-
-    def __init__(self, tub, introducer_pburl, my_pburl):
-        self.introducer_reconnector = self.tub.connectTo(introducer_pburl,
-                                                  self._got_introducer)
-
-        self.tub = tub
-        self.my_pburl = my_pburl
-
-        self.connections = {} # k: nodeid, v: ref
-        self.reconnectors = {} # k: PBURL, v: reconnector
-
-    def remote_get_nodeid(self):
-        return self.nodeid
-
-    def remote_new_peers(self, pburls):
-        for pburl in pburls:
-            self._new_peer(pburl)
-
-    def stop(self):
-        self.introducer_reconnector.stopConnecting()
-        for reconnector in self.reconnectors.itervalues():
-            reconnector.stopConnecting()
-
-    def _new_peer(self, pburl):
-        if pburl in self.reconnectors:
-            return
-        def _got_peer(rref):
-            d2 = rref.callRemote("get_nodeid")
-            def _got_nodeid(nodeid):
-                self.connections[nodeid] = rref
-                def _lost():
-                    # TODO: notifyOnDisconnect uses eventually(), but connects do not. Could this cause a problem?
-                    del self.connections[nodeid]
-                rref.notifyOnDisconnect(_lost)
-            d2.addCallback(_got_nodeid)
-        log.msg(" connecting to(%s)" % pburl)
-        self.reconnectors[pburl] = self.tub.connectTo(pburl, _got_peer)
-
-    def _got_introducer(self, introducer):
-        log.msg(" introducing ourselves: %s, %s" % (self, self.my_pburl))
-        d = introducer.callRemote("hello",
-                             node=self,
-                             pburl=self.my_pburl)
index 04cabf9ed68eca891511ece9e02359f6c3f600cb..ab31f9cc116b55cffaaef2abe05dc947bcb249f7 100644 (file)
@@ -30,6 +30,9 @@ class Node(service.MultiService):
             f = open(certfile, "wb")
             f.write(self.tub.getCertData())
             f.close()
+        if False: # TODO: once foolscap-0.1.1 is released, enable this
+            self.tub.setOption("logLocalFailures", True)
+            self.tub.setOption("logRemoteFailures", True)
         self.nodeid = idlib.a2b(self.tub.tubID)
         f = open(os.path.join(self.basedir, self.NODEIDFILE), "w")
         f.write(idlib.b2a(self.nodeid) + "\n")
index 18192c44b599e6356a539d06db2a8e7454aa2409..dd10b352fffc69f0b632dbec363a6cdf37e0fc5d 100644 (file)
@@ -1,35 +1,8 @@
 
 import os.path
-from foolscap import Referenceable, DeadReferenceError
-from foolscap.eventual import eventually
-from twisted.application import service
-from twisted.python import log
-from twisted.internet.error import ConnectionLost, ConnectionDone
-from allmydata.util import idlib
-from zope.interface import implements
-from allmydata.interfaces import RIQueenRoster, RIIntroducer
 from allmydata import node
 from allmydata.filetable import GlobalVirtualDrive
-
-
-def sendOnly(call, methname, *args, **kwargs):
-    d = call(methname, *args, **kwargs)
-    def _trap(f):
-        f.trap(DeadReferenceError, ConnectionLost, ConnectionDone)
-    d.addErrback(_trap)
-
-class Roster(service.MultiService, Referenceable):
-    implements(RIQueenRoster)
-
-    def __init__(self):
-        self.gvd_root = None
-
-    def set_gvd_root(self, root):
-        self.gvd_root = root
-
-    def remote_get_global_vdrive(self):
-        return self.gvd_root
-
+from allmydata.introducer import Introducer
 
 
 class Queen(node.Node):
@@ -39,15 +12,21 @@ class Queen(node.Node):
 
     def __init__(self, basedir="."):
         node.Node.__init__(self, basedir)
-        self.gvd = self.add_service(GlobalVirtualDrive(basedir))
         self.urls = {}
 
     def tub_ready(self):
-        r = self.add_service(Roster())
-        self.urls["roster"] = self.tub.registerReference(r, "roster")
-        self.log(" roster is at %s" % self.urls["roster"])
-        f = open(os.path.join(self.basedir, "roster_pburl"), "w")
-        f.write(self.urls["roster"] + "\n")
+        r = self.add_service(Introducer())
+        self.urls["introducer"] = self.tub.registerReference(r, "introducer")
+        self.log(" introducer is at %s" % self.urls["introducer"])
+        f = open(os.path.join(self.basedir, "introducer.furl"), "w")
+        f.write(self.urls["introducer"] + "\n")
+        f.close()
+
+        gvd = self.add_service(GlobalVirtualDrive(self.basedir))
+        self.urls["vdrive"] = self.tub.registerReference(gvd.get_root(),
+                                                         "vdrive")
+        self.log(" vdrive is at %s" % self.urls["vdrive"])
+        f = open(os.path.join(self.basedir, "vdrive.furl"), "w")
+        f.write(self.urls["vdrive"] + "\n")
         f.close()
-        r.set_gvd_root(self.gvd.get_root())
 
index ecff320198c921e8d53b8fd95880781754b35b28..c4fe8edb6aede9099572bf22106433cc3611e053 100644 (file)
@@ -122,7 +122,8 @@ def create_client(config):
     f = open(os.path.join(basedir, "client.tac"), "w")
     f.write(client_tac)
     f.close()
-    print "client created in %s, please copy roster_pburl into the directory" % basedir
+    print "client created in %s" % basedir
+    print " please copy introducer.furl and vdrive.furl into the directory"
 
 def create_queen(config):
     basedir = config['basedir']
index 0e99ec4ebcef0fe49d7bbedff44e9eac0b3a5447..2a3d2452a85ab2d58fbdf9a27caacbda445d0092 100644 (file)
@@ -65,13 +65,19 @@ class SystemFramework:
 
     def make_nodes(self):
         q = self.queen
-        self.queen_pburl = q.urls["roster"]
+        self.queen_pburl = q.urls["introducer"]
+        vdrive_furl = q.urls["vdrive"]
         self.nodes = []
         for i in range(self.numnodes):
             nodedir = os.path.join(self.basedir, "node%d" % i)
             os.mkdir(nodedir)
+            f = open(os.path.join(nodedir, "introducer.furl"), "w")
+            f.write(self.queen_pburl)
+            f.close()
+            f = open(os.path.join(nodedir, "vdrive.furl"), "w")
+            f.write(vdrive_furl)
+            f.close()
             c = self.add_service(client.Client(basedir=nodedir))
-            c.set_queen_pburl(self.queen_pburl)
             self.nodes.append(c)
         # the peers will start running, eventually they will connect to each
         # other and the queen
@@ -81,7 +87,7 @@ class SystemFramework:
         f.write("If the node notices this file at startup, it will poll and\n")
         f.write("terminate as soon as the file goes away. This prevents\n")
         f.write("leaving processes around if the test harness has an\n")
-        f.write("internal failure and neglects to kil off the node\n")
+        f.write("internal failure and neglects to kill off the node\n")
         f.write("itself. The contents of this file are ignored.\n")
         f.close()
 
@@ -91,7 +97,7 @@ class SystemFramework:
         config = {'basedir': clientdir}
         runner.create_client(config)
         log.msg("DONE MAKING CLIENT")
-        f = open(os.path.join(clientdir, "roster_pburl"), "w")
+        f = open(os.path.join(clientdir, "introducer.furl"), "w")
         f.write(self.queen_pburl + "\n")
         f.close()
         self.keepalive_file = os.path.join(clientdir, "suicide_prevention_hotline")
index b3dbd7f7110f930fd5daed9a8672d5ec1e7750b0..a0ce5f2aa346c9db332117938104fb53d4eb0437 100644 (file)
@@ -1,17 +1,31 @@
 
+import os
 from twisted.trial import unittest
 
 from allmydata import client
 
+class MyClient(client.Client):
+    def __init__(self, basedir):
+        self.connections = {}
+        client.Client.__init__(self, basedir)
+
+    def get_all_peerids(self):
+        return self.connections
+
 class Basic(unittest.TestCase):
     def test_loadable(self):
-        c = client.Client("")
-        d = c.startService()
-        d.addCallback(lambda res: c.stopService())
-        return d
+        basedir = "test_client.Basic.test_loadable"
+        os.mkdir(basedir)
+        open(os.path.join(basedir, "introducer.furl"), "w").write("")
+        open(os.path.join(basedir, "vdrive.furl"), "w").write("")
+        c = client.Client(basedir)
 
     def test_permute(self):
-        c = client.Client("")
+        basedir = "test_client.Basic.test_permute"
+        os.mkdir(basedir)
+        open(os.path.join(basedir, "introducer.furl"), "w").write("")
+        open(os.path.join(basedir, "vdrive.furl"), "w").write("")
+        c = MyClient(basedir)
         for k in ["%d" % i for i in range(5)]:
             c.connections[k] = None
         self.failUnlessEqual(c.permute_peerids("one"), ['3','1','0','4','2'])
@@ -20,7 +34,7 @@ class Basic(unittest.TestCase):
         c.connections.clear()
         self.failUnlessEqual(c.permute_peerids("one"), [])
 
-        c2 = client.Client("")
+        c2 = MyClient(basedir)
         for k in ["%d" % i for i in range(5)]:
             c2.connections[k] = None
         self.failUnlessEqual(c2.permute_peerids("one"), ['3','1','0','4','2'])
diff --git a/src/allmydata/test/test_introducer.py b/src/allmydata/test/test_introducer.py
new file mode 100644 (file)
index 0000000..7d7ce0e
--- /dev/null
@@ -0,0 +1,234 @@
+
+from twisted.trial import unittest
+from twisted.internet import defer, reactor
+from twisted.python import log
+defer.setDebugging(True)
+
+from foolscap import Tub, Referenceable
+from twisted.application import service
+from allmydata.introducer import IntroducerClient, Introducer
+from allmydata.util import idlib
+
+class MyNode(Referenceable):
+    pass
+
+class LoggingMultiService(service.MultiService):
+    def log(self, msg):
+        pass
+
+class TestIntroducer(unittest.TestCase):
+    def setUp(self):
+        self.parent = LoggingMultiService()
+        self.parent.startService()
+    def tearDown(self):
+        log.msg("TestIntroducer.tearDown")
+        d = defer.Deferred()
+        reactor.callLater(1.1, d.callback, None)
+        d.addCallback(lambda res: self.parent.stopService())
+        return d
+
+
+
+    def poll(self, check_f, pollinterval=0.01):
+        # Return a Deferred, then call check_f periodically until it returns
+        # True, at which point the Deferred will fire.. If check_f raises an
+        # exception, the Deferred will errback.
+        d = defer.maybeDeferred(self._poll, None, check_f, pollinterval)
+        return d
+
+    def _poll(self, res, check_f, pollinterval):
+        if check_f():
+            return True
+        d = defer.Deferred()
+        d.addCallback(self._poll, check_f, pollinterval)
+        reactor.callLater(pollinterval, d.callback, None)
+        return d
+
+
+    def test_create(self):
+        ic = IntroducerClient(None, "introducer", "mypburl")
+        def _ignore(nodeid, rref):
+            pass
+        ic.notify_on_new_connection(_ignore)
+
+    def test_listen(self):
+        i = Introducer()
+        i.setServiceParent(self.parent)
+
+    def test_system(self):
+
+        self.central_tub = tub = Tub()
+        #tub.setOption("logLocalFailures", True)
+        #tub.setOption("logRemoteFailures", True)
+        tub.setServiceParent(self.parent)
+        l = tub.listenOn("tcp:0")
+        portnum = l.getPortnum()
+        tub.setLocation("localhost:%d" % portnum)
+
+        i = Introducer()
+        i.setServiceParent(self.parent)
+        iurl = tub.registerReference(i)
+        NUMCLIENTS = 5
+
+        self.waiting_for_connections = NUMCLIENTS*NUMCLIENTS
+        d = self._done_counting = defer.Deferred()
+        def _count(nodeid, rref):
+            log.msg("NEW CONNECTION! %s %s" % (idlib.b2a(nodeid), rref))
+            self.waiting_for_connections -= 1
+            if self.waiting_for_connections == 0:
+                self._done_counting.callback("done!")
+
+        clients = []
+        tubs = {}
+        for i in range(NUMCLIENTS):
+            tub = Tub()
+            #tub.setOption("logLocalFailures", True)
+            #tub.setOption("logRemoteFailures", True)
+            tub.setServiceParent(self.parent)
+            l = tub.listenOn("tcp:0")
+            portnum = l.getPortnum()
+            tub.setLocation("localhost:%d" % portnum)
+
+            n = MyNode()
+            node_pburl = tub.registerReference(n)
+            c = IntroducerClient(tub, iurl, node_pburl)
+            c.notify_on_new_connection(_count)
+            c.setServiceParent(self.parent)
+            clients.append(c)
+            tubs[c] = tub
+
+        # d will fire once everybody is connected
+
+        def _check(res):
+            log.msg("doing _check")
+            for c in clients:
+                self.failUnlessEqual(len(c.connections), NUMCLIENTS)
+            # now disconnect somebody's connection to someone else
+            self.waiting_for_connections = 2
+            d2 = self._done_counting = defer.Deferred()
+            origin_c = clients[0]
+            # find a target that is not themselves
+            for nodeid,rref in origin_c.connections.items():
+                if idlib.b2a(nodeid) != tubs[origin_c].tubID:
+                    victim = rref
+                    break
+            log.msg(" disconnecting %s->%s" % (tubs[origin_c].tubID, victim))
+            victim.tracker.broker.transport.loseConnection()
+            log.msg(" did disconnect")
+            return d2
+        d.addCallback(_check)
+        def _check_again(res):
+            log.msg("doing _check_again")
+            for c in clients:
+                self.failUnlessEqual(len(c.connections), NUMCLIENTS)
+            # now disconnect somebody's connection to themselves. This will
+            # only result in one new connection, since it is a loopback.
+            self.waiting_for_connections = 1
+            d2 = self._done_counting = defer.Deferred()
+            origin_c = clients[0]
+            # find a target that *is* themselves
+            for nodeid,rref in origin_c.connections.items():
+                if idlib.b2a(nodeid) == tubs[origin_c].tubID:
+                    victim = rref
+                    break
+            log.msg(" disconnecting %s->%s" % (tubs[origin_c].tubID, victim))
+            victim.tracker.broker.transport.loseConnection()
+            log.msg(" did disconnect")
+            return d2
+        d.addCallback(_check_again)
+        def _check_again2(res):
+            log.msg("doing _check_again2")
+            for c in clients:
+                self.failUnlessEqual(len(c.connections), NUMCLIENTS)
+            # now disconnect somebody's connection to themselves
+        d.addCallback(_check_again2)
+        return d
+
+    def stall(self, res, timeout):
+        d = defer.Deferred()
+        reactor.callLater(timeout, d.callback, res)
+        return d
+
+    def test_system_this_one_breaks(self):
+        # this uses a single Tub, which has a strong effect on the
+        # failingness
+        tub = Tub()
+        tub.setOption("logLocalFailures", True)
+        tub.setOption("logRemoteFailures", True)
+        tub.setServiceParent(self.parent)
+        l = tub.listenOn("tcp:0")
+        portnum = l.getPortnum()
+        tub.setLocation("localhost:%d" % portnum)
+
+        i = Introducer()
+        i.setServiceParent(self.parent)
+        iurl = tub.registerReference(i)
+
+        clients = []
+        for i in range(5):
+            n = MyNode()
+            node_pburl = tub.registerReference(n)
+            c = IntroducerClient(tub, iurl, node_pburl)
+            c.setServiceParent(self.parent)
+            clients.append(c)
+
+        # time passes..
+        d = defer.Deferred()
+        def _check(res):
+            log.msg("doing _check")
+            self.failUnlessEqual(len(clients[0].connections), 5)
+        d.addCallback(_check)
+        reactor.callLater(2, d.callback, None)
+        return d
+    del test_system_this_one_breaks
+
+
+    def test_system_this_one_breaks_too(self):
+        # this one shuts down so quickly that it fails in a different way
+        self.central_tub = tub = Tub()
+        tub.setOption("logLocalFailures", True)
+        tub.setOption("logRemoteFailures", True)
+        tub.setServiceParent(self.parent)
+        l = tub.listenOn("tcp:0")
+        portnum = l.getPortnum()
+        tub.setLocation("localhost:%d" % portnum)
+
+        i = Introducer()
+        i.setServiceParent(self.parent)
+        iurl = tub.registerReference(i)
+
+        clients = []
+        for i in range(5):
+            tub = Tub()
+            tub.setOption("logLocalFailures", True)
+            tub.setOption("logRemoteFailures", True)
+            tub.setServiceParent(self.parent)
+            l = tub.listenOn("tcp:0")
+            portnum = l.getPortnum()
+            tub.setLocation("localhost:%d" % portnum)
+
+            n = MyNode()
+            node_pburl = tub.registerReference(n)
+            c = IntroducerClient(tub, iurl, node_pburl)
+            c.setServiceParent(self.parent)
+            clients.append(c)
+
+        # time passes..
+        d = defer.Deferred()
+        reactor.callLater(0.01, d.callback, None)
+        def _check(res):
+            log.msg("doing _check")
+            self.fail("BOOM")
+            for c in clients:
+                self.failUnlessEqual(len(c.connections), 5)
+            c.connections.values()[0].tracker.broker.transport.loseConnection()
+            return self.stall(None, 2)
+        d.addCallback(_check)
+        def _check_again(res):
+            log.msg("doing _check_again")
+            for c in clients:
+                self.failUnlessEqual(len(c.connections), 5)
+        d.addCallback(_check_again)
+        return d
+    del test_system_this_one_breaks_too
+
index 62111b9c130466334dce3e6ea9c188b0bf53029a..709390ed6787e82010f38fe076bd1b5bd8709c03 100644 (file)
@@ -47,18 +47,18 @@ class SystemTest(unittest.TestCase):
 
     def _set_up_nodes_2(self, res):
         q = self.queen
-        self.queen_pburl = q.urls["roster"]
+        self.queen_furl = q.urls["introducer"]
+        self.vdrive_furl = q.urls["vdrive"]
         self.clients = []
         for i in range(self.numclients):
             basedir = "client%d" % i
             if not os.path.isdir(basedir):
                 os.mkdir(basedir)
             if i == 0:
-                f = open(os.path.join(basedir, "webport"), "w")
-                f.write("tcp:0:interface=127.0.0.1")
-                f.close()
+                open(os.path.join(basedir, "webport"), "w").write("tcp:0:interface=127.0.0.1")
+            open(os.path.join(basedir, "introducer.furl"), "w").write(self.queen_furl)
+            open(os.path.join(basedir, "vdrive.furl"), "w").write(self.vdrive_furl)
             c = self.add_service(client.Client(basedir=basedir))
-            c.set_queen_pburl(self.queen_pburl)
             self.clients.append(c)
         log.msg("STARTING")
         d = self.wait_for_connections()
@@ -76,9 +76,11 @@ class SystemTest(unittest.TestCase):
         basedir = "client%d" % client_num
         if not os.path.isdir(basedir):
             os.mkdir(basedir)
+        open(os.path.join(basedir, "introducer.furl"), "w").write(self.queen_furl)
+        open(os.path.join(basedir, "vdrive.furl"), "w").write(self.vdrive_furl)
+
         c = client.Client(basedir=basedir)
         self.clients.append(c)
-        c.set_queen_pburl(self.queen_pburl)
         self.numclients += 1
         c.startService()
         d = self.wait_for_connections()
@@ -87,7 +89,7 @@ class SystemTest(unittest.TestCase):
 
     def wait_for_connections(self, ignored=None):
         for c in self.clients:
-            if len(c.connections) != self.numclients:
+            if not c.introducer_client or len(c.get_all_peerids()) != self.numclients:
                 d = defer.Deferred()
                 d.addCallback(self.wait_for_connections)
                 reactor.callLater(0.05, d.callback, None)
@@ -96,18 +98,21 @@ class SystemTest(unittest.TestCase):
 
     def test_connections(self):
         d = self.set_up_nodes()
+        self.extra_node = None
         d.addCallback(lambda res: self.add_extra_node(5))
         def _check(extra_node):
             self.extra_node = extra_node
             for c in self.clients:
-                self.failUnlessEqual(len(c.connections), 6)
+                self.failUnlessEqual(len(c.get_all_peerids()), 6)
         d.addCallback(_check)
         def _shutdown_extra_node(res):
-            d1 = self.extra_node.stopService()
-            d2 = defer.Deferred()
-            reactor.callLater(self.DISCONNECT_DELAY, d2.callback, res)
-            d1.addCallback(lambda ignored: d2)
-            return d1
+            if self.extra_node:
+                d1 = self.extra_node.stopService()
+                d2 = defer.Deferred()
+                reactor.callLater(self.DISCONNECT_DELAY, d2.callback, res)
+                d1.addCallback(lambda ignored: d2)
+                return d1
+            return res
         d.addBoth(_shutdown_extra_node)
         return d
 
index d162a01fc0952711b14b2c5448e4aac306f006d4..5d8ad3b8d57d3a5c1e65f1f965111be68e552ea2 100644 (file)
 <table n:render="sequence" n:data="peers" border="1">
   <tr n:pattern="header">
     <td>PeerID</td>
-    <td>Connected?</td>
-    <td>PBURL</td>
   </tr>
   <tr n:pattern="item" n:render="row">
     <td><tt><n:slot name="peerid"/></tt></td>
-    <td><n:slot name="connected"/></td>
-    <td><n:slot name="pburl"/></td>
   </tr>
   <tr n:pattern="empty"><td>no peers!</td></tr>
 </table>
index 98f37161aa75d26a5dc7c1c6cbb8a5354ac82de3..ca86f61eb78bd69fe542568b68daaed4005878b6 100644 (file)
@@ -28,31 +28,29 @@ class Welcome(rend.Page):
     docFactory = getxmlfile("welcome.xhtml")
 
     def data_queen_pburl(self, ctx, data):
-        return IClient(ctx).queen_pburl
+        return IClient(ctx).introducer_furl
     def data_connected_to_queen(self, ctx, data):
-        if IClient(ctx).queen:
+        if IClient(ctx).connected_to_vdrive:
             return "yes"
         return "no"
     def data_num_peers(self, ctx, data):
         #client = inevow.ISite(ctx)._client
         client = IClient(ctx)
-        return len(client.connections)
+        return len(client.get_all_peerids())
     def data_num_connected_peers(self, ctx, data):
-        return len(IClient(ctx).connections)
+        return len(IClient(ctx).get_all_peerids())
 
     def data_peers(self, ctx, data):
         d = []
         client = IClient(ctx)
-        for nodeid in sorted(client.connections.keys()):
-            row = (idlib.b2a(nodeid), "yes", "?")
+        for nodeid in sorted(client.get_all_peerids()):
+            row = (idlib.b2a(nodeid),)
             d.append(row)
         return d
 
     def render_row(self, ctx, data):
-        nodeid_a, connected, pburl = data
+        (nodeid_a,) = data
         ctx.fillSlots("peerid", nodeid_a)
-        ctx.fillSlots("connected", connected)
-        ctx.fillSlots("pburl", pburl)
         return ctx.tag
 
     # this is a form where users can download files by URI