]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_introducer.py
rename all "*PBURL*" to "*FURL*"
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_introducer.py
1
2 from twisted.trial import unittest
3 from twisted.internet import defer, reactor
4 from twisted.python import log
5
6 from foolscap import Tub, Referenceable
7 from foolscap.eventual import flushEventualQueue
8 from twisted.application import service
9 from allmydata.introducer import IntroducerClient, Introducer
10 from allmydata.util import idlib
11
class MyNode(Referenceable):
    """Bare Referenceable used as the object each test client registers."""
14
class LoggingMultiService(service.MultiService):
    """MultiService parent that exposes a do-nothing ``log`` method."""

    def log(self, msg):
        """Accept and discard `msg`; always returns None."""
        return None
18
19 class TestIntroducer(unittest.TestCase):
20     def setUp(self):
21         self.parent = LoggingMultiService()
22         self.parent.startService()
23     def tearDown(self):
24         log.msg("TestIntroducer.tearDown")
25         d = defer.succeed(None)
26         d.addCallback(lambda res: self.parent.stopService())
27         d.addCallback(flushEventualQueue)
28         return d
29
30
31     def poll(self, check_f, pollinterval=0.01):
32         # Return a Deferred, then call check_f periodically until it returns
33         # True, at which point the Deferred will fire.. If check_f raises an
34         # exception, the Deferred will errback.
35         d = defer.maybeDeferred(self._poll, None, check_f, pollinterval)
36         return d
37
38     def _poll(self, res, check_f, pollinterval):
39         if check_f():
40             return True
41         d = defer.Deferred()
42         d.addCallback(self._poll, check_f, pollinterval)
43         reactor.callLater(pollinterval, d.callback, None)
44         return d
45
46
47     def test_create(self):
48         ic = IntroducerClient(None, "introducer", "myfurl")
49         def _ignore(nodeid, rref):
50             pass
51         ic.notify_on_new_connection(_ignore)
52
53     def test_listen(self):
54         i = Introducer()
55         i.setServiceParent(self.parent)
56
57     def test_system(self):
58
59         self.central_tub = tub = Tub()
60         #tub.setOption("logLocalFailures", True)
61         #tub.setOption("logRemoteFailures", True)
62         tub.setServiceParent(self.parent)
63         l = tub.listenOn("tcp:0")
64         portnum = l.getPortnum()
65         tub.setLocation("localhost:%d" % portnum)
66
67         i = Introducer()
68         i.setServiceParent(self.parent)
69         iurl = tub.registerReference(i)
70         NUMCLIENTS = 5
71
72         self.waiting_for_connections = NUMCLIENTS*NUMCLIENTS
73         d = self._done_counting = defer.Deferred()
74         def _count(nodeid, rref):
75             log.msg("NEW CONNECTION! %s %s" % (idlib.b2a(nodeid), rref))
76             self.waiting_for_connections -= 1
77             if self.waiting_for_connections == 0:
78                 self._done_counting.callback("done!")
79
80         clients = []
81         tubs = {}
82         for i in range(NUMCLIENTS):
83             tub = Tub()
84             #tub.setOption("logLocalFailures", True)
85             #tub.setOption("logRemoteFailures", True)
86             tub.setServiceParent(self.parent)
87             l = tub.listenOn("tcp:0")
88             portnum = l.getPortnum()
89             tub.setLocation("localhost:%d" % portnum)
90
91             n = MyNode()
92             node_furl = tub.registerReference(n)
93             c = IntroducerClient(tub, iurl, node_furl)
94             c.notify_on_new_connection(_count)
95             c.setServiceParent(self.parent)
96             clients.append(c)
97             tubs[c] = tub
98
99         # d will fire once everybody is connected
100
101         def _check(res):
102             log.msg("doing _check")
103             for c in clients:
104                 self.failUnlessEqual(len(c.connections), NUMCLIENTS)
105             # now disconnect somebody's connection to someone else
106             self.waiting_for_connections = 2
107             d2 = self._done_counting = defer.Deferred()
108             origin_c = clients[0]
109             # find a target that is not themselves
110             for nodeid,rref in origin_c.connections.items():
111                 if idlib.b2a(nodeid) != tubs[origin_c].tubID:
112                     victim = rref
113                     break
114             log.msg(" disconnecting %s->%s" % (tubs[origin_c].tubID, victim))
115             victim.tracker.broker.transport.loseConnection()
116             log.msg(" did disconnect")
117             return d2
118         d.addCallback(_check)
119         def _check_again(res):
120             log.msg("doing _check_again")
121             for c in clients:
122                 self.failUnlessEqual(len(c.connections), NUMCLIENTS)
123             # now disconnect somebody's connection to themselves. This will
124             # only result in one new connection, since it is a loopback.
125             self.waiting_for_connections = 1
126             d2 = self._done_counting = defer.Deferred()
127             origin_c = clients[0]
128             # find a target that *is* themselves
129             for nodeid,rref in origin_c.connections.items():
130                 if idlib.b2a(nodeid) == tubs[origin_c].tubID:
131                     victim = rref
132                     break
133             log.msg(" disconnecting %s->%s" % (tubs[origin_c].tubID, victim))
134             victim.tracker.broker.transport.loseConnection()
135             log.msg(" did disconnect")
136             return d2
137         d.addCallback(_check_again)
138         def _check_again2(res):
139             log.msg("doing _check_again2")
140             for c in clients:
141                 self.failUnlessEqual(len(c.connections), NUMCLIENTS)
142             # now disconnect somebody's connection to themselves
143         d.addCallback(_check_again2)
144         return d
145     test_system.timeout = 2400
146
147     def stall(self, res, timeout):
148         d = defer.Deferred()
149         reactor.callLater(timeout, d.callback, res)
150         return d
151
152     def test_system_this_one_breaks(self):
153         # this uses a single Tub, which has a strong effect on the
154         # failingness
155         tub = Tub()
156         tub.setOption("logLocalFailures", True)
157         tub.setOption("logRemoteFailures", True)
158         tub.setServiceParent(self.parent)
159         l = tub.listenOn("tcp:0")
160         portnum = l.getPortnum()
161         tub.setLocation("localhost:%d" % portnum)
162
163         i = Introducer()
164         i.setServiceParent(self.parent)
165         iurl = tub.registerReference(i)
166
167         clients = []
168         for i in range(5):
169             n = MyNode()
170             node_furl = tub.registerReference(n)
171             c = IntroducerClient(tub, iurl, node_furl)
172             c.setServiceParent(self.parent)
173             clients.append(c)
174
175         # time passes..
176         d = defer.Deferred()
177         def _check(res):
178             log.msg("doing _check")
179             self.failUnlessEqual(len(clients[0].connections), 5)
180         d.addCallback(_check)
181         reactor.callLater(2, d.callback, None)
182         return d
183     del test_system_this_one_breaks
184
185
    def test_system_this_one_breaks_too(self):
        """Disabled variant that checks connections almost immediately.

        Builds a central Tub hosting an Introducer plus five clients, each
        with its own Tub, then runs the first check after only 0.01
        seconds. The method is removed from the class by the `del` that
        follows it, so it is never collected as a test.
        """
        # this one shuts down so quickly that it fails in a different way
        self.central_tub = tub = Tub()
        tub.setOption("logLocalFailures", True)
        tub.setOption("logRemoteFailures", True)
        tub.setServiceParent(self.parent)
        l = tub.listenOn("tcp:0")
        portnum = l.getPortnum()
        tub.setLocation("localhost:%d" % portnum)

        i = Introducer()
        i.setServiceParent(self.parent)
        iurl = tub.registerReference(i)

        clients = []
        # NOTE: this loop rebinds both `i` (the Introducer above) and `tub`
        # (the central Tub, still reachable as self.central_tub).
        for i in range(5):
            tub = Tub()
            tub.setOption("logLocalFailures", True)
            tub.setOption("logRemoteFailures", True)
            tub.setServiceParent(self.parent)
            l = tub.listenOn("tcp:0")
            portnum = l.getPortnum()
            tub.setLocation("localhost:%d" % portnum)

            n = MyNode()
            node_furl = tub.registerReference(n)
            c = IntroducerClient(tub, iurl, node_furl)
            c.setServiceParent(self.parent)
            clients.append(c)

        # time passes..
        d = defer.Deferred()
        reactor.callLater(0.01, d.callback, None)
        def _check(res):
            log.msg("doing _check")
            # Unconditional failure: nothing below this line in _check runs.
            self.fail("BOOM")
            for c in clients:
                self.failUnlessEqual(len(c.connections), 5)
            # `c` is the last client from the loop above; indexing
            # .values() directly relies on Python 2 returning a list.
            c.connections.values()[0].tracker.broker.transport.loseConnection()
            return self.stall(None, 2)
        d.addCallback(_check)
        def _check_again(res):
            log.msg("doing _check_again")
            for c in clients:
                self.failUnlessEqual(len(c.connections), 5)
        d.addCallback(_check_again)
        return d
    # Remove the method from the class namespace, disabling this test.
    del test_system_this_one_breaks_too
234