]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_introducer.py
hierarchical logging: add numbered messages and parent= args
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_introducer.py
1 from base64 import b32encode
2
3 import os
4
5 from twisted.trial import unittest
6 from twisted.internet import defer, reactor
7 from twisted.python import log
8
9 from foolscap import Tub, Referenceable
10 from foolscap.eventual import fireEventually, flushEventualQueue
11 from twisted.application import service
12 from allmydata.introducer import IntroducerClient, IntroducerService, IntroducerNode
13 from allmydata.util import testutil
14
15 class MyNode(Referenceable):
16     pass
17
18 class LoggingMultiService(service.MultiService):
19     def log(self, msg, **kw):
20         log.msg(msg, **kw)
21
22 class TestIntroducerNode(testutil.SignalMixin, unittest.TestCase):
23     def test_loadable(self):
24         basedir = "introducer.IntroducerNode.test_loadable"
25         os.mkdir(basedir)
26         q = IntroducerNode(basedir)
27         d = fireEventually(None)
28         d.addCallback(lambda res: q.startService())
29         d.addCallback(lambda res: q.when_tub_ready())
30         def _check_parameters(res):
31             i = q.getServiceNamed("introducer")
32             self.failUnlessEqual(i._encoding_parameters, (3, 7, 10))
33         d.addCallback(_check_parameters)
34         d.addCallback(lambda res: q.stopService())
35         d.addCallback(flushEventualQueue)
36         return d
37
38     def test_set_parameters(self):
39         basedir = "introducer.IntroducerNode.test_set_parameters"
40         os.mkdir(basedir)
41         f = open(os.path.join(basedir, "encoding_parameters"), "w")
42         f.write("25 75 100")
43         f.close()
44         q = IntroducerNode(basedir)
45         d = fireEventually(None)
46         d.addCallback(lambda res: q.startService())
47         d.addCallback(lambda res: q.when_tub_ready())
48         def _check_parameters(res):
49             i = q.getServiceNamed("introducer")
50             self.failUnlessEqual(i._encoding_parameters, (25, 75, 100))
51         d.addCallback(_check_parameters)
52         d.addCallback(lambda res: q.stopService())
53         d.addCallback(flushEventualQueue)
54         return d
55
56 class TestIntroducer(unittest.TestCase, testutil.PollMixin):
57     def setUp(self):
58         self.parent = LoggingMultiService()
59         self.parent.startService()
60     def tearDown(self):
61         log.msg("TestIntroducer.tearDown")
62         d = defer.succeed(None)
63         d.addCallback(lambda res: self.parent.stopService())
64         d.addCallback(flushEventualQueue)
65         return d
66
67
68     def test_create(self):
69         ic = IntroducerClient(None, "introducer", "myfurl")
70         def _ignore(nodeid, rref):
71             pass
72         ic.notify_on_new_connection(_ignore)
73
74     def test_listen(self):
75         i = IntroducerService()
76         i.setServiceParent(self.parent)
77
78     def test_system(self):
79
80         self.central_tub = tub = Tub()
81         #tub.setOption("logLocalFailures", True)
82         #tub.setOption("logRemoteFailures", True)
83         tub.setServiceParent(self.parent)
84         l = tub.listenOn("tcp:0")
85         portnum = l.getPortnum()
86         tub.setLocation("localhost:%d" % portnum)
87
88         i = IntroducerService()
89         i.setServiceParent(self.parent)
90         iurl = tub.registerReference(i)
91         NUMCLIENTS = 5
92
93         self.waiting_for_connections = NUMCLIENTS*NUMCLIENTS
94         d = self._done_counting = defer.Deferred()
95         def _count(nodeid, rref):
96             log.msg("NEW CONNECTION! %s %s" % (b32encode(nodeid).lower(), rref))
97             self.waiting_for_connections -= 1
98             if self.waiting_for_connections == 0:
99                 self._done_counting.callback("done!")
100
101         clients = []
102         tubs = {}
103         for i in range(NUMCLIENTS):
104             tub = Tub()
105             #tub.setOption("logLocalFailures", True)
106             #tub.setOption("logRemoteFailures", True)
107             tub.setServiceParent(self.parent)
108             l = tub.listenOn("tcp:0")
109             portnum = l.getPortnum()
110             tub.setLocation("localhost:%d" % portnum)
111
112             n = MyNode()
113             node_furl = tub.registerReference(n)
114             c = IntroducerClient(tub, iurl, node_furl)
115             c.notify_on_new_connection(_count)
116             c.setServiceParent(self.parent)
117             clients.append(c)
118             tubs[c] = tub
119
120         # d will fire once everybody is connected
121
122         def _check1(res):
123             log.msg("doing _check1")
124             for c in clients:
125                 self.failUnlessEqual(len(c.connections), NUMCLIENTS)
126                 self.failUnless(c._connected) # to the introducer
127         d.addCallback(_check1)
128         def _disconnect_somebody_else(res):
129             # now disconnect somebody's connection to someone else
130             self.waiting_for_connections = 2
131             d2 = self._done_counting = defer.Deferred()
132             origin_c = clients[0]
133             # find a target that is not themselves
134             for nodeid,rref in origin_c.connections.items():
135                 if b32encode(nodeid).lower() != tubs[origin_c].tubID:
136                     victim = rref
137                     break
138             log.msg(" disconnecting %s->%s" % (tubs[origin_c].tubID, victim))
139             victim.tracker.broker.transport.loseConnection()
140             log.msg(" did disconnect")
141             return d2
142         d.addCallback(_disconnect_somebody_else)
143         def _check2(res):
144             log.msg("doing _check2")
145             for c in clients:
146                 self.failUnlessEqual(len(c.connections), NUMCLIENTS)
147         d.addCallback(_check2)
148         def _disconnect_yourself(res):
149             # now disconnect somebody's connection to themselves. This will
150             # only result in one new connection, since it is a loopback.
151             self.waiting_for_connections = 1
152             d2 = self._done_counting = defer.Deferred()
153             origin_c = clients[0]
154             # find a target that *is* themselves
155             for nodeid,rref in origin_c.connections.items():
156                 if b32encode(nodeid).lower() == tubs[origin_c].tubID:
157                     victim = rref
158                     break
159             log.msg(" disconnecting %s->%s" % (tubs[origin_c].tubID, victim))
160             victim.tracker.broker.transport.loseConnection()
161             log.msg(" did disconnect")
162             return d2
163         d.addCallback(_disconnect_yourself)
164         def _check3(res):
165             log.msg("doing _check3")
166             for c in clients:
167                 self.failUnlessEqual(len(c.connections), NUMCLIENTS)
168         d.addCallback(_check3)
169         def _shutdown_introducer(res):
170             # now shut down the introducer. We do this by shutting down the
171             # tub it's using. Nobody's connections (to each other) should go
172             # down. All clients should notice the loss, and no other errors
173             # should occur.
174             log.msg("shutting down the introducer")
175             return self.central_tub.disownServiceParent()
176         d.addCallback(_shutdown_introducer)
177         d.addCallback(self.stall, 2)
178         def _check4(res):
179             log.msg("doing _check4")
180             for c in clients:
181                 self.failUnlessEqual(len(c.connections), NUMCLIENTS)
182                 self.failIf(c._connected)
183         d.addCallback(_check4)
184         return d
185     test_system.timeout = 2400
186
187     def stall(self, res, timeout):
188         d = defer.Deferred()
189         reactor.callLater(timeout, d.callback, res)
190         return d
191
192     def test_system_this_one_breaks(self):
193         # this uses a single Tub, which has a strong effect on the
194         # failingness
195         tub = Tub()
196         tub.setOption("logLocalFailures", True)
197         tub.setOption("logRemoteFailures", True)
198         tub.setServiceParent(self.parent)
199         l = tub.listenOn("tcp:0")
200         portnum = l.getPortnum()
201         tub.setLocation("localhost:%d" % portnum)
202
203         i = IntroducerService()
204         i.setServiceParent(self.parent)
205         iurl = tub.registerReference(i)
206
207         clients = []
208         for i in range(5):
209             n = MyNode()
210             node_furl = tub.registerReference(n)
211             c = IntroducerClient(tub, iurl, node_furl)
212             c.setServiceParent(self.parent)
213             clients.append(c)
214
215         # time passes..
216         d = defer.Deferred()
217         def _check(res):
218             log.msg("doing _check")
219             self.failUnlessEqual(len(clients[0].connections), 5)
220         d.addCallback(_check)
221         reactor.callLater(2, d.callback, None)
222         return d
223     del test_system_this_one_breaks
224
225
226     def test_system_this_one_breaks_too(self):
227         # this one shuts down so quickly that it fails in a different way
228         self.central_tub = tub = Tub()
229         tub.setOption("logLocalFailures", True)
230         tub.setOption("logRemoteFailures", True)
231         tub.setServiceParent(self.parent)
232         l = tub.listenOn("tcp:0")
233         portnum = l.getPortnum()
234         tub.setLocation("localhost:%d" % portnum)
235
236         i = IntroducerService()
237         i.setServiceParent(self.parent)
238         iurl = tub.registerReference(i)
239
240         clients = []
241         for i in range(5):
242             tub = Tub()
243             tub.setOption("logLocalFailures", True)
244             tub.setOption("logRemoteFailures", True)
245             tub.setServiceParent(self.parent)
246             l = tub.listenOn("tcp:0")
247             portnum = l.getPortnum()
248             tub.setLocation("localhost:%d" % portnum)
249
250             n = MyNode()
251             node_furl = tub.registerReference(n)
252             c = IntroducerClient(tub, iurl, node_furl)
253             c.setServiceParent(self.parent)
254             clients.append(c)
255
256         # time passes..
257         d = defer.Deferred()
258         reactor.callLater(0.01, d.callback, None)
259         def _check(res):
260             log.msg("doing _check")
261             self.fail("BOOM")
262             for c in clients:
263                 self.failUnlessEqual(len(c.connections), 5)
264             c.connections.values()[0].tracker.broker.transport.loseConnection()
265             return self.stall(None, 2)
266         d.addCallback(_check)
267         def _check_again(res):
268             log.msg("doing _check_again")
269             for c in clients:
270                 self.failUnlessEqual(len(c.connections), 5)
271         d.addCallback(_check_again)
272         return d
273     del test_system_this_one_breaks_too
274