1 from base64 import b32encode
3 from twisted.trial import unittest
4 from twisted.internet import defer, reactor
5 from twisted.python import log
7 from foolscap import Tub, Referenceable
8 from foolscap.eventual import flushEventualQueue
9 from twisted.application import service
10 from allmydata.introducer import IntroducerClient, Introducer
11 from allmydata.util import testutil
13 class MyNode(Referenceable):
16 class LoggingMultiService(service.MultiService):
20 class TestIntroducer(unittest.TestCase, testutil.PollMixin):
22 self.parent = LoggingMultiService()
23 self.parent.startService()
25 log.msg("TestIntroducer.tearDown")
26 d = defer.succeed(None)
27 d.addCallback(lambda res: self.parent.stopService())
28 d.addCallback(flushEventualQueue)
32 def test_create(self):
33 ic = IntroducerClient(None, "introducer", "myfurl")
34 def _ignore(nodeid, rref):
36 ic.notify_on_new_connection(_ignore)
38 def test_listen(self):
40 i.setServiceParent(self.parent)
42 def test_system(self):
44 self.central_tub = tub = Tub()
45 #tub.setOption("logLocalFailures", True)
46 #tub.setOption("logRemoteFailures", True)
47 tub.setServiceParent(self.parent)
48 l = tub.listenOn("tcp:0")
49 portnum = l.getPortnum()
50 tub.setLocation("localhost:%d" % portnum)
53 i.setServiceParent(self.parent)
54 iurl = tub.registerReference(i)
57 self.waiting_for_connections = NUMCLIENTS*NUMCLIENTS
58 d = self._done_counting = defer.Deferred()
59 def _count(nodeid, rref):
60 log.msg("NEW CONNECTION! %s %s" % (b32encode(nodeid).lower(), rref))
61 self.waiting_for_connections -= 1
62 if self.waiting_for_connections == 0:
63 self._done_counting.callback("done!")
67 for i in range(NUMCLIENTS):
69 #tub.setOption("logLocalFailures", True)
70 #tub.setOption("logRemoteFailures", True)
71 tub.setServiceParent(self.parent)
72 l = tub.listenOn("tcp:0")
73 portnum = l.getPortnum()
74 tub.setLocation("localhost:%d" % portnum)
77 node_furl = tub.registerReference(n)
78 c = IntroducerClient(tub, iurl, node_furl)
79 c.notify_on_new_connection(_count)
80 c.setServiceParent(self.parent)
84 # d will fire once everybody is connected
87 log.msg("doing _check1")
89 self.failUnlessEqual(len(c.connections), NUMCLIENTS)
90 self.failUnless(c._connected) # to the introducer
91 d.addCallback(_check1)
92 def _disconnect_somebody_else(res):
93 # now disconnect somebody's connection to someone else
94 self.waiting_for_connections = 2
95 d2 = self._done_counting = defer.Deferred()
97 # find a target that is not themselves
98 for nodeid,rref in origin_c.connections.items():
99 if b32encode(nodeid).lower() != tubs[origin_c].tubID:
102 log.msg(" disconnecting %s->%s" % (tubs[origin_c].tubID, victim))
103 victim.tracker.broker.transport.loseConnection()
104 log.msg(" did disconnect")
106 d.addCallback(_disconnect_somebody_else)
108 log.msg("doing _check2")
110 self.failUnlessEqual(len(c.connections), NUMCLIENTS)
111 d.addCallback(_check2)
112 def _disconnect_yourself(res):
113 # now disconnect somebody's connection to themselves. This will
114 # only result in one new connection, since it is a loopback.
115 self.waiting_for_connections = 1
116 d2 = self._done_counting = defer.Deferred()
117 origin_c = clients[0]
118 # find a target that *is* themselves
119 for nodeid,rref in origin_c.connections.items():
120 if b32encode(nodeid).lower() == tubs[origin_c].tubID:
123 log.msg(" disconnecting %s->%s" % (tubs[origin_c].tubID, victim))
124 victim.tracker.broker.transport.loseConnection()
125 log.msg(" did disconnect")
127 d.addCallback(_disconnect_yourself)
129 log.msg("doing _check3")
131 self.failUnlessEqual(len(c.connections), NUMCLIENTS)
132 d.addCallback(_check3)
133 def _shutdown_introducer(res):
134 # now shut down the introducer. We do this by shutting down the
135 # tub it's using. Nobody's connections (to each other) should go
136 # down. All clients should notice the loss, and no other errors
138 log.msg("shutting down the introducer")
139 return self.central_tub.disownServiceParent()
140 d.addCallback(_shutdown_introducer)
141 d.addCallback(self.stall, 2)
143 log.msg("doing _check4")
145 self.failUnlessEqual(len(c.connections), NUMCLIENTS)
146 self.failIf(c._connected)
147 d.addCallback(_check4)
149 test_system.timeout = 2400
151 def stall(self, res, timeout):
153 reactor.callLater(timeout, d.callback, res)
156 def test_system_this_one_breaks(self):
157 # this uses a single Tub, which has a strong effect on the
160 tub.setOption("logLocalFailures", True)
161 tub.setOption("logRemoteFailures", True)
162 tub.setServiceParent(self.parent)
163 l = tub.listenOn("tcp:0")
164 portnum = l.getPortnum()
165 tub.setLocation("localhost:%d" % portnum)
168 i.setServiceParent(self.parent)
169 iurl = tub.registerReference(i)
174 node_furl = tub.registerReference(n)
175 c = IntroducerClient(tub, iurl, node_furl)
176 c.setServiceParent(self.parent)
182 log.msg("doing _check")
183 self.failUnlessEqual(len(clients[0].connections), 5)
184 d.addCallback(_check)
185 reactor.callLater(2, d.callback, None)
187 del test_system_this_one_breaks
190 def test_system_this_one_breaks_too(self):
191 # this one shuts down so quickly that it fails in a different way
192 self.central_tub = tub = Tub()
193 tub.setOption("logLocalFailures", True)
194 tub.setOption("logRemoteFailures", True)
195 tub.setServiceParent(self.parent)
196 l = tub.listenOn("tcp:0")
197 portnum = l.getPortnum()
198 tub.setLocation("localhost:%d" % portnum)
201 i.setServiceParent(self.parent)
202 iurl = tub.registerReference(i)
207 tub.setOption("logLocalFailures", True)
208 tub.setOption("logRemoteFailures", True)
209 tub.setServiceParent(self.parent)
210 l = tub.listenOn("tcp:0")
211 portnum = l.getPortnum()
212 tub.setLocation("localhost:%d" % portnum)
215 node_furl = tub.registerReference(n)
216 c = IntroducerClient(tub, iurl, node_furl)
217 c.setServiceParent(self.parent)
222 reactor.callLater(0.01, d.callback, None)
224 log.msg("doing _check")
227 self.failUnlessEqual(len(c.connections), 5)
228 c.connections.values()[0].tracker.broker.transport.loseConnection()
229 return self.stall(None, 2)
230 d.addCallback(_check)
231 def _check_again(res):
232 log.msg("doing _check_again")
234 self.failUnlessEqual(len(c.connections), 5)
235 d.addCallback(_check_again)
237 del test_system_this_one_breaks_too