1 from base64 import b32encode
5 from twisted.trial import unittest
6 from twisted.internet import defer, reactor
7 from twisted.python import log
9 from foolscap import Tub, Referenceable
10 from foolscap.eventual import fireEventually, flushEventualQueue
11 from twisted.application import service
12 from allmydata.introducer import IntroducerClient, IntroducerService, IntroducerNode
13 from allmydata.util import testutil
# Minimal foolscap Referenceable subclass defined for these tests.
# NOTE(review): the class body is not visible in this listing; presumably
# instances are what the tests register as `n` -- confirm against the file.
15 class MyNode(Referenceable):
# MultiService subclass that adds a log(msg, **kw) method. TestIntroducer
# instantiates it as the parent service that tubs, the introducer and the
# clients are attached to.
18 class LoggingMultiService(service.MultiService):
# NOTE(review): the body of log() is not visible in this listing.
19 def log(self, msg, **kw):
# Tests that an IntroducerNode can be started from a base directory and that
# its "introducer" service carries the expected _encoding_parameters tuple.
# NOTE(review): this listing omits some source lines (e.g. the statements
# that finish each test), so the comments describe only what is visible.
22 class TestIntroducerNode(testutil.SignalMixin, unittest.TestCase):
# Start a node in a fresh basedir and expect _encoding_parameters to equal
# (3, 7, 10) -- presumably the built-in defaults; confirm.
23 def test_loadable(self):
24 basedir = "introducer.IntroducerNode.test_loadable"
26 q = IntroducerNode(basedir)
# fireEventually(None) supplies the Deferred the rest of the test chains on:
# start the node, wait for its tub to be ready, then inspect the service.
27 d = fireEventually(None)
28 d.addCallback(lambda res: q.startService())
29 d.addCallback(lambda res: q.when_tub_ready())
30 def _check_parameters(res):
# The introducer is looked up by its service name on the node.
31 i = q.getServiceNamed("introducer")
32 self.failUnlessEqual(i._encoding_parameters, (3, 7, 10))
33 d.addCallback(_check_parameters)
# Shut the node down and flush foolscap's eventual-send queue afterwards.
34 d.addCallback(lambda res: q.stopService())
35 d.addCallback(flushEventualQueue)
# Same flow as test_loadable, but an "encoding_parameters" file is opened for
# writing in basedir first and the expected tuple is (25, 75, 100).
38 def test_set_parameters(self):
39 basedir = "introducer.IntroducerNode.test_set_parameters"
# NOTE(review): the lines that write to and close `f` are not shown here, and
# no `import os` is visible in this listing -- confirm both exist in the file.
41 f = open(os.path.join(basedir, "encoding_parameters"), "w")
44 q = IntroducerNode(basedir)
45 d = fireEventually(None)
46 d.addCallback(lambda res: q.startService())
47 d.addCallback(lambda res: q.when_tub_ready())
48 def _check_parameters(res):
49 i = q.getServiceNamed("introducer")
50 self.failUnlessEqual(i._encoding_parameters, (25, 75, 100))
51 d.addCallback(_check_parameters)
52 d.addCallback(lambda res: q.stopService())
53 d.addCallback(flushEventualQueue)
# Tests for IntroducerService / IntroducerClient. Everything a test creates
# is attached to self.parent so it can be stopped in one place.
56 class TestIntroducer(unittest.TestCase, testutil.PollMixin):
# Fixture setup (the enclosing `def` line is not shown in this listing,
# presumably setUp): create and start the parent service.
58 self.parent = LoggingMultiService()
59 self.parent.startService()
# Fixture teardown (the `def` line is not shown): stop the parent service,
# then flush foolscap's eventual-send queue.
61 log.msg("TestIntroducer.tearDown")
62 d = defer.succeed(None)
63 d.addCallback(lambda res: self.parent.stopService())
64 d.addCallback(flushEventualQueue)
# Construct an IntroducerClient without a tub (None) and register a
# new-connection callback on it; the visible code makes no assertions.
68 def test_create(self):
69 ic = IntroducerClient(None, "introducer", "myfurl")
# Callback taking (nodeid, rref); its body is not shown in this listing.
70 def _ignore(nodeid, rref):
72 ic.notify_on_new_connection(_ignore)
# Create an IntroducerService and attach it to the (already started) parent
# service; the visible code makes no assertions.
74 def test_listen(self):
75 i = IntroducerService()
76 i.setServiceParent(self.parent)
# End-to-end test: one central tub hosting the introducer plus NUMCLIENTS
# client tubs. It checks that connections form, recover after forced
# disconnects, and survive the introducer's tub being shut down.
# NOTE(review): this listing omits several lines (e.g. the definitions of
# NUMCLIENTS, clients, tubs, n, victim and some `def`/`for` headers).
78 def test_system(self):
# Central tub: attached to the parent service, listening on an ephemeral
# TCP port ("tcp:0") and advertised as localhost:<port>.
80 self.central_tub = tub = Tub()
81 #tub.setOption("logLocalFailures", True)
82 #tub.setOption("logRemoteFailures", True)
83 tub.setServiceParent(self.parent)
84 l = tub.listenOn("tcp:0")
85 portnum = l.getPortnum()
86 tub.setLocation("localhost:%d" % portnum)
# The introducer is registered on the central tub; iurl is the value the
# clients are given to reach it.
88 i = IntroducerService()
89 i.setServiceParent(self.parent)
90 iurl = tub.registerReference(i)
# Per-client setup. Note the loop variable rebinds `i`, which previously
# named the IntroducerService; iurl has already been captured above.
95 for i in range(NUMCLIENTS):
97 #tub.setOption("logLocalFailures", True)
98 #tub.setOption("logRemoteFailures", True)
99 tub.setServiceParent(self.parent)
100 l = tub.listenOn("tcp:0")
101 portnum = l.getPortnum()
102 tub.setLocation("localhost:%d" % portnum)
# Each client registers `n` on its tub and hands the result to its
# IntroducerClient together with iurl.
105 node_furl = tub.registerReference(n)
106 c = IntroducerClient(tub, iurl, node_furl)
108 c.setServiceParent(self.parent)
# Returns a DeferredList over when_enough_peers(NUMCLIENTS) Deferreds;
# fireOnOneErrback=True makes any single failure errback the whole list.
112 def _wait_for_all_connections(res):
113 dl = [] # list of when_enough_peers() for each peer
114 # will fire once everybody is connected
116 dl.append(c.when_enough_peers(NUMCLIENTS))
117 return defer.DeferredList(dl, fireOnOneErrback=True)
# `d` is the main Deferred chain that the remaining steps are added to.
119 d = _wait_for_all_connections(None)
# _check1 (its `def` line is not shown): expect NUMCLIENTS connections and
# a live connection to the introducer.
122 log.msg("doing _check1")
124 self.failUnlessEqual(len(c.connections), NUMCLIENTS)
125 self.failUnless(c._connected) # to the introducer
126 d.addCallback(_check1)
# The first client is the one whose connections get dropped below.
127 origin_c = clients[0]
128 def _disconnect_somebody_else(res):
129 # now disconnect somebody's connection to someone else
130 # find a target that is not themselves
131 for nodeid,rref in origin_c.connections.items():
# Connection keys are compared to the tub's tubID after base32-encoding
# and lowercasing the nodeid.
132 if b32encode(nodeid).lower() != tubs[origin_c].tubID:
135 log.msg(" disconnecting %s->%s" % (tubs[origin_c].tubID, victim))
# Drop the underlying transport of the chosen connection.
136 victim.tracker.broker.transport.loseConnection()
137 log.msg(" did disconnect")
138 d.addCallback(_disconnect_somebody_else)
139 def _wait_til_he_notices(res):
140 # wait til the origin_c notices the loss
141 log.msg(" waiting until peer notices the disconnection")
142 return origin_c.when_fewer_than_peers(NUMCLIENTS)
143 d.addCallback(_wait_til_he_notices)
144 def _wait_for_reconnection(res):
145 log.msg(" doing _wait_for_reconnection()")
146 return origin_c.when_enough_peers(NUMCLIENTS)
147 d.addCallback(_wait_for_reconnection)
# _check2 (its `def` line is not shown): the connection count is back to
# NUMCLIENTS after the reconnection.
149 log.msg("doing _check2")
151 self.failUnlessEqual(len(c.connections), NUMCLIENTS)
152 d.addCallback(_check2)
153 def _disconnect_yourself(res):
154 # now disconnect somebody's connection to themselves.
155 # find a target that *is* themselves
156 for nodeid,rref in origin_c.connections.items():
157 if b32encode(nodeid).lower() == tubs[origin_c].tubID:
160 log.msg(" disconnecting %s->%s" % (tubs[origin_c].tubID, victim))
161 victim.tracker.broker.transport.loseConnection()
162 log.msg(" did disconnect from self")
163 d.addCallback(_disconnect_yourself)
# The two wait helpers defined above are reused for the self-disconnect.
164 d.addCallback(_wait_til_he_notices)
165 d.addCallback(_wait_for_all_connections)
# _check3 (its `def` line is not shown): connection count is NUMCLIENTS
# again.
167 log.msg("doing _check3")
169 self.failUnlessEqual(len(c.connections), NUMCLIENTS)
170 d.addCallback(_check3)
171 def _shutdown_introducer(res):
172 # now shut down the introducer. We do this by shutting down the
173 # tub it's using. Nobody's connections (to each other) should go
174 # down. All clients should notice the loss, and no other errors
176 log.msg("shutting down the introducer")
177 return self.central_tub.disownServiceParent()
178 d.addCallback(_shutdown_introducer)
# Pause via self.stall before checking, so clients have time to notice.
179 d.addCallback(self.stall, 2)
# _check4 (its `def` line is not shown): peer connections remain at
# NUMCLIENTS, while the connection to the introducer is reported as gone.
181 log.msg("doing _check4")
183 self.failUnlessEqual(len(c.connections), NUMCLIENTS)
184 self.failIf(c._connected)
185 d.addCallback(_check4)
# Attribute set on the test method; presumably trial's per-test timeout in
# seconds -- confirm against the trial documentation.
187 test_system.timeout = 2400
# Delay helper for Deferred chains: schedules d.callback(res) to run after
# `timeout` via reactor.callLater, so `res` is passed through after the delay.
# NOTE(review): the lines that create and return `d` are not shown in this
# listing.
189 def stall(self, res, timeout):
191 reactor.callLater(timeout, d.callback, res)
# Disabled variant of test_system in which the introducer and the clients
# share one tub. The `del` at the end removes the name again, so it is never
# collected as a test.
# NOTE(review): this listing omits several lines (tub creation, the client
# loop, the `def _check` header and the creation of `d`).
194 def test_system_this_one_breaks(self):
195 # this uses a single Tub, which has a strong effect on the
# Unlike test_system, failure logging is switched on for this tub.
198 tub.setOption("logLocalFailures", True)
199 tub.setOption("logRemoteFailures", True)
200 tub.setServiceParent(self.parent)
201 l = tub.listenOn("tcp:0")
202 portnum = l.getPortnum()
203 tub.setLocation("localhost:%d" % portnum)
205 i = IntroducerService()
206 i.setServiceParent(self.parent)
207 iurl = tub.registerReference(i)
# Clients are built on the same `tub` that hosts the introducer.
212 node_furl = tub.registerReference(n)
213 c = IntroducerClient(tub, iurl, node_furl)
214 c.setServiceParent(self.parent)
# _check (its `def` line is not shown): the first client is expected to hold
# 5 connections.
220 log.msg("doing _check")
221 self.failUnlessEqual(len(clients[0].connections), 5)
222 d.addCallback(_check)
# Fire the chain after a fixed delay of 2 rather than waiting on a
# connection event.
223 reactor.callLater(2, d.callback, None)
# Remove the method so the test runner does not pick it up.
225 del test_system_this_one_breaks
# Disabled variant of test_system that starts checking almost immediately
# (after a 0.01 delay). The `del` at the end removes the name again, so it is
# never collected as a test.
# NOTE(review): this listing omits several lines (the client loop, per-client
# tub creation, the `def _check` header and the creation of `d`).
228 def test_system_this_one_breaks_too(self):
229 # this one shuts down so quickly that it fails in a different way
# Central tub with failure logging enabled, listening on an ephemeral port.
230 self.central_tub = tub = Tub()
231 tub.setOption("logLocalFailures", True)
232 tub.setOption("logRemoteFailures", True)
233 tub.setServiceParent(self.parent)
234 l = tub.listenOn("tcp:0")
235 portnum = l.getPortnum()
236 tub.setLocation("localhost:%d" % portnum)
238 i = IntroducerService()
239 i.setServiceParent(self.parent)
240 iurl = tub.registerReference(i)
# Per-client tub setup, mirroring the central tub configuration.
245 tub.setOption("logLocalFailures", True)
246 tub.setOption("logRemoteFailures", True)
247 tub.setServiceParent(self.parent)
248 l = tub.listenOn("tcp:0")
249 portnum = l.getPortnum()
250 tub.setLocation("localhost:%d" % portnum)
253 node_furl = tub.registerReference(n)
254 c = IntroducerClient(tub, iurl, node_furl)
255 c.setServiceParent(self.parent)
# Fire the chain after only 0.01, i.e. nearly at once.
260 reactor.callLater(0.01, d.callback, None)
# _check (its `def` line is not shown): expect 5 connections, drop the
# transport of the first connection, then stall for 2 before continuing.
262 log.msg("doing _check")
265 self.failUnlessEqual(len(c.connections), 5)
# Indexing .values() directly relies on it returning a list (Python 2).
266 c.connections.values()[0].tracker.broker.transport.loseConnection()
267 return self.stall(None, 2)
268 d.addCallback(_check)
# After the stall, the connection count is expected to be 5 again.
269 def _check_again(res):
270 log.msg("doing _check_again")
272 self.failUnlessEqual(len(c.connections), 5)
273 d.addCallback(_check_again)
# Remove the method so the test runner does not pick it up.
275 del test_system_this_one_breaks_too