2 from twisted.trial import unittest
3 from twisted.internet import defer, reactor
4 from twisted.python import log
6 from foolscap import Tub, Referenceable
7 from foolscap.eventual import flushEventualQueue
8 from twisted.application import service
9 from allmydata.introducer import IntroducerClient, Introducer
10 from allmydata.util import idlib
12 class MyNode(Referenceable):
15 class LoggingMultiService(service.MultiService):
19 class TestIntroducer(unittest.TestCase):
21 self.parent = LoggingMultiService()
22 self.parent.startService()
24 log.msg("TestIntroducer.tearDown")
25 d = defer.succeed(None)
26 d.addCallback(lambda res: self.parent.stopService())
27 d.addCallback(flushEventualQueue)
31 def poll(self, check_f, pollinterval=0.01):
32 # Return a Deferred, then call check_f periodically until it returns
33 # True, at which point the Deferred will fire.. If check_f raises an
34 # exception, the Deferred will errback.
35 d = defer.maybeDeferred(self._poll, None, check_f, pollinterval)
38 def _poll(self, res, check_f, pollinterval):
42 d.addCallback(self._poll, check_f, pollinterval)
43 reactor.callLater(pollinterval, d.callback, None)
47 def test_create(self):
48 ic = IntroducerClient(None, "introducer", "myfurl")
49 def _ignore(nodeid, rref):
51 ic.notify_on_new_connection(_ignore)
53 def test_listen(self):
55 i.setServiceParent(self.parent)
57 def test_system(self):
59 self.central_tub = tub = Tub()
60 #tub.setOption("logLocalFailures", True)
61 #tub.setOption("logRemoteFailures", True)
62 tub.setServiceParent(self.parent)
63 l = tub.listenOn("tcp:0")
64 portnum = l.getPortnum()
65 tub.setLocation("localhost:%d" % portnum)
68 i.setServiceParent(self.parent)
69 iurl = tub.registerReference(i)
72 self.waiting_for_connections = NUMCLIENTS*NUMCLIENTS
73 d = self._done_counting = defer.Deferred()
74 def _count(nodeid, rref):
75 log.msg("NEW CONNECTION! %s %s" % (idlib.b2a(nodeid), rref))
76 self.waiting_for_connections -= 1
77 if self.waiting_for_connections == 0:
78 self._done_counting.callback("done!")
82 for i in range(NUMCLIENTS):
84 #tub.setOption("logLocalFailures", True)
85 #tub.setOption("logRemoteFailures", True)
86 tub.setServiceParent(self.parent)
87 l = tub.listenOn("tcp:0")
88 portnum = l.getPortnum()
89 tub.setLocation("localhost:%d" % portnum)
92 node_furl = tub.registerReference(n)
93 c = IntroducerClient(tub, iurl, node_furl)
94 c.notify_on_new_connection(_count)
95 c.setServiceParent(self.parent)
99 # d will fire once everybody is connected
102 log.msg("doing _check")
104 self.failUnlessEqual(len(c.connections), NUMCLIENTS)
105 # now disconnect somebody's connection to someone else
106 self.waiting_for_connections = 2
107 d2 = self._done_counting = defer.Deferred()
108 origin_c = clients[0]
109 # find a target that is not themselves
110 for nodeid,rref in origin_c.connections.items():
111 if idlib.b2a(nodeid) != tubs[origin_c].tubID:
114 log.msg(" disconnecting %s->%s" % (tubs[origin_c].tubID, victim))
115 victim.tracker.broker.transport.loseConnection()
116 log.msg(" did disconnect")
118 d.addCallback(_check)
119 def _check_again(res):
120 log.msg("doing _check_again")
122 self.failUnlessEqual(len(c.connections), NUMCLIENTS)
123 # now disconnect somebody's connection to themselves. This will
124 # only result in one new connection, since it is a loopback.
125 self.waiting_for_connections = 1
126 d2 = self._done_counting = defer.Deferred()
127 origin_c = clients[0]
128 # find a target that *is* themselves
129 for nodeid,rref in origin_c.connections.items():
130 if idlib.b2a(nodeid) == tubs[origin_c].tubID:
133 log.msg(" disconnecting %s->%s" % (tubs[origin_c].tubID, victim))
134 victim.tracker.broker.transport.loseConnection()
135 log.msg(" did disconnect")
137 d.addCallback(_check_again)
138 def _check_again2(res):
139 log.msg("doing _check_again2")
141 self.failUnlessEqual(len(c.connections), NUMCLIENTS)
142 # now disconnect somebody's connection to themselves
143 d.addCallback(_check_again2)
145 test_system.timeout = 2400
147 def stall(self, res, timeout):
149 reactor.callLater(timeout, d.callback, res)
152 def test_system_this_one_breaks(self):
153 # this uses a single Tub, which has a strong effect on the
156 tub.setOption("logLocalFailures", True)
157 tub.setOption("logRemoteFailures", True)
158 tub.setServiceParent(self.parent)
159 l = tub.listenOn("tcp:0")
160 portnum = l.getPortnum()
161 tub.setLocation("localhost:%d" % portnum)
164 i.setServiceParent(self.parent)
165 iurl = tub.registerReference(i)
170 node_furl = tub.registerReference(n)
171 c = IntroducerClient(tub, iurl, node_furl)
172 c.setServiceParent(self.parent)
178 log.msg("doing _check")
179 self.failUnlessEqual(len(clients[0].connections), 5)
180 d.addCallback(_check)
181 reactor.callLater(2, d.callback, None)
183 del test_system_this_one_breaks
186 def test_system_this_one_breaks_too(self):
187 # this one shuts down so quickly that it fails in a different way
188 self.central_tub = tub = Tub()
189 tub.setOption("logLocalFailures", True)
190 tub.setOption("logRemoteFailures", True)
191 tub.setServiceParent(self.parent)
192 l = tub.listenOn("tcp:0")
193 portnum = l.getPortnum()
194 tub.setLocation("localhost:%d" % portnum)
197 i.setServiceParent(self.parent)
198 iurl = tub.registerReference(i)
203 tub.setOption("logLocalFailures", True)
204 tub.setOption("logRemoteFailures", True)
205 tub.setServiceParent(self.parent)
206 l = tub.listenOn("tcp:0")
207 portnum = l.getPortnum()
208 tub.setLocation("localhost:%d" % portnum)
211 node_furl = tub.registerReference(n)
212 c = IntroducerClient(tub, iurl, node_furl)
213 c.setServiceParent(self.parent)
218 reactor.callLater(0.01, d.callback, None)
220 log.msg("doing _check")
223 self.failUnlessEqual(len(c.connections), 5)
224 c.connections.values()[0].tracker.broker.transport.loseConnection()
225 return self.stall(None, 2)
226 d.addCallback(_check)
227 def _check_again(res):
228 log.msg("doing _check_again")
230 self.failUnlessEqual(len(c.connections), 5)
231 d.addCallback(_check_again)
233 del test_system_this_one_breaks_too