# src/allmydata/test/test_introducer.py
# (commit: "introducer.py: add test coverage of _disconnected()")
from base64 import b32encode

from twisted.trial import unittest
from twisted.internet import defer, reactor
from twisted.python import log

from foolscap import Tub, Referenceable
from foolscap.eventual import flushEventualQueue
from twisted.application import service
from allmydata.introducer import IntroducerClient, Introducer
from allmydata.util import testutil

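# a minimal Referenceable: each simulated client registers one of these as
# the node object it announces through the introducer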
class MyNode(Referenceable):
    pass

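# a MultiService whose log() method forwards to twisted's log, giving child
# services a parent they can log through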
class LoggingMultiService(service.MultiService):
    def log(self, msg):
        log.msg(msg)

class TestIntroducer(unittest.TestCase, testutil.PollMixin):
    def setUp(self):
        self.parent = LoggingMultiService()
        self.parent.startService()

    def tearDown(self):
        log.msg("TestIntroducer.tearDown")
        d = defer.succeed(None)
        d.addCallback(lambda res: self.parent.stopService())
        d.addCallback(flushEventualQueue)
        return d

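    # building an IntroducerClient without a live Tub, and registering a
    # connection callback, should not blow up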
    def test_create(self):
        ic = IntroducerClient(None, "introducer", "myfurl")
        def _ignore(nodeid, rref):
            pass
        ic.notify_on_new_connection(_ignore)

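    # an Introducer should start cleanly when attached to a parent service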
    def test_listen(self):
        i = Introducer()
        i.setServiceParent(self.parent)

    def test_system(self):
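        # full-system test: one introducer on its own Tub, plus NUMCLIENTS
        # clients, each on its own Tub, all announcing themselves to each other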
        self.central_tub = tub = Tub()
        #tub.setOption("logLocalFailures", True)
        #tub.setOption("logRemoteFailures", True)
        tub.setServiceParent(self.parent)
        l = tub.listenOn("tcp:0")
        portnum = l.getPortnum()
        tub.setLocation("localhost:%d" % portnum)

        i = Introducer()
        i.setServiceParent(self.parent)
        iurl = tub.registerReference(i)
        NUMCLIENTS = 5

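        # each client is expected to connect to every client (including
        # itself), so we wait for NUMCLIENTS*NUMCLIENTS connection
        # notifications in total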
        self.waiting_for_connections = NUMCLIENTS*NUMCLIENTS
        d = self._done_counting = defer.Deferred()
        def _count(nodeid, rref):
            log.msg("NEW CONNECTION! %s %s" % (b32encode(nodeid).lower(), rref))
            self.waiting_for_connections -= 1
            if self.waiting_for_connections == 0:
                self._done_counting.callback("done!")

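        # create NUMCLIENTS clients, each with its own Tub, its own listening
        # port, and its own registered MyNode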
        clients = []
        tubs = {}
        for i in range(NUMCLIENTS):
            tub = Tub()
            #tub.setOption("logLocalFailures", True)
            #tub.setOption("logRemoteFailures", True)
            tub.setServiceParent(self.parent)
            l = tub.listenOn("tcp:0")
            portnum = l.getPortnum()
            tub.setLocation("localhost:%d" % portnum)

            n = MyNode()
            node_furl = tub.registerReference(n)
            c = IntroducerClient(tub, iurl, node_furl)
            c.notify_on_new_connection(_count)
            c.setServiceParent(self.parent)
            clients.append(c)
            tubs[c] = tub

        # d will fire once everybody is connected

        def _check1(res):
            log.msg("doing _check1")
            for c in clients:
                self.failUnlessEqual(len(c.connections), NUMCLIENTS)
                self.failUnless(c._connected) # to the introducer
        d.addCallback(_check1)
        def _disconnect_somebody_else(res):
            # now disconnect somebody's connection to someone else
            self.waiting_for_connections = 2
            d2 = self._done_counting = defer.Deferred()
            origin_c = clients[0]
            # find a target that is not themselves
            for nodeid, rref in origin_c.connections.items():
                if b32encode(nodeid).lower() != tubs[origin_c].tubID:
                    victim = rref
                    break
            log.msg(" disconnecting %s->%s" % (tubs[origin_c].tubID, victim))
            victim.tracker.broker.transport.loseConnection()
            log.msg(" did disconnect")
            return d2
        d.addCallback(_disconnect_somebody_else)
        def _check2(res):
            log.msg("doing _check2")
            for c in clients:
                self.failUnlessEqual(len(c.connections), NUMCLIENTS)
        d.addCallback(_check2)
        def _disconnect_yourself(res):
            # now disconnect somebody's connection to themselves. This will
            # only result in one new connection, since it is a loopback.
            self.waiting_for_connections = 1
            d2 = self._done_counting = defer.Deferred()
            origin_c = clients[0]
            # find a target that *is* themselves
            for nodeid, rref in origin_c.connections.items():
                if b32encode(nodeid).lower() == tubs[origin_c].tubID:
                    victim = rref
                    break
            log.msg(" disconnecting %s->%s" % (tubs[origin_c].tubID, victim))
            victim.tracker.broker.transport.loseConnection()
            log.msg(" did disconnect")
            return d2
        d.addCallback(_disconnect_yourself)
        def _check3(res):
            log.msg("doing _check3")
            for c in clients:
                self.failUnlessEqual(len(c.connections), NUMCLIENTS)
        d.addCallback(_check3)
        def _shutdown_introducer(res):
            # now shut down the introducer. We do this by shutting down the
            # tub it's using. Nobody's connections (to each other) should go
            # down. All clients should notice the loss, and no other errors
            # should occur.
            log.msg("shutting down the introducer")
            return self.central_tub.disownServiceParent()
        d.addCallback(_shutdown_introducer)
        d.addCallback(self.stall, 2)
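        # give the clients a couple of seconds to notice the lost introducer
        # connection before checking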
        def _check4(res):
            log.msg("doing _check4")
            for c in clients:
                self.failUnlessEqual(len(c.connections), NUMCLIENTS)
                self.failIf(c._connected)
        d.addCallback(_check4)
        return d
    test_system.timeout = 2400

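    # return a Deferred that fires with 'res' after 'timeout' seconds, useful
    # for pausing a callback chain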
    def stall(self, res, timeout):
        d = defer.Deferred()
        reactor.callLater(timeout, d.callback, res)
        return d

    def test_system_this_one_breaks(self):
        # this variant uses a single Tub for everything, which has a strong
        # effect on how it fails
        tub = Tub()
        tub.setOption("logLocalFailures", True)
        tub.setOption("logRemoteFailures", True)
        tub.setServiceParent(self.parent)
        l = tub.listenOn("tcp:0")
        portnum = l.getPortnum()
        tub.setLocation("localhost:%d" % portnum)

        i = Introducer()
        i.setServiceParent(self.parent)
        iurl = tub.registerReference(i)

        clients = []
        for i in range(5):
            n = MyNode()
            node_furl = tub.registerReference(n)
            c = IntroducerClient(tub, iurl, node_furl)
            c.setServiceParent(self.parent)
            clients.append(c)

        # time passes..
        d = defer.Deferred()
        def _check(res):
            log.msg("doing _check")
            self.failUnlessEqual(len(clients[0].connections), 5)
        d.addCallback(_check)
        reactor.callLater(2, d.callback, None)
        return d
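    # deleting the name keeps trial from collecting this known-broken test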
    del test_system_this_one_breaks

    def test_system_this_one_breaks_too(self):
        # this one shuts down so quickly that it fails in a different way
        self.central_tub = tub = Tub()
        tub.setOption("logLocalFailures", True)
        tub.setOption("logRemoteFailures", True)
        tub.setServiceParent(self.parent)
        l = tub.listenOn("tcp:0")
        portnum = l.getPortnum()
        tub.setLocation("localhost:%d" % portnum)

        i = Introducer()
        i.setServiceParent(self.parent)
        iurl = tub.registerReference(i)

        clients = []
        for i in range(5):
            tub = Tub()
            tub.setOption("logLocalFailures", True)
            tub.setOption("logRemoteFailures", True)
            tub.setServiceParent(self.parent)
            l = tub.listenOn("tcp:0")
            portnum = l.getPortnum()
            tub.setLocation("localhost:%d" % portnum)

            n = MyNode()
            node_furl = tub.registerReference(n)
            c = IntroducerClient(tub, iurl, node_furl)
            c.setServiceParent(self.parent)
            clients.append(c)

        # time passes..
        d = defer.Deferred()
        reactor.callLater(0.01, d.callback, None)
        def _check(res):
            log.msg("doing _check")
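            # deliberate immediate failure: everything below this line in
            # _check is effectively unreachable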
            self.fail("BOOM")
            for c in clients:
                self.failUnlessEqual(len(c.connections), 5)
            c.connections.values()[0].tracker.broker.transport.loseConnection()
            return self.stall(None, 2)
        d.addCallback(_check)
        def _check_again(res):
            log.msg("doing _check_again")
            for c in clients:
                self.failUnlessEqual(len(c.connections), 5)
        d.addCallback(_check_again)
        return d
    del test_system_this_one_breaks_too