1 from base64 import b32encode
3 from twisted.trial import unittest
4 from twisted.internet import defer, reactor
5 from twisted.python import log
7 from foolscap import Tub, Referenceable
8 from foolscap.eventual import flushEventualQueue
9 from twisted.application import service
10 from allmydata.introducer import IntroducerClient, Introducer
11 from allmydata.util import testutil
13 class MyNode(Referenceable):
16 class LoggingMultiService(service.MultiService):
20 class TestIntroducer(unittest.TestCase, testutil.PollMixin):
22 self.parent = LoggingMultiService()
23 self.parent.startService()
25 log.msg("TestIntroducer.tearDown")
26 d = defer.succeed(None)
27 d.addCallback(lambda res: self.parent.stopService())
28 d.addCallback(flushEventualQueue)
32 def test_create(self):
33 ic = IntroducerClient(None, "introducer", "myfurl")
34 def _ignore(nodeid, rref):
36 ic.notify_on_new_connection(_ignore)
38 def test_listen(self):
40 i.setServiceParent(self.parent)
42 def test_system(self):
44 self.central_tub = tub = Tub()
45 #tub.setOption("logLocalFailures", True)
46 #tub.setOption("logRemoteFailures", True)
47 tub.setServiceParent(self.parent)
48 l = tub.listenOn("tcp:0")
49 portnum = l.getPortnum()
50 tub.setLocation("localhost:%d" % portnum)
53 i.setServiceParent(self.parent)
54 iurl = tub.registerReference(i)
57 self.waiting_for_connections = NUMCLIENTS*NUMCLIENTS
58 d = self._done_counting = defer.Deferred()
59 def _count(nodeid, rref):
60 log.msg("NEW CONNECTION! %s %s" % (b32encode(nodeid).lower(), rref))
61 self.waiting_for_connections -= 1
62 if self.waiting_for_connections == 0:
63 self._done_counting.callback("done!")
67 for i in range(NUMCLIENTS):
69 #tub.setOption("logLocalFailures", True)
70 #tub.setOption("logRemoteFailures", True)
71 tub.setServiceParent(self.parent)
72 l = tub.listenOn("tcp:0")
73 portnum = l.getPortnum()
74 tub.setLocation("localhost:%d" % portnum)
77 node_furl = tub.registerReference(n)
78 c = IntroducerClient(tub, iurl, node_furl)
79 c.notify_on_new_connection(_count)
80 c.setServiceParent(self.parent)
84 # d will fire once everybody is connected
87 log.msg("doing _check")
89 self.failUnlessEqual(len(c.connections), NUMCLIENTS)
90 # now disconnect somebody's connection to someone else
91 self.waiting_for_connections = 2
92 d2 = self._done_counting = defer.Deferred()
94 # find a target that is not themselves
95 for nodeid,rref in origin_c.connections.items():
96 if b32encode(nodeid).lower() != tubs[origin_c].tubID:
99 log.msg(" disconnecting %s->%s" % (tubs[origin_c].tubID, victim))
100 victim.tracker.broker.transport.loseConnection()
101 log.msg(" did disconnect")
103 d.addCallback(_check)
104 def _check_again(res):
105 log.msg("doing _check_again")
107 self.failUnlessEqual(len(c.connections), NUMCLIENTS)
108 # now disconnect somebody's connection to themselves. This will
109 # only result in one new connection, since it is a loopback.
110 self.waiting_for_connections = 1
111 d2 = self._done_counting = defer.Deferred()
112 origin_c = clients[0]
113 # find a target that *is* themselves
114 for nodeid,rref in origin_c.connections.items():
115 if b32encode(nodeid).lower() == tubs[origin_c].tubID:
118 log.msg(" disconnecting %s->%s" % (tubs[origin_c].tubID, victim))
119 victim.tracker.broker.transport.loseConnection()
120 log.msg(" did disconnect")
122 d.addCallback(_check_again)
123 def _check_again2(res):
124 log.msg("doing _check_again2")
126 self.failUnlessEqual(len(c.connections), NUMCLIENTS)
127 # now disconnect somebody's connection to themselves
128 d.addCallback(_check_again2)
130 test_system.timeout = 2400
132 def stall(self, res, timeout):
134 reactor.callLater(timeout, d.callback, res)
137 def test_system_this_one_breaks(self):
138 # this uses a single Tub, which has a strong effect on the
141 tub.setOption("logLocalFailures", True)
142 tub.setOption("logRemoteFailures", True)
143 tub.setServiceParent(self.parent)
144 l = tub.listenOn("tcp:0")
145 portnum = l.getPortnum()
146 tub.setLocation("localhost:%d" % portnum)
149 i.setServiceParent(self.parent)
150 iurl = tub.registerReference(i)
155 node_furl = tub.registerReference(n)
156 c = IntroducerClient(tub, iurl, node_furl)
157 c.setServiceParent(self.parent)
163 log.msg("doing _check")
164 self.failUnlessEqual(len(clients[0].connections), 5)
165 d.addCallback(_check)
166 reactor.callLater(2, d.callback, None)
168 del test_system_this_one_breaks
171 def test_system_this_one_breaks_too(self):
172 # this one shuts down so quickly that it fails in a different way
173 self.central_tub = tub = Tub()
174 tub.setOption("logLocalFailures", True)
175 tub.setOption("logRemoteFailures", True)
176 tub.setServiceParent(self.parent)
177 l = tub.listenOn("tcp:0")
178 portnum = l.getPortnum()
179 tub.setLocation("localhost:%d" % portnum)
182 i.setServiceParent(self.parent)
183 iurl = tub.registerReference(i)
188 tub.setOption("logLocalFailures", True)
189 tub.setOption("logRemoteFailures", True)
190 tub.setServiceParent(self.parent)
191 l = tub.listenOn("tcp:0")
192 portnum = l.getPortnum()
193 tub.setLocation("localhost:%d" % portnum)
196 node_furl = tub.registerReference(n)
197 c = IntroducerClient(tub, iurl, node_furl)
198 c.setServiceParent(self.parent)
203 reactor.callLater(0.01, d.callback, None)
205 log.msg("doing _check")
208 self.failUnlessEqual(len(c.connections), 5)
209 c.connections.values()[0].tracker.broker.transport.loseConnection()
210 return self.stall(None, 2)
211 d.addCallback(_check)
212 def _check_again(res):
213 log.msg("doing _check_again")
215 self.failUnlessEqual(len(c.connections), 5)
216 d.addCallback(_check_again)
218 del test_system_this_one_breaks_too