git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_introducer.py
bump some unit tests up to very high timeouts because my poor G4 867 MHz PowerBook...
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_introducer.py
1
2 from twisted.trial import unittest
3 from twisted.internet import defer, reactor
4 from twisted.python import log
5 defer.setDebugging(True)
6
7 from foolscap import Tub, Referenceable
8 from foolscap.eventual import flushEventualQueue
9 from twisted.application import service
10 from allmydata.introducer import IntroducerClient, Introducer
11 from allmydata.util import idlib
12
class MyNode(Referenceable):
    """Empty Referenceable that the tests register with a Tub."""
15
class LoggingMultiService(service.MultiService):
    """MultiService used as the tests' parent; its log() discards the message."""

    def log(self, msg):
        # deliberately a no-op: the message is ignored
        return None
19
20 class TestIntroducer(unittest.TestCase):
21     def setUp(self):
22         self.parent = LoggingMultiService()
23         self.parent.startService()
24     def tearDown(self):
25         log.msg("TestIntroducer.tearDown")
26         d = defer.succeed(None)
27         d.addCallback(lambda res: self.parent.stopService())
28         d.addCallback(flushEventualQueue)
29         return d
30
31
32     def poll(self, check_f, pollinterval=0.01):
33         # Return a Deferred, then call check_f periodically until it returns
34         # True, at which point the Deferred will fire.. If check_f raises an
35         # exception, the Deferred will errback.
36         d = defer.maybeDeferred(self._poll, None, check_f, pollinterval)
37         return d
38
39     def _poll(self, res, check_f, pollinterval):
40         if check_f():
41             return True
42         d = defer.Deferred()
43         d.addCallback(self._poll, check_f, pollinterval)
44         reactor.callLater(pollinterval, d.callback, None)
45         return d
46
47
48     def test_create(self):
49         ic = IntroducerClient(None, "introducer", "mypburl")
50         def _ignore(nodeid, rref):
51             pass
52         ic.notify_on_new_connection(_ignore)
53
54     def test_listen(self):
55         i = Introducer()
56         i.setServiceParent(self.parent)
57
58     def test_system(self):
59
60         self.central_tub = tub = Tub()
61         #tub.setOption("logLocalFailures", True)
62         #tub.setOption("logRemoteFailures", True)
63         tub.setServiceParent(self.parent)
64         l = tub.listenOn("tcp:0")
65         portnum = l.getPortnum()
66         tub.setLocation("localhost:%d" % portnum)
67
68         i = Introducer()
69         i.setServiceParent(self.parent)
70         iurl = tub.registerReference(i)
71         NUMCLIENTS = 5
72
73         self.waiting_for_connections = NUMCLIENTS*NUMCLIENTS
74         d = self._done_counting = defer.Deferred()
75         def _count(nodeid, rref):
76             log.msg("NEW CONNECTION! %s %s" % (idlib.b2a(nodeid), rref))
77             self.waiting_for_connections -= 1
78             if self.waiting_for_connections == 0:
79                 self._done_counting.callback("done!")
80
81         clients = []
82         tubs = {}
83         for i in range(NUMCLIENTS):
84             tub = Tub()
85             #tub.setOption("logLocalFailures", True)
86             #tub.setOption("logRemoteFailures", True)
87             tub.setServiceParent(self.parent)
88             l = tub.listenOn("tcp:0")
89             portnum = l.getPortnum()
90             tub.setLocation("localhost:%d" % portnum)
91
92             n = MyNode()
93             node_pburl = tub.registerReference(n)
94             c = IntroducerClient(tub, iurl, node_pburl)
95             c.notify_on_new_connection(_count)
96             c.setServiceParent(self.parent)
97             clients.append(c)
98             tubs[c] = tub
99
100         # d will fire once everybody is connected
101
102         def _check(res):
103             log.msg("doing _check")
104             for c in clients:
105                 self.failUnlessEqual(len(c.connections), NUMCLIENTS)
106             # now disconnect somebody's connection to someone else
107             self.waiting_for_connections = 2
108             d2 = self._done_counting = defer.Deferred()
109             origin_c = clients[0]
110             # find a target that is not themselves
111             for nodeid,rref in origin_c.connections.items():
112                 if idlib.b2a(nodeid) != tubs[origin_c].tubID:
113                     victim = rref
114                     break
115             log.msg(" disconnecting %s->%s" % (tubs[origin_c].tubID, victim))
116             victim.tracker.broker.transport.loseConnection()
117             log.msg(" did disconnect")
118             return d2
119         d.addCallback(_check)
120         def _check_again(res):
121             log.msg("doing _check_again")
122             for c in clients:
123                 self.failUnlessEqual(len(c.connections), NUMCLIENTS)
124             # now disconnect somebody's connection to themselves. This will
125             # only result in one new connection, since it is a loopback.
126             self.waiting_for_connections = 1
127             d2 = self._done_counting = defer.Deferred()
128             origin_c = clients[0]
129             # find a target that *is* themselves
130             for nodeid,rref in origin_c.connections.items():
131                 if idlib.b2a(nodeid) == tubs[origin_c].tubID:
132                     victim = rref
133                     break
134             log.msg(" disconnecting %s->%s" % (tubs[origin_c].tubID, victim))
135             victim.tracker.broker.transport.loseConnection()
136             log.msg(" did disconnect")
137             return d2
138         d.addCallback(_check_again)
139         def _check_again2(res):
140             log.msg("doing _check_again2")
141             for c in clients:
142                 self.failUnlessEqual(len(c.connections), NUMCLIENTS)
143             # now disconnect somebody's connection to themselves
144         d.addCallback(_check_again2)
145         return d
146     test_system.timeout = 1200
147
148     def stall(self, res, timeout):
149         d = defer.Deferred()
150         reactor.callLater(timeout, d.callback, res)
151         return d
152
153     def test_system_this_one_breaks(self):
154         # this uses a single Tub, which has a strong effect on the
155         # failingness
156         tub = Tub()
157         tub.setOption("logLocalFailures", True)
158         tub.setOption("logRemoteFailures", True)
159         tub.setServiceParent(self.parent)
160         l = tub.listenOn("tcp:0")
161         portnum = l.getPortnum()
162         tub.setLocation("localhost:%d" % portnum)
163
164         i = Introducer()
165         i.setServiceParent(self.parent)
166         iurl = tub.registerReference(i)
167
168         clients = []
169         for i in range(5):
170             n = MyNode()
171             node_pburl = tub.registerReference(n)
172             c = IntroducerClient(tub, iurl, node_pburl)
173             c.setServiceParent(self.parent)
174             clients.append(c)
175
176         # time passes..
177         d = defer.Deferred()
178         def _check(res):
179             log.msg("doing _check")
180             self.failUnlessEqual(len(clients[0].connections), 5)
181         d.addCallback(_check)
182         reactor.callLater(2, d.callback, None)
183         return d
184     del test_system_this_one_breaks
185
186
187     def test_system_this_one_breaks_too(self):
188         # this one shuts down so quickly that it fails in a different way
189         self.central_tub = tub = Tub()
190         tub.setOption("logLocalFailures", True)
191         tub.setOption("logRemoteFailures", True)
192         tub.setServiceParent(self.parent)
193         l = tub.listenOn("tcp:0")
194         portnum = l.getPortnum()
195         tub.setLocation("localhost:%d" % portnum)
196
197         i = Introducer()
198         i.setServiceParent(self.parent)
199         iurl = tub.registerReference(i)
200
201         clients = []
202         for i in range(5):
203             tub = Tub()
204             tub.setOption("logLocalFailures", True)
205             tub.setOption("logRemoteFailures", True)
206             tub.setServiceParent(self.parent)
207             l = tub.listenOn("tcp:0")
208             portnum = l.getPortnum()
209             tub.setLocation("localhost:%d" % portnum)
210
211             n = MyNode()
212             node_pburl = tub.registerReference(n)
213             c = IntroducerClient(tub, iurl, node_pburl)
214             c.setServiceParent(self.parent)
215             clients.append(c)
216
217         # time passes..
218         d = defer.Deferred()
219         reactor.callLater(0.01, d.callback, None)
220         def _check(res):
221             log.msg("doing _check")
222             self.fail("BOOM")
223             for c in clients:
224                 self.failUnlessEqual(len(c.connections), 5)
225             c.connections.values()[0].tracker.broker.transport.loseConnection()
226             return self.stall(None, 2)
227         d.addCallback(_check)
228         def _check_again(res):
229             log.msg("doing _check_again")
230             for c in clients:
231                 self.failUnlessEqual(len(c.connections), 5)
232         d.addCallback(_check_again)
233         return d
234     del test_system_this_one_breaks_too
235