2 from twisted.trial import unittest
3 from twisted.internet import defer, reactor
4 from twisted.python import log
5 defer.setDebugging(True)
7 from foolscap import Tub, Referenceable
8 from foolscap.eventual import flushEventualQueue
9 from twisted.application import service
10 from allmydata.introducer import IntroducerClient, Introducer
11 from allmydata.util import idlib
13 class MyNode(Referenceable):
16 class LoggingMultiService(service.MultiService):
20 class TestIntroducer(unittest.TestCase):
22 self.parent = LoggingMultiService()
23 self.parent.startService()
25 log.msg("TestIntroducer.tearDown")
26 d = defer.succeed(None)
27 d.addCallback(lambda res: self.parent.stopService())
28 d.addCallback(flushEventualQueue)
32 def poll(self, check_f, pollinterval=0.01):
33 # Return a Deferred, then call check_f periodically until it returns
34 # True, at which point the Deferred will fire. If check_f raises an
35 # exception, the Deferred will errback.
36 d = defer.maybeDeferred(self._poll, None, check_f, pollinterval)
39 def _poll(self, res, check_f, pollinterval):
43 d.addCallback(self._poll, check_f, pollinterval)
44 reactor.callLater(pollinterval, d.callback, None)
48 def test_create(self):
49 ic = IntroducerClient(None, "introducer", "mypburl")
50 def _ignore(nodeid, rref):
52 ic.notify_on_new_connection(_ignore)
54 def test_listen(self):
56 i.setServiceParent(self.parent)
58 def test_system(self):
60 self.central_tub = tub = Tub()
61 #tub.setOption("logLocalFailures", True)
62 #tub.setOption("logRemoteFailures", True)
63 tub.setServiceParent(self.parent)
64 l = tub.listenOn("tcp:0")
65 portnum = l.getPortnum()
66 tub.setLocation("localhost:%d" % portnum)
69 i.setServiceParent(self.parent)
70 iurl = tub.registerReference(i)
73 self.waiting_for_connections = NUMCLIENTS*NUMCLIENTS
74 d = self._done_counting = defer.Deferred()
75 def _count(nodeid, rref):
76 log.msg("NEW CONNECTION! %s %s" % (idlib.b2a(nodeid), rref))
77 self.waiting_for_connections -= 1
78 if self.waiting_for_connections == 0:
79 self._done_counting.callback("done!")
83 for i in range(NUMCLIENTS):
85 #tub.setOption("logLocalFailures", True)
86 #tub.setOption("logRemoteFailures", True)
87 tub.setServiceParent(self.parent)
88 l = tub.listenOn("tcp:0")
89 portnum = l.getPortnum()
90 tub.setLocation("localhost:%d" % portnum)
93 node_pburl = tub.registerReference(n)
94 c = IntroducerClient(tub, iurl, node_pburl)
95 c.notify_on_new_connection(_count)
96 c.setServiceParent(self.parent)
100 # d will fire once everybody is connected
103 log.msg("doing _check")
105 self.failUnlessEqual(len(c.connections), NUMCLIENTS)
106 # now disconnect somebody's connection to someone else
107 self.waiting_for_connections = 2
108 d2 = self._done_counting = defer.Deferred()
109 origin_c = clients[0]
110 # find a target that is not themselves
111 for nodeid,rref in origin_c.connections.items():
112 if idlib.b2a(nodeid) != tubs[origin_c].tubID:
115 log.msg(" disconnecting %s->%s" % (tubs[origin_c].tubID, victim))
116 victim.tracker.broker.transport.loseConnection()
117 log.msg(" did disconnect")
119 d.addCallback(_check)
120 def _check_again(res):
121 log.msg("doing _check_again")
123 self.failUnlessEqual(len(c.connections), NUMCLIENTS)
124 # now disconnect somebody's connection to themselves. This will
125 # only result in one new connection, since it is a loopback.
126 self.waiting_for_connections = 1
127 d2 = self._done_counting = defer.Deferred()
128 origin_c = clients[0]
129 # find a target that *is* themselves
130 for nodeid,rref in origin_c.connections.items():
131 if idlib.b2a(nodeid) == tubs[origin_c].tubID:
134 log.msg(" disconnecting %s->%s" % (tubs[origin_c].tubID, victim))
135 victim.tracker.broker.transport.loseConnection()
136 log.msg(" did disconnect")
138 d.addCallback(_check_again)
139 def _check_again2(res):
140 log.msg("doing _check_again2")
142 self.failUnlessEqual(len(c.connections), NUMCLIENTS)
143 # now disconnect somebody's connection to themselves
144 d.addCallback(_check_again2)
146 test_system.timeout = 1200
148 def stall(self, res, timeout):
150 reactor.callLater(timeout, d.callback, res)
153 def test_system_this_one_breaks(self):
154 # this uses a single Tub, which has a strong effect on the
157 tub.setOption("logLocalFailures", True)
158 tub.setOption("logRemoteFailures", True)
159 tub.setServiceParent(self.parent)
160 l = tub.listenOn("tcp:0")
161 portnum = l.getPortnum()
162 tub.setLocation("localhost:%d" % portnum)
165 i.setServiceParent(self.parent)
166 iurl = tub.registerReference(i)
171 node_pburl = tub.registerReference(n)
172 c = IntroducerClient(tub, iurl, node_pburl)
173 c.setServiceParent(self.parent)
179 log.msg("doing _check")
180 self.failUnlessEqual(len(clients[0].connections), 5)
181 d.addCallback(_check)
182 reactor.callLater(2, d.callback, None)
184 del test_system_this_one_breaks
187 def test_system_this_one_breaks_too(self):
188 # this one shuts down so quickly that it fails in a different way
189 self.central_tub = tub = Tub()
190 tub.setOption("logLocalFailures", True)
191 tub.setOption("logRemoteFailures", True)
192 tub.setServiceParent(self.parent)
193 l = tub.listenOn("tcp:0")
194 portnum = l.getPortnum()
195 tub.setLocation("localhost:%d" % portnum)
198 i.setServiceParent(self.parent)
199 iurl = tub.registerReference(i)
204 tub.setOption("logLocalFailures", True)
205 tub.setOption("logRemoteFailures", True)
206 tub.setServiceParent(self.parent)
207 l = tub.listenOn("tcp:0")
208 portnum = l.getPortnum()
209 tub.setLocation("localhost:%d" % portnum)
212 node_pburl = tub.registerReference(n)
213 c = IntroducerClient(tub, iurl, node_pburl)
214 c.setServiceParent(self.parent)
219 reactor.callLater(0.01, d.callback, None)
221 log.msg("doing _check")
224 self.failUnlessEqual(len(c.connections), 5)
225 c.connections.values()[0].tracker.broker.transport.loseConnection()
226 return self.stall(None, 2)
227 d.addCallback(_check)
228 def _check_again(res):
229 log.msg("doing _check_again")
231 self.failUnlessEqual(len(c.connections), 5)
232 d.addCallback(_check_again)
234 del test_system_this_one_breaks_too