]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_introducer.py
remove unused imports: hush pyflakes warnings
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_introducer.py
1 from base64 import b32encode
2
3 from twisted.trial import unittest
4 from twisted.internet import defer, reactor
5 from twisted.python import log
6
7 from foolscap import Tub, Referenceable
8 from foolscap.eventual import flushEventualQueue
9 from twisted.application import service
10 from allmydata.introducer import IntroducerClient, Introducer
11 from allmydata.util import testutil
12
13 class MyNode(Referenceable):
14     pass
15
16 class LoggingMultiService(service.MultiService):
17     def log(self, msg):
18         pass
19
20 class TestIntroducer(unittest.TestCase, testutil.PollMixin):
21     def setUp(self):
22         self.parent = LoggingMultiService()
23         self.parent.startService()
24     def tearDown(self):
25         log.msg("TestIntroducer.tearDown")
26         d = defer.succeed(None)
27         d.addCallback(lambda res: self.parent.stopService())
28         d.addCallback(flushEventualQueue)
29         return d
30
31
32     def test_create(self):
33         ic = IntroducerClient(None, "introducer", "myfurl")
34         def _ignore(nodeid, rref):
35             pass
36         ic.notify_on_new_connection(_ignore)
37
38     def test_listen(self):
39         i = Introducer()
40         i.setServiceParent(self.parent)
41
42     def test_system(self):
43
44         self.central_tub = tub = Tub()
45         #tub.setOption("logLocalFailures", True)
46         #tub.setOption("logRemoteFailures", True)
47         tub.setServiceParent(self.parent)
48         l = tub.listenOn("tcp:0")
49         portnum = l.getPortnum()
50         tub.setLocation("localhost:%d" % portnum)
51
52         i = Introducer()
53         i.setServiceParent(self.parent)
54         iurl = tub.registerReference(i)
55         NUMCLIENTS = 5
56
57         self.waiting_for_connections = NUMCLIENTS*NUMCLIENTS
58         d = self._done_counting = defer.Deferred()
59         def _count(nodeid, rref):
60             log.msg("NEW CONNECTION! %s %s" % (b32encode(nodeid).lower(), rref))
61             self.waiting_for_connections -= 1
62             if self.waiting_for_connections == 0:
63                 self._done_counting.callback("done!")
64
65         clients = []
66         tubs = {}
67         for i in range(NUMCLIENTS):
68             tub = Tub()
69             #tub.setOption("logLocalFailures", True)
70             #tub.setOption("logRemoteFailures", True)
71             tub.setServiceParent(self.parent)
72             l = tub.listenOn("tcp:0")
73             portnum = l.getPortnum()
74             tub.setLocation("localhost:%d" % portnum)
75
76             n = MyNode()
77             node_furl = tub.registerReference(n)
78             c = IntroducerClient(tub, iurl, node_furl)
79             c.notify_on_new_connection(_count)
80             c.setServiceParent(self.parent)
81             clients.append(c)
82             tubs[c] = tub
83
84         # d will fire once everybody is connected
85
86         def _check(res):
87             log.msg("doing _check")
88             for c in clients:
89                 self.failUnlessEqual(len(c.connections), NUMCLIENTS)
90             # now disconnect somebody's connection to someone else
91             self.waiting_for_connections = 2
92             d2 = self._done_counting = defer.Deferred()
93             origin_c = clients[0]
94             # find a target that is not themselves
95             for nodeid,rref in origin_c.connections.items():
96                 if b32encode(nodeid).lower() != tubs[origin_c].tubID:
97                     victim = rref
98                     break
99             log.msg(" disconnecting %s->%s" % (tubs[origin_c].tubID, victim))
100             victim.tracker.broker.transport.loseConnection()
101             log.msg(" did disconnect")
102             return d2
103         d.addCallback(_check)
104         def _check_again(res):
105             log.msg("doing _check_again")
106             for c in clients:
107                 self.failUnlessEqual(len(c.connections), NUMCLIENTS)
108             # now disconnect somebody's connection to themselves. This will
109             # only result in one new connection, since it is a loopback.
110             self.waiting_for_connections = 1
111             d2 = self._done_counting = defer.Deferred()
112             origin_c = clients[0]
113             # find a target that *is* themselves
114             for nodeid,rref in origin_c.connections.items():
115                 if b32encode(nodeid).lower() == tubs[origin_c].tubID:
116                     victim = rref
117                     break
118             log.msg(" disconnecting %s->%s" % (tubs[origin_c].tubID, victim))
119             victim.tracker.broker.transport.loseConnection()
120             log.msg(" did disconnect")
121             return d2
122         d.addCallback(_check_again)
123         def _check_again2(res):
124             log.msg("doing _check_again2")
125             for c in clients:
126                 self.failUnlessEqual(len(c.connections), NUMCLIENTS)
127             # now disconnect somebody's connection to themselves
128         d.addCallback(_check_again2)
129         return d
130     test_system.timeout = 2400
131
132     def stall(self, res, timeout):
133         d = defer.Deferred()
134         reactor.callLater(timeout, d.callback, res)
135         return d
136
137     def test_system_this_one_breaks(self):
138         # this uses a single Tub, which has a strong effect on the
139         # failingness
140         tub = Tub()
141         tub.setOption("logLocalFailures", True)
142         tub.setOption("logRemoteFailures", True)
143         tub.setServiceParent(self.parent)
144         l = tub.listenOn("tcp:0")
145         portnum = l.getPortnum()
146         tub.setLocation("localhost:%d" % portnum)
147
148         i = Introducer()
149         i.setServiceParent(self.parent)
150         iurl = tub.registerReference(i)
151
152         clients = []
153         for i in range(5):
154             n = MyNode()
155             node_furl = tub.registerReference(n)
156             c = IntroducerClient(tub, iurl, node_furl)
157             c.setServiceParent(self.parent)
158             clients.append(c)
159
160         # time passes..
161         d = defer.Deferred()
162         def _check(res):
163             log.msg("doing _check")
164             self.failUnlessEqual(len(clients[0].connections), 5)
165         d.addCallback(_check)
166         reactor.callLater(2, d.callback, None)
167         return d
168     del test_system_this_one_breaks
169
170
171     def test_system_this_one_breaks_too(self):
172         # this one shuts down so quickly that it fails in a different way
173         self.central_tub = tub = Tub()
174         tub.setOption("logLocalFailures", True)
175         tub.setOption("logRemoteFailures", True)
176         tub.setServiceParent(self.parent)
177         l = tub.listenOn("tcp:0")
178         portnum = l.getPortnum()
179         tub.setLocation("localhost:%d" % portnum)
180
181         i = Introducer()
182         i.setServiceParent(self.parent)
183         iurl = tub.registerReference(i)
184
185         clients = []
186         for i in range(5):
187             tub = Tub()
188             tub.setOption("logLocalFailures", True)
189             tub.setOption("logRemoteFailures", True)
190             tub.setServiceParent(self.parent)
191             l = tub.listenOn("tcp:0")
192             portnum = l.getPortnum()
193             tub.setLocation("localhost:%d" % portnum)
194
195             n = MyNode()
196             node_furl = tub.registerReference(n)
197             c = IntroducerClient(tub, iurl, node_furl)
198             c.setServiceParent(self.parent)
199             clients.append(c)
200
201         # time passes..
202         d = defer.Deferred()
203         reactor.callLater(0.01, d.callback, None)
204         def _check(res):
205             log.msg("doing _check")
206             self.fail("BOOM")
207             for c in clients:
208                 self.failUnlessEqual(len(c.connections), 5)
209             c.connections.values()[0].tracker.broker.transport.loseConnection()
210             return self.stall(None, 2)
211         d.addCallback(_check)
212         def _check_again(res):
213             log.msg("doing _check_again")
214             for c in clients:
215                 self.failUnlessEqual(len(c.connections), 5)
216         d.addCallback(_check_again)
217         return d
218     del test_system_this_one_breaks_too
219