]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_introducer.py
editing: change names like "MyThing" to "FakeThing" for fake objects in unit tests
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_introducer.py
1 from base64 import b32encode
2
3 import os
4
5 from twisted.trial import unittest
6 from twisted.internet import defer, reactor
7 from twisted.python import log
8
9 from foolscap import Tub, Referenceable
10 from foolscap.eventual import fireEventually, flushEventualQueue
11 from twisted.application import service
12 from allmydata.introducer import IntroducerClient, IntroducerService, IntroducerNode
13 from allmydata.util import testutil
14
class FakeNode(Referenceable):
    """Empty foolscap Referenceable used as a stand-in for a real node.

    The tests in this module register instances with a Tub purely to obtain
    a FURL that can be handed to IntroducerClient; the class defines no
    methods of its own.
    """
17
class LoggingMultiService(service.MultiService):
    """A MultiService that additionally provides a log() method.

    Used as the parent service in TestIntroducer.setUp.
    NOTE(review): presumably child services call parent.log() -- confirm
    against allmydata.introducer.
    """

    def log(self, msg, **kw):
        """Pass msg and any keyword arguments through to twisted's log.msg."""
        log.msg(msg, **kw)
21
class TestIntroducerNode(testutil.SignalMixin, unittest.TestCase):
    """Start an IntroducerNode and check the encoding parameters it exposes."""

    def _start_and_check_parameters(self, basedir, expected):
        """Run an IntroducerNode from basedir and verify its parameters.

        Creates IntroducerNode(basedir), starts it, waits for
        when_tub_ready(), asserts that the "introducer" service's
        _encoding_parameters equals `expected`, then stops the node and
        flushes the foolscap eventual-send queue.

        Returns the Deferred driving that sequence, so a test method can
        return it directly to trial.
        """
        q = IntroducerNode(basedir)
        # start from an eventual-send so startService() runs in a later
        # reactor turn rather than synchronously inside the test method
        d = fireEventually(None)
        d.addCallback(lambda res: q.startService())
        d.addCallback(lambda res: q.when_tub_ready())
        def _check_parameters(res):
            i = q.getServiceNamed("introducer")
            self.failUnlessEqual(i._encoding_parameters, expected)
        d.addCallback(_check_parameters)
        d.addCallback(lambda res: q.stopService())
        d.addCallback(flushEventualQueue)
        return d

    def test_loadable(self):
        """With an empty basedir the parameters are (3, 7, 10)."""
        basedir = "introducer.IntroducerNode.test_loadable"
        os.mkdir(basedir)
        return self._start_and_check_parameters(basedir, (3, 7, 10))

    def test_set_parameters(self):
        """An "encoding_parameters" file in basedir overrides the values."""
        basedir = "introducer.IntroducerNode.test_set_parameters"
        os.mkdir(basedir)
        f = open(os.path.join(basedir, "encoding_parameters"), "w")
        # try/finally (rather than 'with') keeps this loadable on the old
        # Python 2 interpreters the rest of the file is written for, while
        # still closing the file if write() raises
        try:
            f.write("25 75 100")
        finally:
            f.close()
        return self._start_and_check_parameters(basedir, (25, 75, 100))
55
class TestIntroducer(unittest.TestCase, testutil.PollMixin):
    """Tests for IntroducerService / IntroducerClient.

    Every test gets a fresh LoggingMultiService in self.parent; services
    and Tubs created by a test are attached to it so that tearDown can stop
    them all at once.
    """

    def setUp(self):
        """Create and start the parent service for this test."""
        self.parent = LoggingMultiService()
        self.parent.startService()

    def tearDown(self):
        """Stop the parent service, then flush foolscap's eventual queue.

        Returns a Deferred so trial waits for shutdown to finish.
        """
        log.msg("TestIntroducer.tearDown")
        d = defer.succeed(None)
        d.addCallback(lambda res: self.parent.stopService())
        d.addCallback(flushEventualQueue)
        return d


    def test_create(self):
        """An IntroducerClient can be built and given a connection callback."""
        ic = IntroducerClient(None, "introducer", "myfurl")
        def _ignore(nodeid, rref):
            pass
        ic.notify_on_new_connection(_ignore)

    def test_listen(self):
        """An IntroducerService can be attached to a running parent."""
        i = IntroducerService()
        i.setServiceParent(self.parent)

    def test_system(self):
        """Run one introducer and NUMCLIENTS clients, each on its own Tub.

        Sequence:
          1. wait until every client reports NUMCLIENTS peers
          2. drop clients[0]'s connection to some other peer, wait for it
             to notice, and wait for full connectivity again
          3. same, but drop clients[0]'s connection to itself
          4. shut down the introducer's Tub; after a 2 second stall every
             client must still have NUMCLIENTS connections but no longer
             be marked as connected to the introducer
        """

        self.central_tub = tub = Tub()
        #tub.setOption("logLocalFailures", True)
        #tub.setOption("logRemoteFailures", True)
        tub.setServiceParent(self.parent)
        l = tub.listenOn("tcp:0")
        portnum = l.getPortnum()
        tub.setLocation("localhost:%d" % portnum)

        # named 'introducer' (not 'i') so the client loop below cannot
        # rebind it
        introducer = IntroducerService()
        introducer.setServiceParent(self.parent)
        iurl = tub.registerReference(introducer)
        NUMCLIENTS = 5

        clients = []
        tubs = {} # maps IntroducerClient -> the Tub it was created with
        for clientnum in range(NUMCLIENTS):
            tub = Tub()
            #tub.setOption("logLocalFailures", True)
            #tub.setOption("logRemoteFailures", True)
            tub.setServiceParent(self.parent)
            l = tub.listenOn("tcp:0")
            portnum = l.getPortnum()
            tub.setLocation("localhost:%d" % portnum)

            n = FakeNode()
            node_furl = tub.registerReference(n)
            c = IntroducerClient(tub, iurl, node_furl)

            c.setServiceParent(self.parent)
            clients.append(c)
            tubs[c] = tub

        def _wait_for_all_connections(res):
            dl = [] # list of when_enough_peers() for each peer
            # will fire once everybody is connected
            for c in clients:
                dl.append(c.when_enough_peers(NUMCLIENTS))
            return defer.DeferredList(dl, fireOnOneErrback=True)

        d = _wait_for_all_connections(None)

        def _check1(res):
            log.msg("doing _check1")
            for c in clients:
                self.failUnlessEqual(len(c.connections), NUMCLIENTS)
                self.failUnless(c._connected) # to the introducer
        d.addCallback(_check1)
        origin_c = clients[0]
        def _disconnect_somebody_else(res):
            # now disconnect somebody's connection to someone else
            # find a target that is not themselves
            victim = None
            for nodeid,rref in origin_c.connections.items():
                if b32encode(nodeid).lower() != tubs[origin_c].tubID:
                    victim = rref
                    break
            # fail with a readable message instead of an UnboundLocalError
            # if no such connection exists
            self.failIf(victim is None,
                        "origin_c has no connection to any other peer")
            log.msg(" disconnecting %s->%s" % (tubs[origin_c].tubID, victim))
            victim.tracker.broker.transport.loseConnection()
            log.msg(" did disconnect")
        d.addCallback(_disconnect_somebody_else)
        def _wait_til_he_notices(res):
            # wait til the origin_c notices the loss
            log.msg(" waiting until peer notices the disconnection")
            return origin_c.when_fewer_than_peers(NUMCLIENTS)
        d.addCallback(_wait_til_he_notices)
        d.addCallback(_wait_for_all_connections)
        def _check2(res):
            log.msg("doing _check2")
            for c in clients:
                self.failUnlessEqual(len(c.connections), NUMCLIENTS)
        d.addCallback(_check2)
        def _disconnect_yourself(res):
            # now disconnect somebody's connection to themselves.
            # find a target that *is* themselves
            victim = None
            for nodeid,rref in origin_c.connections.items():
                if b32encode(nodeid).lower() == tubs[origin_c].tubID:
                    victim = rref
                    break
            # fail with a readable message instead of an UnboundLocalError
            # if no such connection exists
            self.failIf(victim is None,
                        "origin_c has no connection to itself")
            log.msg(" disconnecting %s->%s" % (tubs[origin_c].tubID, victim))
            victim.tracker.broker.transport.loseConnection()
            log.msg(" did disconnect from self")
        d.addCallback(_disconnect_yourself)
        d.addCallback(_wait_til_he_notices)
        d.addCallback(_wait_for_all_connections)
        def _check3(res):
            log.msg("doing _check3")
            for c in clients:
                self.failUnlessEqual(len(c.connections), NUMCLIENTS)
        d.addCallback(_check3)
        def _shutdown_introducer(res):
            # now shut down the introducer. We do this by shutting down the
            # tub it's using. Nobody's connections (to each other) should go
            # down. All clients should notice the loss, and no other errors
            # should occur.
            log.msg("shutting down the introducer")
            return self.central_tub.disownServiceParent()
        d.addCallback(_shutdown_introducer)
        # give the clients a fixed 2 seconds to notice the introducer is gone
        d.addCallback(self.stall, 2)
        def _check4(res):
            log.msg("doing _check4")
            for c in clients:
                self.failUnlessEqual(len(c.connections), NUMCLIENTS)
                self.failIf(c._connected)
        d.addCallback(_check4)
        return d
    # NOTE(review): presumably trial's per-test timeout in seconds -- confirm
    test_system.timeout = 2400

    def stall(self, res, timeout):
        """Return a Deferred that fires with `res` after `timeout` seconds.

        Taking the result as the first argument lets this be used directly
        as a Deferred callback: d.addCallback(self.stall, 2).
        """
        d = defer.Deferred()
        reactor.callLater(timeout, d.callback, res)
        return d

    def test_system_this_one_breaks(self):
        """Single-Tub variant of test_system (disabled by the 'del' below)."""
        # this uses a single Tub, which has a strong effect on the
        # failingness
        tub = Tub()
        tub.setOption("logLocalFailures", True)
        tub.setOption("logRemoteFailures", True)
        tub.setServiceParent(self.parent)
        l = tub.listenOn("tcp:0")
        portnum = l.getPortnum()
        tub.setLocation("localhost:%d" % portnum)

        introducer = IntroducerService()
        introducer.setServiceParent(self.parent)
        iurl = tub.registerReference(introducer)

        clients = []
        for clientnum in range(5):
            n = FakeNode()
            node_furl = tub.registerReference(n)
            c = IntroducerClient(tub, iurl, node_furl)
            c.setServiceParent(self.parent)
            clients.append(c)

        # time passes..
        d = defer.Deferred()
        def _check(res):
            log.msg("doing _check")
            self.failUnlessEqual(len(clients[0].connections), 5)
        d.addCallback(_check)
        reactor.callLater(2, d.callback, None)
        return d
    # removed from the class so trial never collects or runs it
    del test_system_this_one_breaks


    def test_system_this_one_breaks_too(self):
        """Fast-shutdown variant of test_system (disabled by the 'del' below)."""
        # this one shuts down so quickly that it fails in a different way
        self.central_tub = tub = Tub()
        tub.setOption("logLocalFailures", True)
        tub.setOption("logRemoteFailures", True)
        tub.setServiceParent(self.parent)
        l = tub.listenOn("tcp:0")
        portnum = l.getPortnum()
        tub.setLocation("localhost:%d" % portnum)

        introducer = IntroducerService()
        introducer.setServiceParent(self.parent)
        iurl = tub.registerReference(introducer)

        clients = []
        for clientnum in range(5):
            tub = Tub()
            tub.setOption("logLocalFailures", True)
            tub.setOption("logRemoteFailures", True)
            tub.setServiceParent(self.parent)
            l = tub.listenOn("tcp:0")
            portnum = l.getPortnum()
            tub.setLocation("localhost:%d" % portnum)

            n = FakeNode()
            node_furl = tub.registerReference(n)
            c = IntroducerClient(tub, iurl, node_furl)
            c.setServiceParent(self.parent)
            clients.append(c)

        # time passes..
        d = defer.Deferred()
        reactor.callLater(0.01, d.callback, None)
        def _check(res):
            log.msg("doing _check")
            # unconditional failure: everything after this line in _check
            # is unreachable
            self.fail("BOOM")
            for c in clients:
                self.failUnlessEqual(len(c.connections), 5)
            # 'c' here is the last client left over from the loop above
            c.connections.values()[0].tracker.broker.transport.loseConnection()
            return self.stall(None, 2)
        d.addCallback(_check)
        def _check_again(res):
            log.msg("doing _check_again")
            for c in clients:
                self.failUnlessEqual(len(c.connections), 5)
        d.addCallback(_check_again)
        return d
    # removed from the class so trial never collects or runs it
    del test_system_this_one_breaks_too