from base64 import b32decode

import os

from twisted.trial import unittest
from twisted.internet import defer
from twisted.python import log
from twisted.application import service

from foolscap.api import Tub, Referenceable, fireEventually, flushEventualQueue

from allmydata.interfaces import InsufficientVersionError
from allmydata.introducer.client import IntroducerClient
from allmydata.introducer.server import IntroducerService
from allmydata.introducer.common import make_index
# test compatibility with old introducer .tac files
from allmydata.introducer import IntroducerNode
from allmydata.introducer import old
from allmydata.util import idlib, pollmixin
import common_util as testutil

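# FakeNode is a minimal Referenceable: just something whose FURL can be
# registered with a Tub and announced through the introducer.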
class FakeNode(Referenceable):
    pass

class LoggingMultiService(service.MultiService):
    def log(self, msg, **kw):
        log.msg(msg, **kw)

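# Node verifies that an IntroducerNode (the entry point used by old .tac
# files) can be created, started, and stopped cleanly.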
class Node(testutil.SignalMixin, unittest.TestCase):
    def test_loadable(self):
        basedir = "introducer.IntroducerNode.test_loadable"
        os.mkdir(basedir)
        q = IntroducerNode(basedir)
        d = fireEventually(None)
        d.addCallback(lambda res: q.startService())
        d.addCallback(lambda res: q.when_tub_ready())
        d.addCallback(lambda res: q.stopService())
        d.addCallback(flushEventualQueue)
        return d

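# ServiceMixin gives each test a parent MultiService, and on teardown stops
# it and drains foolscap's eventual-send queue.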
class ServiceMixin:
    def setUp(self):
        self.parent = LoggingMultiService()
        self.parent.startService()
    def tearDown(self):
        log.msg("TestIntroducer.tearDown")
        d = defer.succeed(None)
        d.addCallback(lambda res: self.parent.stopService())
        d.addCallback(flushEventualQueue)
        return d

class Introducer(ServiceMixin, unittest.TestCase, pollmixin.PollMixin):

    def test_create(self):
        ic = IntroducerClient(None, "introducer.furl", "my_nickname",
                              "my_version", "oldest_version")

    def test_listen(self):
        i = IntroducerService()
        i.setServiceParent(self.parent)

    def test_duplicate(self):
        i = IntroducerService()
        self.failUnlessEqual(len(i.get_announcements()), 0)
        self.failUnlessEqual(len(i.get_subscribers()), 0)
        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36106,127.0.0.1:36106/gydnpigj2ja2qr2srq4ikjwnl7xfgbra"
        furl2 = "pb://ttwwooyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36111,127.0.0.1:36106/ttwwoogj2ja2qr2srq4ikjwnl7xfgbra"
        ann1 = (furl1, "storage", "RIStorage", "nick1", "ver23", "ver0")
        ann1b = (furl1, "storage", "RIStorage", "nick1", "ver24", "ver0")
        ann2 = (furl2, "storage", "RIStorage", "nick2", "ver30", "ver0")
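        # ann1b re-announces the same node+service with a newer version: the
        # introducer should replace the old announcement, not add a third one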
        i.remote_publish(ann1)
        self.failUnlessEqual(len(i.get_announcements()), 1)
        self.failUnlessEqual(len(i.get_subscribers()), 0)
        i.remote_publish(ann2)
        self.failUnlessEqual(len(i.get_announcements()), 2)
        self.failUnlessEqual(len(i.get_subscribers()), 0)
        i.remote_publish(ann1b)
        self.failUnlessEqual(len(i.get_announcements()), 2)
        self.failUnlessEqual(len(i.get_subscribers()), 0)

class SystemTestMixin(ServiceMixin, pollmixin.PollMixin):

    def setUp(self):
        ServiceMixin.setUp(self)
        self.central_tub = tub = Tub()
        #tub.setOption("logLocalFailures", True)
        #tub.setOption("logRemoteFailures", True)
        tub.setOption("expose-remote-exception-types", False)
        tub.setServiceParent(self.parent)
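        # "tcp:0" asks the OS for an ephemeral port; once we know its number
        # we advertise it as this tub's location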
        l = tub.listenOn("tcp:0")
        portnum = l.getPortnum()
        tub.setLocation("localhost:%d" % portnum)

class SystemTest(SystemTestMixin, unittest.TestCase):

    def test_system(self):
        i = IntroducerService()
        i.setServiceParent(self.parent)
        self.introducer_furl = self.central_tub.registerReference(i)
        return self.do_system_test()

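    # same system test against the old V1 server, to confirm that current
    # clients still interoperate with it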
    def test_system_oldserver(self):
        i = old.IntroducerService_V1()
        i.setServiceParent(self.parent)
        self.introducer_furl = self.central_tub.registerReference(i)
        return self.do_system_test()

    def do_system_test(self):

        NUMCLIENTS = 5
        # we have 5 clients which publish themselves, and an extra one which
        # does not. When the connections are fully established, all six nodes
        # should have 5 connections each.

        clients = []
        tubs = {}
        for i in range(NUMCLIENTS+1):
            tub = Tub()
            #tub.setOption("logLocalFailures", True)
            #tub.setOption("logRemoteFailures", True)
            tub.setOption("expose-remote-exception-types", False)
            tub.setServiceParent(self.parent)
            l = tub.listenOn("tcp:0")
            portnum = l.getPortnum()
            tub.setLocation("localhost:%d" % portnum)

            n = FakeNode()
            log.msg("creating client %d: %s" % (i, tub.getShortTubID()))
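            # client 0 uses the old V1 client code, to exercise backwards
            # compatibility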
            client_class = IntroducerClient
            if i == 0:
                client_class = old.IntroducerClient_V1
            c = client_class(tub, self.introducer_furl,
                             "nickname-%d" % i, "version", "oldest")
            if i < NUMCLIENTS:
                node_furl = tub.registerReference(n)
                c.publish(node_furl, "storage", "ri_name")
            # the last one does not publish anything

            c.subscribe_to("storage")

            c.setServiceParent(self.parent)
            clients.append(c)
            tubs[c] = tub

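        # each client should see one connection per published node; note that
        # a publishing client gets a connection to its own node as well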
        def _wait_for_all_connections():
            for c in clients:
                if len(c.get_all_connections()) < NUMCLIENTS:
                    return False
            return True
        d = self.poll(_wait_for_all_connections)

        def _check1(res):
            log.msg("doing _check1")
            for c in clients:
                self.failUnless(c.connected_to_introducer())
                self.failUnlessEqual(len(c.get_all_connections()), NUMCLIENTS)
                self.failUnlessEqual(len(c.get_all_peerids()), NUMCLIENTS)
                self.failUnlessEqual(len(c.get_all_connections_for("storage")),
                                     NUMCLIENTS)
                nodeid0 = b32decode(tubs[clients[0]].tubID.upper())
                self.failUnlessEqual(c.get_nickname_for_peerid(nodeid0),
                                     "nickname-0")
        d.addCallback(_check1)

        origin_c = clients[0]
        def _disconnect_somebody_else(res):
            # now disconnect somebody's connection to someone else
            current_counter = origin_c.counter
            victim_nodeid = b32decode(tubs[clients[1]].tubID.upper())
            log.msg(" disconnecting %s->%s" %
                    (tubs[origin_c].tubID,
                     idlib.shortnodeid_b2a(victim_nodeid)))
            origin_c.debug_disconnect_from_peerid(victim_nodeid)
            log.msg(" did disconnect")

            # then wait until something changes, which ought to be them
            # noticing the loss
            def _compare():
                return current_counter != origin_c.counter
            return self.poll(_compare)

        d.addCallback(_disconnect_somebody_else)

        # and wait for them to reconnect
        d.addCallback(lambda res: self.poll(_wait_for_all_connections))
        def _check2(res):
            log.msg("doing _check2")
            for c in clients:
                self.failUnlessEqual(len(c.get_all_connections()), NUMCLIENTS)
        d.addCallback(_check2)

        def _disconnect_yourself(res):
            # now disconnect somebody's connection to themselves.
            current_counter = origin_c.counter
            victim_nodeid = b32decode(tubs[clients[0]].tubID.upper())
            log.msg(" disconnecting %s->%s" %
                    (tubs[origin_c].tubID,
                     idlib.shortnodeid_b2a(victim_nodeid)))
            origin_c.debug_disconnect_from_peerid(victim_nodeid)
            log.msg(" did disconnect from self")

            def _compare():
                return current_counter != origin_c.counter
            return self.poll(_compare)
        d.addCallback(_disconnect_yourself)

        d.addCallback(lambda res: self.poll(_wait_for_all_connections))
        def _check3(res):
            log.msg("doing _check3")
            for c in clients:
                self.failUnlessEqual(len(c.get_all_connections_for("storage")),
                                     NUMCLIENTS)
        d.addCallback(_check3)
        def _shutdown_introducer(res):
            # now shut down the introducer. We do this by shutting down the
            # tub it's using. Nobody's connections (to each other) should go
            # down. All clients should notice the loss, and no other errors
            # should occur.
            log.msg("shutting down the introducer")
            return self.central_tub.disownServiceParent()
        d.addCallback(_shutdown_introducer)
        def _wait_for_introducer_loss():
            for c in clients:
                if c.connected_to_introducer():
                    return False
            return True
        d.addCallback(lambda res: self.poll(_wait_for_introducer_loss))

        def _check4(res):
            log.msg("doing _check4")
            for c in clients:
                self.failUnlessEqual(len(c.get_all_connections_for("storage")),
                                     NUMCLIENTS)
                self.failIf(c.connected_to_introducer())
        d.addCallback(_check4)
        return d

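# TooNewServer advertises only a hypothetical future v999 protocol, so a
# v1-only client should fail with InsufficientVersionError instead of a
# confusing remote exception.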
class TooNewServer(IntroducerService):
    VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v999":
                 { },
                "application-version": "greetings from the crazy future",
                }

class NonV1Server(SystemTestMixin, unittest.TestCase):
    # if the 1.3.0 client connects to a server that doesn't provide the 'v1'
    # protocol, it is supposed to provide a useful error instead of a weird
    # exception.

    def test_failure(self):
        i = TooNewServer()
        i.setServiceParent(self.parent)
        self.introducer_furl = self.central_tub.registerReference(i)

        tub = Tub()
        tub.setOption("expose-remote-exception-types", False)
        tub.setServiceParent(self.parent)
        l = tub.listenOn("tcp:0")
        portnum = l.getPortnum()
        tub.setLocation("localhost:%d" % portnum)

        n = FakeNode()
        c = IntroducerClient(tub, self.introducer_furl,
                             "nickname-client", "version", "oldest")
        c.subscribe_to("storage")

        c.setServiceParent(self.parent)

        # now we wait for it to connect and notice the bad version

        def _got_bad():
            return bool(c._introducer_error) or bool(c._publisher)
        d = self.poll(_got_bad)
        def _done(res):
            self.failUnless(c._introducer_error)
            self.failUnless(c._introducer_error.check(InsufficientVersionError))
        d.addCallback(_done)
        return d

class Index(unittest.TestCase):
    def test_make_index(self):
        # make sure we have a working base64.b32decode. The one in
        # python2.4.[01] was broken.
        ann = ('pb://t5g7egomnnktbpydbuijt6zgtmw4oqi5@127.0.0.1:51857/hfzv36i',
               'storage', 'RIStorageServer.tahoe.allmydata.com',
               'plancha', 'allmydata-tahoe/1.4.1', '1.0.0')
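        # make_index should extract the tubid from the FURL, b32decode it
        # into a binary nodeid, and pair it with the service name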
        (nodeid, service_name) = make_index(ann)
        self.failUnlessEqual(nodeid, "\x9fM\xf2\x19\xcckU0\xbf\x03\r\x10\x99\xfb&\x9b-\xc7A\x1d")
        self.failUnlessEqual(service_name, "storage")