]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_introducer.py
move testutil into test/common_util.py, since it doesn't count as 'code under test...
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_introducer.py
1 from base64 import b32decode
2
3 import os
4
5 from twisted.trial import unittest
6 from twisted.internet import defer
7 from twisted.python import log
8
9 from foolscap import Tub, Referenceable
10 from foolscap.eventual import fireEventually, flushEventualQueue
11 from twisted.application import service
12 from allmydata.introducer.client import IntroducerClient
13 from allmydata.introducer.server import IntroducerService
14 # test compatibility with old introducer .tac files
15 from allmydata.introducer import IntroducerNode
16 from allmydata.introducer import old
17 from allmydata.util import idlib, pollmixin
18 import common_util as testutil
19
class FakeNode(Referenceable):
    """Empty Referenceable used as a stand-in object to register with a Tub."""
22
class LoggingMultiService(service.MultiService):
    """MultiService with a log() method that forwards to twisted's log.msg."""

    def log(self, msg, **kwargs):
        """Pass msg and any keyword arguments straight to log.msg."""
        # 'log' here resolves to the module-level twisted.python.log import,
        # not to this method.
        log.msg(msg, **kwargs)
26
class Node(testutil.SignalMixin, unittest.TestCase):
    """Checks that an IntroducerNode can be created, started and stopped."""

    def test_loadable(self):
        """Start an IntroducerNode in a fresh basedir, wait for its tub to be
        ready, then stop it and flush the eventual-send queue."""
        basedir = "introducer.IntroducerNode.test_loadable"
        os.mkdir(basedir)
        node = IntroducerNode(basedir)

        def _start(res):
            return node.startService()

        def _wait_for_tub(res):
            return node.when_tub_ready()

        def _stop(res):
            return node.stopService()

        d = fireEventually(None)
        for step in (_start, _wait_for_tub, _stop, flushEventualQueue):
            d.addCallback(step)
        return d
38
class ServiceMixin:
    """Test mixin that provides a running parent service as self.parent."""

    def setUp(self):
        """Create the parent LoggingMultiService and start it."""
        parent = LoggingMultiService()
        self.parent = parent
        parent.startService()

    def tearDown(self):
        """Stop the parent service, then flush the eventual-send queue.

        Returns a Deferred so trial waits for the shutdown to complete.
        """
        log.msg("TestIntroducer.tearDown")

        def _stop_parent(res):
            return self.parent.stopService()

        d = defer.succeed(None)
        d.addCallback(_stop_parent)
        d.addCallback(flushEventualQueue)
        return d
49
class Introducer(ServiceMixin, unittest.TestCase, pollmixin.PollMixin):
    """Unit tests for IntroducerClient construction and IntroducerService."""

    def test_create(self):
        """Constructing an IntroducerClient without a tub must not raise."""
        IntroducerClient(None, "introducer.furl", "my_nickname",
                         "my_version", "oldest_version")

    def test_listen(self):
        """An IntroducerService can be attached to the running parent."""
        introducer = IntroducerService()
        introducer.setServiceParent(self.parent)

    def test_duplicate(self):
        """Publishing a second announcement under an already-known furl
        leaves the announcement count unchanged."""
        introducer = IntroducerService()

        def _expect_announcements(count):
            # nothing ever subscribes in this test, so the subscriber
            # count is expected to stay at zero throughout
            self.failUnlessEqual(len(introducer.get_announcements()), count)
            self.failUnlessEqual(len(introducer.get_subscribers()), 0)

        _expect_announcements(0)

        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36106,127.0.0.1:36106/gydnpigj2ja2qr2srq4ikjwnl7xfgbra"
        furl2 = "pb://ttwwooyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36111,127.0.0.1:36106/ttwwoogj2ja2qr2srq4ikjwnl7xfgbra"
        ann1 = (furl1, "storage", "RIStorage", "nick1", "ver23", "ver0")
        # same furl as ann1, only the version field differs
        ann1b = (furl1, "storage", "RIStorage", "nick1", "ver24", "ver0")
        ann2 = (furl2, "storage", "RIStorage", "nick2", "ver30", "ver0")

        # (announcement to publish, total expected afterwards)
        steps = [(ann1, 1), (ann2, 2), (ann1b, 2)]
        for announcement, expected_total in steps:
            introducer.remote_publish(announcement)
            _expect_announcements(expected_total)
78
class SystemTestMixin(ServiceMixin, pollmixin.PollMixin):
    """ServiceMixin plus a listening Tub available as self.central_tub."""

    def setUp(self):
        """Start the parent service, then create a Tub under it that listens
        on a kernel-assigned TCP port and advertises itself on localhost."""
        ServiceMixin.setUp(self)
        central = Tub()
        self.central_tub = central
        central.setServiceParent(self.parent)
        listener = central.listenOn("tcp:0")
        # "tcp:0" asks for any free port; look up which one was allocated
        central.setLocation("localhost:%d" % listener.getPortnum())
90
class SystemTest(SystemTestMixin, unittest.TestCase):
    """System tests: several introducer clients, each with its own Tub,
    talking to a single introducer registered on self.central_tub."""

    def test_system(self):
        """Run the shared scenario against an IntroducerService."""
        i = IntroducerService()
        i.setServiceParent(self.parent)
        self.introducer_furl = self.central_tub.registerReference(i)
        return self.do_system_test()

    def test_system_oldserver(self):
        """Run the shared scenario against old.IntroducerService_V1."""
        i = old.IntroducerService_V1()
        i.setServiceParent(self.parent)
        self.introducer_furl = self.central_tub.registerReference(i)
        return self.do_system_test()

    def do_system_test(self):
        """Drive the connect / disconnect / introducer-loss scenario.

        The caller must already have set self.introducer_furl. Returns a
        Deferred that fires once every check in the chain has run.
        """

        NUMCLIENTS = 5
        # we have 5 clients who publish themselves, and an extra one which
        # does not. When the connections are fully established, all six
        # nodes should have 5 connections each.

        clients = []
        tubs = {}
        for i in range(NUMCLIENTS+1):
            tub = Tub()
            #tub.setOption("logLocalFailures", True)
            #tub.setOption("logRemoteFailures", True)
            tub.setServiceParent(self.parent)
            l = tub.listenOn("tcp:0")
            portnum = l.getPortnum()
            tub.setLocation("localhost:%d" % portnum)

            n = FakeNode()
            log.msg("creating client %d: %s" % (i, tub.getShortTubID()))
            # client 0 uses the old V1 client class, the rest use the
            # current one
            client_class = IntroducerClient
            if i == 0:
                client_class = old.IntroducerClient_V1
            c = client_class(tub, self.introducer_furl,
                             "nickname-%d" % i, "version", "oldest")
            if i < NUMCLIENTS:
                node_furl = tub.registerReference(n)
                c.publish(node_furl, "storage", "ri_name")
            # the last one does not publish anything

            c.subscribe_to("storage")

            c.setServiceParent(self.parent)
            clients.append(c)
            tubs[c] = tub

        def _wait_for_all_connections():
            # poll predicate: true once no client is short of connections
            for c in clients:
                if len(c.get_all_connections()) < NUMCLIENTS:
                    return False
            return True
        d = self.poll(_wait_for_all_connections)

        def _check1(res):
            log.msg("doing _check1")
            # the peerid of client 0 is the same for every iteration, so
            # decode it once up front
            nodeid0 = b32decode(tubs[clients[0]].tubID.upper())
            for c in clients:
                self.failUnless(c.connected_to_introducer())
                self.failUnlessEqual(len(c.get_all_connections()), NUMCLIENTS)
                self.failUnlessEqual(len(c.get_all_peerids()), NUMCLIENTS)
                self.failUnlessEqual(len(c.get_all_connections_for("storage")),
                                     NUMCLIENTS)
                self.failUnlessEqual(c.get_nickname_for_peerid(nodeid0),
                                     "nickname-0")
        d.addCallback(_check1)

        origin_c = clients[0]

        def _disconnect_origin_from(victim_c, done_msg):
            # Drop origin_c's connection to victim_c, log done_msg, and
            # return a Deferred that fires once origin_c.counter has moved
            # away from the value it had before the disconnect.
            current_counter = origin_c.counter
            victim_nodeid = b32decode(tubs[victim_c].tubID.upper())
            log.msg(" disconnecting %s->%s" %
                    (tubs[origin_c].tubID,
                     idlib.shortnodeid_b2a(victim_nodeid)))
            origin_c.debug_disconnect_from_peerid(victim_nodeid)
            log.msg(done_msg)

            # then wait until something changes, which ought to be them
            # noticing the loss
            def _compare():
                return current_counter != origin_c.counter
            return self.poll(_compare)

        # now disconnect somebody's connection to someone else
        d.addCallback(lambda res:
                      _disconnect_origin_from(clients[1], " did disconnect"))

        # and wait for them to reconnect
        d.addCallback(lambda res: self.poll(_wait_for_all_connections))
        def _check2(res):
            log.msg("doing _check2")
            for c in clients:
                self.failUnlessEqual(len(c.get_all_connections()), NUMCLIENTS)
        d.addCallback(_check2)

        # now disconnect somebody's connection to themselves.
        d.addCallback(lambda res:
                      _disconnect_origin_from(origin_c,
                                              " did disconnect from self"))

        d.addCallback(lambda res: self.poll(_wait_for_all_connections))
        def _check3(res):
            log.msg("doing _check3")
            for c in clients:
                self.failUnlessEqual(len(c.get_all_connections_for("storage")),
                                     NUMCLIENTS)
        d.addCallback(_check3)
        def _shutdown_introducer(res):
            # now shut down the introducer. We do this by shutting down the
            # tub it's using. Nobody's connections (to each other) should go
            # down. All clients should notice the loss, and no other errors
            # should occur.
            log.msg("shutting down the introducer")
            return self.central_tub.disownServiceParent()
        d.addCallback(_shutdown_introducer)
        def _wait_for_introducer_loss():
            # poll predicate: true once no client still reports a
            # connection to the introducer
            for c in clients:
                if c.connected_to_introducer():
                    return False
            return True
        d.addCallback(lambda res: self.poll(_wait_for_introducer_loss))

        def _check4(res):
            log.msg("doing _check4")
            for c in clients:
                self.failUnlessEqual(len(c.get_all_connections_for("storage")),
                                     NUMCLIENTS)
                self.failIf(c.connected_to_introducer())
        d.addCallback(_check4)
        return d