git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_introducer.py
test_introducer.py: add a test for the python2.4.0/2.4.1 bug in base64.b32decode
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_introducer.py
1 from base64 import b32decode
2
3 import os
4
5 from twisted.trial import unittest
6 from twisted.internet import defer
7 from twisted.python import log
8
9 from foolscap import Tub, Referenceable
10 from foolscap.eventual import fireEventually, flushEventualQueue
11 from twisted.application import service
12 from allmydata.interfaces import InsufficientVersionError
13 from allmydata.introducer.client import IntroducerClient
14 from allmydata.introducer.server import IntroducerService
15 from allmydata.introducer.common import make_index
16 # test compatibility with old introducer .tac files
17 from allmydata.introducer import IntroducerNode
18 from allmydata.introducer import old
19 from allmydata.util import idlib, pollmixin
20 import common_util as testutil
21
class FakeNode(Referenceable):
    """Empty Referenceable; the system test registers instances of it with
    a Tub and publishes the resulting furl."""
24
class LoggingMultiService(service.MultiService):
    """A MultiService that additionally exposes a log() method."""

    def log(self, msg, **kw):
        """Forward msg and any keyword arguments unchanged to log.msg."""
        log.msg(msg, **kw)
28
class Node(testutil.SignalMixin, unittest.TestCase):
    """Check that an IntroducerNode can be created, started and stopped."""

    def test_loadable(self):
        """Create an IntroducerNode in its own basedir, start it, wait for
        its tub to be ready, then stop it and flush the eventual queue.

        Returns the Deferred that fires when all of that has happened.
        """
        basedir = "introducer.IntroducerNode.test_loadable"
        # os.mkdir raises if the directory already exists, which happens
        # when the test is run again in the same working directory. Only
        # create it when it is missing so a rerun still reaches the code
        # under test.
        if not os.path.isdir(basedir):
            os.mkdir(basedir)
        q = IntroducerNode(basedir)
        d = fireEventually(None)
        d.addCallback(lambda res: q.startService())
        d.addCallback(lambda res: q.when_tub_ready())
        d.addCallback(lambda res: q.stopService())
        d.addCallback(flushEventualQueue)
        return d
40
class ServiceMixin:
    """Test mixin: setUp starts a LoggingMultiService as self.parent and
    tearDown stops it again."""

    def setUp(self):
        """Create the parent service and start it."""
        parent = LoggingMultiService()
        self.parent = parent
        parent.startService()

    def tearDown(self):
        """Stop the parent service, then flush the eventual queue.

        Returns a Deferred that fires once both steps have finished.
        """
        log.msg("TestIntroducer.tearDown")

        def _stop_parent(res):
            return self.parent.stopService()

        d = defer.succeed(None)
        d.addCallback(_stop_parent)
        d.addCallback(flushEventualQueue)
        return d
51
class Introducer(ServiceMixin, unittest.TestCase, pollmixin.PollMixin):
    """Small tests of IntroducerClient construction and IntroducerService."""

    def test_create(self):
        """Constructing an IntroducerClient with no tub must not raise."""
        IntroducerClient(None, "introducer.furl", "my_nickname",
                         "my_version", "oldest_version")

    def test_listen(self):
        """An IntroducerService can be attached to the running parent."""
        introducer = IntroducerService()
        introducer.setServiceParent(self.parent)

    def test_duplicate(self):
        """Publishing a second announcement for an already-announced furl
        must not increase the number of announcements held."""
        introducer = IntroducerService()

        def _expect_announcements(count):
            # nothing in this test subscribes, so subscribers stay at zero
            self.failUnlessEqual(len(introducer.get_announcements()), count)
            self.failUnlessEqual(len(introducer.get_subscribers()), 0)

        _expect_announcements(0)

        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36106,127.0.0.1:36106/gydnpigj2ja2qr2srq4ikjwnl7xfgbra"
        furl2 = "pb://ttwwooyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36111,127.0.0.1:36106/ttwwoogj2ja2qr2srq4ikjwnl7xfgbra"
        ann1 = (furl1, "storage", "RIStorage", "nick1", "ver23", "ver0")
        # same furl as ann1, only the version field differs
        ann1b = (furl1, "storage", "RIStorage", "nick1", "ver24", "ver0")
        ann2 = (furl2, "storage", "RIStorage", "nick2", "ver30", "ver0")

        # (announcement to publish, announcements expected afterwards)
        steps = [(ann1, 1), (ann2, 2), (ann1b, 2)]
        for (announcement, expected) in steps:
            introducer.remote_publish(announcement)
            _expect_announcements(expected)
80
class SystemTestMixin(ServiceMixin, pollmixin.PollMixin):
    """ServiceMixin plus a listening Tub, stored as self.central_tub."""

    def setUp(self):
        """Start the parent service, then create the central Tub, attach it
        to the parent and give it a localhost location."""
        ServiceMixin.setUp(self)
        central = Tub()
        self.central_tub = central
        #central.setOption("logLocalFailures", True)
        #central.setOption("logRemoteFailures", True)
        central.setServiceParent(self.parent)
        # ask for port 0, then read back the port number actually in use
        listener = central.listenOn("tcp:0")
        central.setLocation("localhost:%d" % listener.getPortnum())
92
class SystemTest(SystemTestMixin, unittest.TestCase):
    """System tests: several clients finding each other via one introducer."""

    def test_system(self):
        """Run the system scenario against an IntroducerService."""
        i = IntroducerService()
        i.setServiceParent(self.parent)
        self.introducer_furl = self.central_tub.registerReference(i)
        return self.do_system_test()

    def test_system_oldserver(self):
        """Run the same scenario against old.IntroducerService_V1."""
        i = old.IntroducerService_V1()
        i.setServiceParent(self.parent)
        self.introducer_furl = self.central_tub.registerReference(i)
        return self.do_system_test()

    def do_system_test(self):
        """Shared scenario; self.introducer_furl must already be set.

        Creates NUMCLIENTS+1 clients, waits until they are all connected,
        forces two disconnects (from another client, then from itself) and
        waits for reconnection each time, and finally shuts down the
        introducer's tub and checks that the clients notice while keeping
        their connections to each other.

        Returns a Deferred that fires when every step has passed.
        """

        NUMCLIENTS = 5
        # We have 5 clients who publish themselves, and an extra one which
        # does not. When the connections are fully established, all six
        # nodes should have 5 connections each.

        clients = []
        tubs = {}
        for i in range(NUMCLIENTS+1):
            tub = Tub()
            #tub.setOption("logLocalFailures", True)
            #tub.setOption("logRemoteFailures", True)
            tub.setServiceParent(self.parent)
            l = tub.listenOn("tcp:0")
            portnum = l.getPortnum()
            tub.setLocation("localhost:%d" % portnum)

            n = FakeNode()
            log.msg("creating client %d: %s" % (i, tub.getShortTubID()))
            # client 0 uses the old V1 client class, everyone else the
            # current one
            client_class = IntroducerClient
            if i == 0:
                client_class = old.IntroducerClient_V1
            c = client_class(tub, self.introducer_furl,
                             "nickname-%d" % i, "version", "oldest")
            if i < NUMCLIENTS:
                node_furl = tub.registerReference(n)
                c.publish(node_furl, "storage", "ri_name")
            # the last one does not publish anything

            c.subscribe_to("storage")

            c.setServiceParent(self.parent)
            clients.append(c)
            tubs[c] = tub

        def _nodeid_of(client):
            # the value used as a peerid below: the client's tub ID,
            # upper-cased and base32-decoded
            return b32decode(tubs[client].tubID.upper())

        def _wait_for_all_connections():
            for c in clients:
                if len(c.get_all_connections()) < NUMCLIENTS:
                    return False
            return True
        d = self.poll(_wait_for_all_connections)

        def _check1(res):
            log.msg("doing _check1")
            # the same for every client, so decode it once
            nodeid0 = _nodeid_of(clients[0])
            for c in clients:
                self.failUnless(c.connected_to_introducer())
                self.failUnlessEqual(len(c.get_all_connections()), NUMCLIENTS)
                self.failUnlessEqual(len(c.get_all_peerids()), NUMCLIENTS)
                self.failUnlessEqual(len(c.get_all_connections_for("storage")),
                                     NUMCLIENTS)
                self.failUnlessEqual(c.get_nickname_for_peerid(nodeid0),
                                     "nickname-0")
        d.addCallback(_check1)

        origin_c = clients[0]
        def _disconnect_and_wait(victim, done_msg):
            # Make origin_c drop its connection to 'victim', log done_msg,
            # then wait until something changes (origin_c.counter moves),
            # which ought to be origin_c noticing the loss.
            current_counter = origin_c.counter
            victim_nodeid = _nodeid_of(victim)
            log.msg(" disconnecting %s->%s" %
                    (tubs[origin_c].tubID,
                     idlib.shortnodeid_b2a(victim_nodeid)))
            origin_c.debug_disconnect_from_peerid(victim_nodeid)
            log.msg(done_msg)

            def _compare():
                return current_counter != origin_c.counter
            return self.poll(_compare)

        def _disconnect_somebody_else(res):
            # now disconnect somebody's connection to someone else
            return _disconnect_and_wait(clients[1], " did disconnect")
        d.addCallback(_disconnect_somebody_else)

        # and wait for them to reconnect
        d.addCallback(lambda res: self.poll(_wait_for_all_connections))
        def _check2(res):
            log.msg("doing _check2")
            for c in clients:
                self.failUnlessEqual(len(c.get_all_connections()), NUMCLIENTS)
        d.addCallback(_check2)

        def _disconnect_yourself(res):
            # now disconnect somebody's connection to themselves.
            return _disconnect_and_wait(clients[0],
                                        " did disconnect from self")
        d.addCallback(_disconnect_yourself)

        d.addCallback(lambda res: self.poll(_wait_for_all_connections))
        def _check3(res):
            log.msg("doing _check3")
            for c in clients:
                self.failUnlessEqual(len(c.get_all_connections_for("storage")),
                                     NUMCLIENTS)
        d.addCallback(_check3)
        def _shutdown_introducer(res):
            # now shut down the introducer. We do this by shutting down the
            # tub it's using. Nobody's connections (to each other) should go
            # down. All clients should notice the loss, and no other errors
            # should occur.
            log.msg("shutting down the introducer")
            return self.central_tub.disownServiceParent()
        d.addCallback(_shutdown_introducer)
        def _wait_for_introducer_loss():
            for c in clients:
                if c.connected_to_introducer():
                    return False
            return True
        d.addCallback(lambda res: self.poll(_wait_for_introducer_loss))

        def _check4(res):
            log.msg("doing _check4")
            for c in clients:
                self.failUnlessEqual(len(c.get_all_connections_for("storage")),
                                     NUMCLIENTS)
                self.failIf(c.connected_to_introducer())
        d.addCallback(_check4)
        return d
235
class TooNewServer(IntroducerService):
    """IntroducerService whose VERSION lists only a 'v999' introducer
    protocol entry plus an application-version string."""

    VERSION = {
        "http://allmydata.org/tahoe/protocols/introducer/v999": {},
        "application-version": "greetings from the crazy future",
    }
241
class NonV1Server(SystemTestMixin, unittest.TestCase):
    # if the 1.3.0 client connects to a server that doesn't provide the 'v1'
    # protocol, it is supposed to provide a useful error instead of a weird
    # exception.

    def test_failure(self):
        """Point an IntroducerClient at a TooNewServer and check that it
        records an InsufficientVersionError in c._introducer_error.

        Returns a Deferred that fires once the check has been made.
        """
        i = TooNewServer()
        i.setServiceParent(self.parent)
        self.introducer_furl = self.central_tub.registerReference(i)

        tub = Tub()
        tub.setServiceParent(self.parent)
        l = tub.listenOn("tcp:0")
        portnum = l.getPortnum()
        tub.setLocation("localhost:%d" % portnum)

        # this client only subscribes; it has nothing to publish
        c = IntroducerClient(tub, self.introducer_furl,
                             "nickname-client", "version", "oldest")
        c.subscribe_to("storage")

        c.setServiceParent(self.parent)

        # now we wait for it to connect and notice the bad version

        def _got_bad():
            # stop polling on either outcome, so that a client which ends
            # up with a publisher fails in _done instead of polling forever
            return bool(c._introducer_error) or bool(c._publisher)
        d = self.poll(_got_bad)
        def _done(res):
            self.failUnless(c._introducer_error,
                            "expected an introducer error, but none was recorded")
            self.failUnless(c._introducer_error.check(InsufficientVersionError))
        d.addCallback(_done)
        return d
275
class Index(unittest.TestCase):
    """Check make_index against one announcement with a known answer."""

    def test_make_index(self):
        """make_index(ann) must return the expected (nodeid, service_name)."""
        # make sure we have a working base64.b32decode. The one in
        # python2.4.[01] was broken.
        furl = 'pb://t5g7egomnnktbpydbuijt6zgtmw4oqi5@127.0.0.1:51857/hfzv36i'
        ann = (furl,
               'storage', 'RIStorageServer.tahoe.allmydata.com',
               'plancha', 'allmydata-tahoe/1.4.1', '1.0.0')
        expected_nodeid = "\x9fM\xf2\x19\xcckU0\xbf\x03\r\x10\x99\xfb&\x9b-\xc7A\x1d"

        index = make_index(ann)
        (nodeid, service_name) = index
        self.failUnlessEqual(nodeid, expected_nodeid)
        self.failUnlessEqual(service_name, "storage")
286