git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_introducer.py
fix IntroducerClient.when_enough_peers()
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_introducer.py
1 from base64 import b32encode
2
3 import os
4
5 from twisted.trial import unittest
6 from twisted.internet import defer, reactor
7 from twisted.python import log
8
9 from foolscap import Tub, Referenceable
10 from foolscap.eventual import fireEventually, flushEventualQueue
11 from twisted.application import service
12 from allmydata.introducer import IntroducerClient, IntroducerService, IntroducerNode
13 from allmydata.util import testutil
14
class MyNode(Referenceable):
    """Empty Referenceable; the tests below register one instance per client."""
17
class LoggingMultiService(service.MultiService):
    """MultiService with an added log() method, used as the tests' parent service."""

    def log(self, msg, **kw):
        """Forward msg and any keyword arguments to twisted.python.log.msg."""
        # `log` here is the module-level twisted.python.log import, not this
        # method: class attributes are not in scope inside a method body.
        emit = log.msg
        emit(msg, **kw)
21
class TestIntroducerNode(testutil.SignalMixin, unittest.TestCase):
    """Start an IntroducerNode and check the encoding parameters it reports."""

    def _check_encoding_parameters(self, basedir, expected):
        """Run an IntroducerNode out of basedir and verify its parameters.

        basedir must already exist. The node is started, and once its tub is
        ready the "introducer" service's _encoding_parameters attribute is
        compared against expected.

        Returns a Deferred that fires after the node has been stopped and
        the eventual-send queue has been flushed.
        """
        node = IntroducerNode(basedir)
        d = fireEventually(None)
        d.addCallback(lambda res: node.startService())
        d.addCallback(lambda res: node.when_tub_ready())
        def _check_parameters(res):
            introducer = node.getServiceNamed("introducer")
            self.failUnlessEqual(introducer._encoding_parameters, expected)
        d.addCallback(_check_parameters)
        def _shut_down(res):
            # Runs via addBoth so the node is stopped even when an earlier
            # step (or the assertion) failed. `res` is handed back afterwards
            # so that a Failure still reaches trial; if the shutdown itself
            # fails, that failure is reported instead.
            stopping = defer.maybeDeferred(node.stopService)
            stopping.addCallback(flushEventualQueue)
            stopping.addCallback(lambda ign: res)
            return stopping
        d.addBoth(_shut_down)
        return d

    def test_loadable(self):
        """With nothing written to basedir the node reports (3, 7, 10)."""
        basedir = "introducer.IntroducerNode.test_loadable"
        os.mkdir(basedir)
        return self._check_encoding_parameters(basedir, (3, 7, 10))

    def test_set_parameters(self):
        """An "encoding_parameters" file in basedir sets the parameters."""
        basedir = "introducer.IntroducerNode.test_set_parameters"
        os.mkdir(basedir)
        f = open(os.path.join(basedir, "encoding_parameters"), "w")
        try:
            f.write("25 75 100")
        finally:
            # close the handle even if the write raises
            f.close()
        return self._check_encoding_parameters(basedir, (25, 75, 100))
55
class TestIntroducer(unittest.TestCase, testutil.PollMixin):
    """Tests for IntroducerClient and IntroducerService using local Tubs.

    Every service created by a test is parented to self.parent, which
    setUp() starts and tearDown() stops.
    """

    def setUp(self):
        self.parent = LoggingMultiService()
        self.parent.startService()

    def tearDown(self):
        log.msg("TestIntroducer.tearDown")
        d = defer.succeed(None)
        d.addCallback(lambda res: self.parent.stopService())
        d.addCallback(flushEventualQueue)
        return d

    # --- private helpers shared by the tests below ---

    def _make_listening_tub(self, log_failures=False):
        """Create a Tub under self.parent, listening on "tcp:0".

        The Tub's location is set to localhost plus the port number its
        listener reports. With log_failures=True the "logLocalFailures" and
        "logRemoteFailures" options are switched on before the Tub is
        attached to its parent.
        """
        tub = Tub()
        if log_failures:
            tub.setOption("logLocalFailures", True)
            tub.setOption("logRemoteFailures", True)
        tub.setServiceParent(self.parent)
        listener = tub.listenOn("tcp:0")
        tub.setLocation("localhost:%d" % listener.getPortnum())
        return tub

    def _make_introducer(self, tub):
        """Create an IntroducerService, register it on tub, return its FURL."""
        introducer = IntroducerService()
        introducer.setServiceParent(self.parent)
        return tub.registerReference(introducer)

    def _make_client(self, tub, iurl):
        """Register a fresh MyNode on tub and start an IntroducerClient for it."""
        node_furl = tub.registerReference(MyNode())
        client = IntroducerClient(tub, iurl, node_furl)
        client.setServiceParent(self.parent)
        return client

    def _find_connection(self, client, tub, to_self):
        """Return one remote reference out of client.connections.

        A connection counts as "to self" when the lower-cased base32 form of
        its nodeid equals tub.tubID. to_self selects whether such a
        connection (True) or any other one (False) is wanted. The test is
        failed with an explicit message if no connection qualifies.
        """
        for nodeid, rref in client.connections.items():
            is_self = (b32encode(nodeid).lower() == tub.tubID)
            if is_self == to_self:
                return rref
        self.fail("no connection with to_self=%s found for %s"
                  % (to_self, tub.tubID))

    def _drop_connection(self, client, tub, to_self):
        """Drop the transport under one of client's connections."""
        victim = self._find_connection(client, tub, to_self)
        log.msg(" disconnecting %s->%s" % (tub.tubID, victim))
        victim.tracker.broker.transport.loseConnection()

    # --- tests ---

    def test_create(self):
        ic = IntroducerClient(None, "introducer", "myfurl")
        def _ignore(nodeid, rref):
            pass
        ic.notify_on_new_connection(_ignore)

    def test_listen(self):
        i = IntroducerService()
        i.setServiceParent(self.parent)

    def test_system(self):
        """Full mesh of clients: lose connections, reconnect, lose introducer."""
        self.central_tub = self._make_listening_tub()
        iurl = self._make_introducer(self.central_tub)
        NUMCLIENTS = 5

        clients = []
        tubs = {}
        for _ in range(NUMCLIENTS):
            tub = self._make_listening_tub()
            c = self._make_client(tub, iurl)
            clients.append(c)
            tubs[c] = tub

        def _wait_for_all_connections(res):
            # one when_enough_peers() Deferred per client; the DeferredList
            # will fire once everybody is connected
            dl = [c.when_enough_peers(NUMCLIENTS) for c in clients]
            return defer.DeferredList(dl, fireOnOneErrback=True)

        def _assert_everyone_sees_everyone():
            for c in clients:
                self.failUnlessEqual(len(c.connections), NUMCLIENTS)

        d = _wait_for_all_connections(None)

        def _check1(res):
            log.msg("doing _check1")
            for c in clients:
                self.failUnlessEqual(len(c.connections), NUMCLIENTS)
                self.failUnless(c._connected) # to the introducer
        d.addCallback(_check1)

        origin_c = clients[0]
        origin_tub = tubs[origin_c]

        def _disconnect_somebody_else(res):
            # now disconnect somebody's connection to someone else:
            # pick a target that is not themselves
            self._drop_connection(origin_c, origin_tub, to_self=False)
            log.msg(" did disconnect")
        d.addCallback(_disconnect_somebody_else)

        def _wait_til_he_notices(res):
            # wait til the origin_c notices the loss
            log.msg(" waiting until peer notices the disconnection")
            return origin_c.when_fewer_than_peers(NUMCLIENTS)
        d.addCallback(_wait_til_he_notices)

        def _wait_for_reconnection(res):
            log.msg(" doing _wait_for_reconnection()")
            return origin_c.when_enough_peers(NUMCLIENTS)
        d.addCallback(_wait_for_reconnection)

        def _check2(res):
            log.msg("doing _check2")
            _assert_everyone_sees_everyone()
        d.addCallback(_check2)

        def _disconnect_yourself(res):
            # now disconnect somebody's connection to themselves:
            # pick a target that *is* themselves
            self._drop_connection(origin_c, origin_tub, to_self=True)
            log.msg(" did disconnect from self")
        d.addCallback(_disconnect_yourself)
        d.addCallback(_wait_til_he_notices)
        d.addCallback(_wait_for_all_connections)

        def _check3(res):
            log.msg("doing _check3")
            _assert_everyone_sees_everyone()
        d.addCallback(_check3)

        def _shutdown_introducer(res):
            # now shut down the introducer. We do this by shutting down the
            # tub it's using. Nobody's connections (to each other) should go
            # down. All clients should notice the loss, and no other errors
            # should occur.
            log.msg("shutting down the introducer")
            return self.central_tub.disownServiceParent()
        d.addCallback(_shutdown_introducer)
        d.addCallback(self.stall, 2)

        def _check4(res):
            log.msg("doing _check4")
            for c in clients:
                self.failUnlessEqual(len(c.connections), NUMCLIENTS)
                self.failIf(c._connected)
        d.addCallback(_check4)
        return d
    test_system.timeout = 2400

    def stall(self, res, timeout):
        """Return a Deferred that fires with res after timeout seconds."""
        d = defer.Deferred()
        reactor.callLater(timeout, d.callback, res)
        return d

    def test_system_this_one_breaks(self):
        # this uses a single Tub, which has a strong effect on the
        # failingness
        tub = self._make_listening_tub(log_failures=True)
        iurl = self._make_introducer(tub)

        clients = [self._make_client(tub, iurl) for _ in range(5)]

        # time passes..
        d = defer.Deferred()
        def _check(res):
            log.msg("doing _check")
            self.failUnlessEqual(len(clients[0].connections), 5)
        d.addCallback(_check)
        reactor.callLater(2, d.callback, None)
        return d
    # removed from the class again, so trial never runs it
    del test_system_this_one_breaks


    def test_system_this_one_breaks_too(self):
        # this one shuts down so quickly that it fails in a different way
        self.central_tub = self._make_listening_tub(log_failures=True)
        iurl = self._make_introducer(self.central_tub)

        clients = []
        for _ in range(5):
            tub = self._make_listening_tub(log_failures=True)
            clients.append(self._make_client(tub, iurl))

        # time passes..
        d = defer.Deferred()
        reactor.callLater(0.01, d.callback, None)
        def _check(res):
            log.msg("doing _check")
            self.fail("BOOM")
            # everything below is unreachable after the unconditional fail()
            # above; kept as in the original
            for c in clients:
                self.failUnlessEqual(len(c.connections), 5)
            # the original relied on the leaked loop variable, i.e. the last
            # client; spell that out
            last = clients[-1]
            last.connections.values()[0].tracker.broker.transport.loseConnection()
            return self.stall(None, 2)
        d.addCallback(_check)
        def _check_again(res):
            log.msg("doing _check_again")
            for c in clients:
                self.failUnlessEqual(len(c.connections), 5)
        d.addCallback(_check_again)
        return d
    # removed from the class again, so trial never runs it
    del test_system_this_one_breaks_too