1 from base64 import b32encode
5 from twisted.trial import unittest
6 from twisted.internet import defer, reactor
7 from twisted.python import log
9 from foolscap import Tub, Referenceable
10 from foolscap.eventual import fireEventually, flushEventualQueue
11 from twisted.application import service
12 from allmydata.introducer import IntroducerClient, IntroducerService, IntroducerNode
13 from allmydata.util import testutil
# Referenceable subclass defined for this file's tests.
# NOTE(review): the class body is not present in this extract (the embedded
# numbering jumps from 15 to 18), so its contents cannot be documented here.
15 class FakeNode(Referenceable):
# MultiService subclass that exposes a log(msg, **kw) method. TestIntroducer
# creates one as self.parent and attaches its Tubs, IntroducerService and
# IntroducerClient instances to it.
18 class LoggingMultiService(service.MultiService):
# NOTE(review): the body of log() is missing from this extract (embedded
# numbering jumps from 19 to 22); presumably it forwards to
# twisted.python.log.msg -- confirm against the original file.
19 def log(self, msg, **kw):
# Tests that an IntroducerNode can be started from a base directory and that
# its "introducer" child service reports the expected _encoding_parameters.
22 class TestIntroducerNode(testutil.SignalMixin, unittest.TestCase):
# Case 1: the test writes no "encoding_parameters" file itself and expects the
# introducer to report (3, 7, 10).
23 def test_loadable(self):
24 basedir = "introducer.IntroducerNode.test_loadable"
# NOTE(review): embedded line 25 is missing from this extract; presumably it
# creates basedir before the node is constructed -- confirm.
26 q = IntroducerNode(basedir)
# Each step below is a callback on the Deferred returned by
# fireEventually(None), so the steps run one after another.
27 d = fireEventually(None)
28 d.addCallback(lambda res: q.startService())
29 d.addCallback(lambda res: q.when_tub_ready())
30 def _check_parameters(res):
31 i = q.getServiceNamed("introducer")
32 self.failUnlessEqual(i._encoding_parameters, (3, 7, 10))
33 d.addCallback(_check_parameters)
# Stop the node, then flush foolscap's eventual-send queue.
34 d.addCallback(lambda res: q.stopService())
35 d.addCallback(flushEventualQueue)
# NOTE(review): no "return d" is visible here (embedded line 36 is missing).
# trial only waits for a Deferred that the test method returns -- confirm the
# original returns d.
# Case 2: an "encoding_parameters" file is opened for writing inside basedir
# before the node is built, and the introducer is expected to report
# (25, 75, 100).
38 def test_set_parameters(self):
39 basedir = "introducer.IntroducerNode.test_set_parameters"
# NOTE(review): embedded line 40 is missing (presumably the basedir creation).
# "os" is used on the next line, but no "import os" appears among the imports
# visible in this extract -- confirm it exists in the original.
41 f = open(os.path.join(basedir, "encoding_parameters"), "w")
# NOTE(review): embedded lines 42-43 are missing; presumably they write the
# parameters to f and close it -- the written text is not visible here.
44 q = IntroducerNode(basedir)
45 d = fireEventually(None)
46 d.addCallback(lambda res: q.startService())
47 d.addCallback(lambda res: q.when_tub_ready())
48 def _check_parameters(res):
49 i = q.getServiceNamed("introducer")
50 self.failUnlessEqual(i._encoding_parameters, (25, 75, 100))
51 d.addCallback(_check_parameters)
# Same shutdown sequence as test_loadable: stop the node, flush the queue.
52 d.addCallback(lambda res: q.stopService())
53 d.addCallback(flushEventualQueue)
# NOTE(review): embedded line 54 is missing; presumably "return d" -- confirm.
# Tests for IntroducerService and IntroducerClient. Every test attaches the
# services it creates to self.parent.
56 class TestIntroducer(unittest.TestCase, testutil.PollMixin):
# NOTE(review): embedded line 57 is missing; the two statements below look
# like the body of setUp() -- confirm.
# Create and start the parent service that the tests hang their services on.
58 self.parent = LoggingMultiService()
59 self.parent.startService()
# NOTE(review): embedded line 60 is missing; judging by the log message the
# statements below are the body of tearDown() -- confirm.
61 log.msg("TestIntroducer.tearDown")
# Stop the parent service (and with it every attached child), then flush
# foolscap's eventual-send queue.
62 d = defer.succeed(None)
63 d.addCallback(lambda res: self.parent.stopService())
64 d.addCallback(flushEventualQueue)
# NOTE(review): embedded lines 65-67 are missing; presumably d is returned so
# that trial waits for the shutdown to complete -- confirm.
# Construct an IntroducerClient with no Tub (None) and placeholder FURL
# strings, then register a new-connection callback on it. No assertions are
# visible in this extract.
68 def test_create(self):
69 ic = IntroducerClient(None, "introducer", "myfurl")
# Callback taking (nodeid, rref).
# NOTE(review): its body (embedded line 71) is missing from this extract;
# the name suggests it does nothing -- confirm.
70 def _ignore(nodeid, rref):
72 ic.notify_on_new_connection(_ignore)
def test_listen(self):
    # Smoke test: build an IntroducerService and attach it to the parent
    # service held in self.parent. Nothing is asserted.
    introducer = IntroducerService()
    introducer.setServiceParent(self.parent)
# End-to-end scenario: one introducer on a central Tub plus NUMCLIENTS
# clients, each on its own Tub. The test checks that every client ends up
# with NUMCLIENTS connections, that the count recovers after individual
# connections are dropped, and that client-to-client connections survive the
# introducer's Tub being shut down.
# NOTE(review): many lines of this method are missing from the extract (gaps
# in the embedded numbering); each gap is flagged below.
78 def test_system(self):
# Central Tub: listens on "tcp:0", then advertises itself at
# localhost:<port actually bound>.
80 self.central_tub = tub = Tub()
81 #tub.setOption("logLocalFailures", True)
82 #tub.setOption("logRemoteFailures", True)
83 tub.setServiceParent(self.parent)
84 l = tub.listenOn("tcp:0")
85 portnum = l.getPortnum()
86 tub.setLocation("localhost:%d" % portnum)
# Register the introducer on the central Tub; iurl is what clients are
# given to reach it.
88 i = IntroducerService()
89 i.setServiceParent(self.parent)
90 iurl = tub.registerReference(i)
# NOTE(review): embedded lines 91-94 are missing; NUMCLIENTS, clients and
# tubs (all used below) are presumably defined there -- confirm.
# Note that the loop variable rebinds "i", which held the IntroducerService.
95 for i in range(NUMCLIENTS):
# NOTE(review): embedded line 96 is missing; presumably a fresh Tub is
# created per client here and bound to "tub" -- confirm.
97 #tub.setOption("logLocalFailures", True)
98 #tub.setOption("logRemoteFailures", True)
99 tub.setServiceParent(self.parent)
100 l = tub.listenOn("tcp:0")
101 portnum = l.getPortnum()
102 tub.setLocation("localhost:%d" % portnum)
# NOTE(review): embedded lines 103-104 are missing; "n" is presumably
# created there -- confirm.
105 node_furl = tub.registerReference(n)
106 c = IntroducerClient(tub, iurl, node_furl)
# NOTE(review): embedded lines 107 and 109-111 are missing; presumably c is
# recorded in clients and tubs[c] is set to its Tub -- confirm.
108 c.setServiceParent(self.parent)
# Returns a DeferredList over one when_enough_peers(NUMCLIENTS) Deferred
# per client; fireOnOneErrback=True makes any single failure errback
# the list.
112 def _wait_for_all_connections(res):
113 dl = [] # list of when_enough_peers() for each peer
114 # will fire once everybody is connected
# NOTE(review): embedded line 115 is missing; presumably the loop header
# that binds "c" for each client -- confirm.
116 dl.append(c.when_enough_peers(NUMCLIENTS))
117 return defer.DeferredList(dl, fireOnOneErrback=True)
119 d = _wait_for_all_connections(None)
# NOTE(review): the "def _check1(res):" line (embedded 121) and a loop header
# (embedded 123) are missing. The visible body asserts NUMCLIENTS connections
# and a live introducer connection for client c.
122 log.msg("doing _check1")
124 self.failUnlessEqual(len(c.connections), NUMCLIENTS)
125 self.failUnless(c._connected) # to the introducer
126 d.addCallback(_check1)
# The first client is the one whose connections get dropped below.
127 origin_c = clients[0]
128 def _disconnect_somebody_else(res):
129 # now disconnect somebody's connection to someone else
130 # find a target that is not themselves
# A connection is "someone else" when the lowercased base32 of its nodeid
# differs from origin_c's own tubID.
131 for nodeid,rref in origin_c.connections.items():
132 if b32encode(nodeid).lower() != tubs[origin_c].tubID:
# NOTE(review): embedded lines 133-134 are missing; presumably they set
# "victim" to the matching rref and leave the loop -- confirm.
135 log.msg(" disconnecting %s->%s" % (tubs[origin_c].tubID, victim))
136 victim.tracker.broker.transport.loseConnection()
137 log.msg(" did disconnect")
138 d.addCallback(_disconnect_somebody_else)
139 def _wait_til_he_notices(res):
140 # wait til the origin_c notices the loss
141 log.msg(" waiting until peer notices the disconnection")
142 return origin_c.when_fewer_than_peers(NUMCLIENTS)
143 d.addCallback(_wait_til_he_notices)
# After the loss is noticed, wait for every client to be back at
# NUMCLIENTS peers.
144 d.addCallback(_wait_for_all_connections)
# NOTE(review): the "def _check2(res):" line (embedded 145) and a loop header
# (embedded 147) are missing.
146 log.msg("doing _check2")
148 self.failUnlessEqual(len(c.connections), NUMCLIENTS)
149 d.addCallback(_check2)
150 def _disconnect_yourself(res):
151 # now disconnect somebody's connection to themselves.
152 # find a target that *is* themselves
# Same search as above with the comparison inverted: pick the connection
# whose nodeid matches origin_c's own tubID.
153 for nodeid,rref in origin_c.connections.items():
154 if b32encode(nodeid).lower() == tubs[origin_c].tubID:
# NOTE(review): embedded lines 155-156 are missing; presumably they set
# "victim" and leave the loop -- confirm.
157 log.msg(" disconnecting %s->%s" % (tubs[origin_c].tubID, victim))
158 victim.tracker.broker.transport.loseConnection()
159 log.msg(" did disconnect from self")
160 d.addCallback(_disconnect_yourself)
161 d.addCallback(_wait_til_he_notices)
162 d.addCallback(_wait_for_all_connections)
# NOTE(review): the "def _check3(res):" line (embedded 163) and a loop header
# (embedded 165) are missing.
164 log.msg("doing _check3")
166 self.failUnlessEqual(len(c.connections), NUMCLIENTS)
167 d.addCallback(_check3)
168 def _shutdown_introducer(res):
169 # now shut down the introducer. We do this by shutting down the
170 # tub it's using. Nobody's connections (to each other) should go
171 # down. All clients should notice the loss, and no other errors
# NOTE(review): embedded line 172 (the end of the comment above) is missing.
173 log.msg("shutting down the introducer")
174 return self.central_tub.disownServiceParent()
175 d.addCallback(_shutdown_introducer)
# Pause the chain for 2 seconds (see stall()) before the final check.
176 d.addCallback(self.stall, 2)
# NOTE(review): the "def _check4(res):" line (embedded 177) and a loop header
# (embedded 179) are missing. The visible body asserts that the peer
# connections are all still present while the introducer connection is gone.
178 log.msg("doing _check4")
180 self.failUnlessEqual(len(c.connections), NUMCLIENTS)
181 self.failIf(c._connected)
182 d.addCallback(_check4)
# NOTE(review): embedded line 183 is missing; presumably "return d" -- confirm.
# Sets a "timeout" attribute of 2400 on the test function; presumably this
# is trial's per-test timeout in seconds -- confirm.
184 test_system.timeout = 2400
# Helper that delays a callback chain: schedules d.callback(res) to run
# after `timeout` seconds via reactor.callLater, so `res` is passed through
# unchanged. Used as d.addCallback(self.stall, 2) and self.stall(None, 2).
186 def stall(self, res, timeout):
# NOTE(review): embedded line 187 is missing; presumably it creates the
# Deferred bound to "d" -- confirm.
188 reactor.callLater(timeout, d.callback, res)
# NOTE(review): embedded line 189 is missing; presumably "return d", since
# callers return or chain on the result of stall() -- confirm.
# Variant of test_system that uses a single Tub. It is deleted right after
# its definition (see the "del" at the end of this block).
# NOTE(review): several lines are missing from this extract; gaps are
# flagged below.
191 def test_system_this_one_breaks(self):
192 # this uses a single Tub, which has a strong effect on the
# NOTE(review): embedded lines 193-194 are missing (the rest of the comment
# above and, presumably, the creation of "tub") -- confirm.
195 tub.setOption("logLocalFailures", True)
196 tub.setOption("logRemoteFailures", True)
197 tub.setServiceParent(self.parent)
198 l = tub.listenOn("tcp:0")
199 portnum = l.getPortnum()
200 tub.setLocation("localhost:%d" % portnum)
# The introducer is registered on the same Tub the clients use.
202 i = IntroducerService()
203 i.setServiceParent(self.parent)
204 iurl = tub.registerReference(i)
# NOTE(review): embedded lines 205-208 are missing; presumably they set up
# "clients", the per-client loop and "n" -- confirm.
209 node_furl = tub.registerReference(n)
210 c = IntroducerClient(tub, iurl, node_furl)
211 c.setServiceParent(self.parent)
# NOTE(review): embedded lines 212-216 are missing; presumably they record c,
# create the Deferred "d" and open "def _check(res):" -- confirm.
217 log.msg("doing _check")
# Expects the first client to hold exactly 5 connections.
218 self.failUnlessEqual(len(clients[0].connections), 5)
219 d.addCallback(_check)
# Fire d (and therefore _check) 2 seconds from now.
220 reactor.callLater(2, d.callback, None)
# NOTE(review): embedded line 221 is missing; presumably "return d".
# Removes the name defined above; presumably this runs at class-body level so
# the method never becomes part of TestIntroducer -- confirm.
222 del test_system_this_one_breaks
# Variant of test_system that fires its check after only 0.01 seconds. It is
# deleted right after its definition (see the "del" at the end of this block).
# NOTE(review): several lines are missing from this extract; gaps are
# flagged below.
225 def test_system_this_one_breaks_too(self):
226 # this one shuts down so quickly that it fails in a different way
# Central Tub for the introducer, with local and remote failure logging on.
227 self.central_tub = tub = Tub()
228 tub.setOption("logLocalFailures", True)
229 tub.setOption("logRemoteFailures", True)
230 tub.setServiceParent(self.parent)
231 l = tub.listenOn("tcp:0")
232 portnum = l.getPortnum()
233 tub.setLocation("localhost:%d" % portnum)
235 i = IntroducerService()
236 i.setServiceParent(self.parent)
237 iurl = tub.registerReference(i)
# NOTE(review): embedded lines 238-241 are missing; presumably they set up
# "clients", the per-client loop and a fresh per-client Tub -- confirm.
242 tub.setOption("logLocalFailures", True)
243 tub.setOption("logRemoteFailures", True)
244 tub.setServiceParent(self.parent)
245 l = tub.listenOn("tcp:0")
246 portnum = l.getPortnum()
247 tub.setLocation("localhost:%d" % portnum)
# NOTE(review): embedded lines 248-249 are missing; "n" is presumably
# created there -- confirm.
250 node_furl = tub.registerReference(n)
251 c = IntroducerClient(tub, iurl, node_furl)
252 c.setServiceParent(self.parent)
# NOTE(review): embedded lines 253-256 are missing; presumably they record c
# and create the Deferred "d" -- confirm.
# Fire d 0.01 seconds from now.
257 reactor.callLater(0.01, d.callback, None)
# NOTE(review): the "def _check(res):" line (embedded 258) and embedded lines
# 260-261 are missing from this extract.
259 log.msg("doing _check")
262 self.failUnlessEqual(len(c.connections), 5)
# Drop the transport of one of c's connections, then wait 2 seconds.
# Indexing .values() relies on Python 2, where dict.values() returns a list
# (assuming c.connections is a plain dict, as the .items() use in test_system
# suggests).
263 c.connections.values()[0].tracker.broker.transport.loseConnection()
264 return self.stall(None, 2)
265 d.addCallback(_check)
# After the stall the connection count is expected to be 5 again.
266 def _check_again(res):
267 log.msg("doing _check_again")
# NOTE(review): embedded line 268 is missing; presumably a loop header
# binding "c" -- confirm.
269 self.failUnlessEqual(len(c.connections), 5)
270 d.addCallback(_check_again)
# NOTE(review): embedded line 271 is missing; presumably "return d".
# Removes the name defined above; presumably this runs at class-body level so
# the method never becomes part of TestIntroducer -- confirm.
272 del test_system_this_one_breaks_too