1 from base64 import b32encode
5 from twisted.trial import unittest
6 from twisted.internet import defer, reactor
7 from twisted.python import log
9 from foolscap import Tub, Referenceable
10 from foolscap.eventual import fireEventually, flushEventualQueue
11 from twisted.application import service
12 from allmydata.introducer import IntroducerClient, IntroducerService, IntroducerNode
13 from allmydata.util import testutil
15 class MyNode(Referenceable):
18 class LoggingMultiService(service.MultiService):
19 def log(self, msg, **kw):
22 class TestIntroducerNode(testutil.SignalMixin, unittest.TestCase):
23 def test_loadable(self):
24 basedir = "introducer.IntroducerNode.test_loadable"
26 q = IntroducerNode(basedir)
27 d = fireEventually(None)
28 d.addCallback(lambda res: q.startService())
29 d.addCallback(lambda res: q.when_tub_ready())
30 def _check_parameters(res):
31 i = q.getServiceNamed("introducer")
32 self.failUnlessEqual(i._encoding_parameters, (3, 7, 10))
33 d.addCallback(_check_parameters)
34 d.addCallback(lambda res: q.stopService())
35 d.addCallback(flushEventualQueue)
38 def test_set_parameters(self):
39 basedir = "introducer.IntroducerNode.test_set_parameters"
41 f = open(os.path.join(basedir, "encoding_parameters"), "w")
44 q = IntroducerNode(basedir)
45 d = fireEventually(None)
46 d.addCallback(lambda res: q.startService())
47 d.addCallback(lambda res: q.when_tub_ready())
48 def _check_parameters(res):
49 i = q.getServiceNamed("introducer")
50 self.failUnlessEqual(i._encoding_parameters, (25, 75, 100))
51 d.addCallback(_check_parameters)
52 d.addCallback(lambda res: q.stopService())
53 d.addCallback(flushEventualQueue)
56 class TestIntroducer(unittest.TestCase, testutil.PollMixin):
58 self.parent = LoggingMultiService()
59 self.parent.startService()
61 log.msg("TestIntroducer.tearDown")
62 d = defer.succeed(None)
63 d.addCallback(lambda res: self.parent.stopService())
64 d.addCallback(flushEventualQueue)
68 def test_create(self):
69 ic = IntroducerClient(None, "introducer", "myfurl")
70 def _ignore(nodeid, rref):
72 ic.notify_on_new_connection(_ignore)
74 def test_listen(self):
75 i = IntroducerService()
76 i.setServiceParent(self.parent)
78 def test_system(self):
80 self.central_tub = tub = Tub()
81 #tub.setOption("logLocalFailures", True)
82 #tub.setOption("logRemoteFailures", True)
83 tub.setServiceParent(self.parent)
84 l = tub.listenOn("tcp:0")
85 portnum = l.getPortnum()
86 tub.setLocation("localhost:%d" % portnum)
88 i = IntroducerService()
89 i.setServiceParent(self.parent)
90 iurl = tub.registerReference(i)
93 self.waiting_for_connections = NUMCLIENTS*NUMCLIENTS
94 d = self._done_counting = defer.Deferred()
95 def _count(nodeid, rref):
96 log.msg("NEW CONNECTION! %s %s" % (b32encode(nodeid).lower(), rref))
97 self.waiting_for_connections -= 1
98 if self.waiting_for_connections == 0:
99 self._done_counting.callback("done!")
103 for i in range(NUMCLIENTS):
105 #tub.setOption("logLocalFailures", True)
106 #tub.setOption("logRemoteFailures", True)
107 tub.setServiceParent(self.parent)
108 l = tub.listenOn("tcp:0")
109 portnum = l.getPortnum()
110 tub.setLocation("localhost:%d" % portnum)
113 node_furl = tub.registerReference(n)
114 c = IntroducerClient(tub, iurl, node_furl)
115 c.notify_on_new_connection(_count)
116 c.setServiceParent(self.parent)
120 # d will fire once everybody is connected
123 log.msg("doing _check1")
125 self.failUnlessEqual(len(c.connections), NUMCLIENTS)
126 self.failUnless(c._connected) # to the introducer
127 d.addCallback(_check1)
128 def _disconnect_somebody_else(res):
129 # now disconnect somebody's connection to someone else
130 self.waiting_for_connections = 2
131 d2 = self._done_counting = defer.Deferred()
132 origin_c = clients[0]
133 # find a target that is not themselves
134 for nodeid,rref in origin_c.connections.items():
135 if b32encode(nodeid).lower() != tubs[origin_c].tubID:
138 log.msg(" disconnecting %s->%s" % (tubs[origin_c].tubID, victim))
139 victim.tracker.broker.transport.loseConnection()
140 log.msg(" did disconnect")
142 d.addCallback(_disconnect_somebody_else)
144 log.msg("doing _check2")
146 self.failUnlessEqual(len(c.connections), NUMCLIENTS)
147 d.addCallback(_check2)
148 def _disconnect_yourself(res):
149 # now disconnect somebody's connection to themselves. This will
150 # only result in one new connection, since it is a loopback.
151 self.waiting_for_connections = 1
152 d2 = self._done_counting = defer.Deferred()
153 origin_c = clients[0]
154 # find a target that *is* themselves
155 for nodeid,rref in origin_c.connections.items():
156 if b32encode(nodeid).lower() == tubs[origin_c].tubID:
159 log.msg(" disconnecting %s->%s" % (tubs[origin_c].tubID, victim))
160 victim.tracker.broker.transport.loseConnection()
161 log.msg(" did disconnect")
163 d.addCallback(_disconnect_yourself)
165 log.msg("doing _check3")
167 self.failUnlessEqual(len(c.connections), NUMCLIENTS)
168 d.addCallback(_check3)
169 def _shutdown_introducer(res):
170 # now shut down the introducer. We do this by shutting down the
171 # tub it's using. Nobody's connections (to each other) should go
172 # down. All clients should notice the loss, and no other errors
174 log.msg("shutting down the introducer")
175 return self.central_tub.disownServiceParent()
176 d.addCallback(_shutdown_introducer)
177 d.addCallback(self.stall, 2)
179 log.msg("doing _check4")
181 self.failUnlessEqual(len(c.connections), NUMCLIENTS)
182 self.failIf(c._connected)
183 d.addCallback(_check4)
185 test_system.timeout = 2400
187 def stall(self, res, timeout):
189 reactor.callLater(timeout, d.callback, res)
192 def test_system_this_one_breaks(self):
193 # this uses a single Tub, which has a strong effect on the
196 tub.setOption("logLocalFailures", True)
197 tub.setOption("logRemoteFailures", True)
198 tub.setServiceParent(self.parent)
199 l = tub.listenOn("tcp:0")
200 portnum = l.getPortnum()
201 tub.setLocation("localhost:%d" % portnum)
203 i = IntroducerService()
204 i.setServiceParent(self.parent)
205 iurl = tub.registerReference(i)
210 node_furl = tub.registerReference(n)
211 c = IntroducerClient(tub, iurl, node_furl)
212 c.setServiceParent(self.parent)
218 log.msg("doing _check")
219 self.failUnlessEqual(len(clients[0].connections), 5)
220 d.addCallback(_check)
221 reactor.callLater(2, d.callback, None)
223 del test_system_this_one_breaks
226 def test_system_this_one_breaks_too(self):
227 # this one shuts down so quickly that it fails in a different way
228 self.central_tub = tub = Tub()
229 tub.setOption("logLocalFailures", True)
230 tub.setOption("logRemoteFailures", True)
231 tub.setServiceParent(self.parent)
232 l = tub.listenOn("tcp:0")
233 portnum = l.getPortnum()
234 tub.setLocation("localhost:%d" % portnum)
236 i = IntroducerService()
237 i.setServiceParent(self.parent)
238 iurl = tub.registerReference(i)
243 tub.setOption("logLocalFailures", True)
244 tub.setOption("logRemoteFailures", True)
245 tub.setServiceParent(self.parent)
246 l = tub.listenOn("tcp:0")
247 portnum = l.getPortnum()
248 tub.setLocation("localhost:%d" % portnum)
251 node_furl = tub.registerReference(n)
252 c = IntroducerClient(tub, iurl, node_furl)
253 c.setServiceParent(self.parent)
258 reactor.callLater(0.01, d.callback, None)
260 log.msg("doing _check")
263 self.failUnlessEqual(len(c.connections), 5)
264 c.connections.values()[0].tracker.broker.transport.loseConnection()
265 return self.stall(None, 2)
266 d.addCallback(_check)
267 def _check_again(res):
268 log.msg("doing _check_again")
270 self.failUnlessEqual(len(c.connections), 5)
271 d.addCallback(_check_again)
273 del test_system_this_one_breaks_too