import os
from base64 import b32decode

from twisted.trial import unittest
from twisted.internet import defer
from twisted.python import log
from twisted.application import service

from foolscap import Tub, Referenceable
from foolscap.eventual import fireEventually, flushEventualQueue

from allmydata.interfaces import InsufficientVersionError
from allmydata.introducer.client import IntroducerClient
from allmydata.introducer.server import IntroducerService
# test compatibility with old introducer .tac files
from allmydata.introducer import IntroducerNode
from allmydata.introducer import old
from allmydata.util import idlib, pollmixin
import common_util as testutil

class FakeNode(Referenceable):
    """A minimal Referenceable: just enough of a node for the clients to
    publish and connect to."""

class LoggingMultiService(service.MultiService):
    """A MultiService that accepts .log() calls (as made by child services)
    and forwards them to the Twisted log."""
    def log(self, msg, **kw):
        log.msg(msg, **kw)

class Node(testutil.SignalMixin, unittest.TestCase):
    def test_loadable(self):
        basedir = "introducer.IntroducerNode.test_loadable"
        os.mkdir(basedir)
        q = IntroducerNode(basedir)
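        # fireEventually returns a Deferred that fires on a later reactor
        # turn; the final flushEventualQueue drains foolscap's eventual-send
        # queue so trial sees a clean reactor at the end of the test.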
        d = fireEventually(None)
        d.addCallback(lambda res: q.startService())
        d.addCallback(lambda res: q.when_tub_ready())
        d.addCallback(lambda res: q.stopService())
        d.addCallback(flushEventualQueue)
        return d

class ServiceMixin:
    def setUp(self):
        self.parent = LoggingMultiService()
        self.parent.startService()

    def tearDown(self):
        log.msg("TestIntroducer.tearDown")
        d = defer.succeed(None)
        d.addCallback(lambda res: self.parent.stopService())
        d.addCallback(flushEventualQueue)
        return d

class Introducer(ServiceMixin, unittest.TestCase, pollmixin.PollMixin):

    def test_create(self):
        ic = IntroducerClient(None, "introducer.furl", "my_nickname",
                              "my_version", "oldest_version")

    def test_listen(self):
        i = IntroducerService()
        i.setServiceParent(self.parent)

    def test_duplicate(self):
        i = IntroducerService()
        self.failUnlessEqual(len(i.get_announcements()), 0)
        self.failUnlessEqual(len(i.get_subscribers()), 0)
        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36106,127.0.0.1:36106/gydnpigj2ja2qr2srq4ikjwnl7xfgbra"
        furl2 = "pb://ttwwooyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36111,127.0.0.1:36106/ttwwoogj2ja2qr2srq4ikjwnl7xfgbra"
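        # announcements are 6-tuples of (furl, service_name,
        # remote_interface_name, nickname, version, oldest_supported);
        # ann1b re-announces furl1 with a newer version, so it should
        # replace ann1 rather than add a third entry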
        ann1 = (furl1, "storage", "RIStorage", "nick1", "ver23", "ver0")
        ann1b = (furl1, "storage", "RIStorage", "nick1", "ver24", "ver0")
        ann2 = (furl2, "storage", "RIStorage", "nick2", "ver30", "ver0")
        i.remote_publish(ann1)
        self.failUnlessEqual(len(i.get_announcements()), 1)
        self.failUnlessEqual(len(i.get_subscribers()), 0)
        i.remote_publish(ann2)
        self.failUnlessEqual(len(i.get_announcements()), 2)
        self.failUnlessEqual(len(i.get_subscribers()), 0)
        i.remote_publish(ann1b)
        self.failUnlessEqual(len(i.get_announcements()), 2)
        self.failUnlessEqual(len(i.get_subscribers()), 0)

class SystemTestMixin(ServiceMixin, pollmixin.PollMixin):

    def setUp(self):
        ServiceMixin.setUp(self)
        self.central_tub = tub = Tub()
        #tub.setOption("logLocalFailures", True)
        #tub.setOption("logRemoteFailures", True)
        tub.setServiceParent(self.parent)
        l = tub.listenOn("tcp:0")
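        # "tcp:0" asks the kernel for an unused port; we read the allocated
        # port number back so the tub can advertise a reachable location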
        portnum = l.getPortnum()
        tub.setLocation("localhost:%d" % portnum)

class SystemTest(SystemTestMixin, unittest.TestCase):

    def test_system(self):
        i = IntroducerService()
        i.setServiceParent(self.parent)
        self.introducer_furl = self.central_tub.registerReference(i)
        return self.do_system_test()

    def test_system_oldserver(self):
        i = old.IntroducerService_V1()
        i.setServiceParent(self.parent)
        self.introducer_furl = self.central_tub.registerReference(i)
        return self.do_system_test()

    def do_system_test(self):
        NUMCLIENTS = 5
        # we have 5 clients which publish themselves, and an extra one which
        # does not. When the connections are fully established, all six
        # nodes should have 5 connections each.

        clients = []
        tubs = {}
        for i in range(NUMCLIENTS+1):
            tub = Tub()
            #tub.setOption("logLocalFailures", True)
            #tub.setOption("logRemoteFailures", True)
            tub.setServiceParent(self.parent)
            l = tub.listenOn("tcp:0")
            portnum = l.getPortnum()
            tub.setLocation("localhost:%d" % portnum)

            n = FakeNode()
            log.msg("creating client %d: %s" % (i, tub.getShortTubID()))
            client_class = IntroducerClient
            if i == 0:
                client_class = old.IntroducerClient_V1
            c = client_class(tub, self.introducer_furl,
                             "nickname-%d" % i, "version", "oldest")
            if i < NUMCLIENTS:
                node_furl = tub.registerReference(n)
                c.publish(node_furl, "storage", "ri_name")
            # the last one does not publish anything

            c.subscribe_to("storage")

            c.setServiceParent(self.parent)
            clients.append(c)
            tubs[c] = tub

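        # poll until every client (including the non-publishing one) has a
        # connection to each of the NUMCLIENTS publishers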
        def _wait_for_all_connections():
            for c in clients:
                if len(c.get_all_connections()) < NUMCLIENTS:
                    return False
            return True
        d = self.poll(_wait_for_all_connections)

        def _check1(res):
            log.msg("doing _check1")
            for c in clients:
                self.failUnless(c.connected_to_introducer())
                self.failUnlessEqual(len(c.get_all_connections()), NUMCLIENTS)
                self.failUnlessEqual(len(c.get_all_peerids()), NUMCLIENTS)
                self.failUnlessEqual(len(c.get_all_connections_for("storage")),
                                     NUMCLIENTS)
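                # a peerid is the binary form of a tub ID, which is printed
                # as lowercase base32 (hence the .upper() before b32decode)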
                nodeid0 = b32decode(tubs[clients[0]].tubID.upper())
                self.failUnlessEqual(c.get_nickname_for_peerid(nodeid0),
                                     "nickname-0")
        d.addCallback(_check1)

        origin_c = clients[0]
        def _disconnect_somebody_else(res):
            # now disconnect somebody's connection to someone else
            current_counter = origin_c.counter
            victim_nodeid = b32decode(tubs[clients[1]].tubID.upper())
            log.msg(" disconnecting %s->%s" %
                    (tubs[origin_c].tubID,
                     idlib.shortnodeid_b2a(victim_nodeid)))
            origin_c.debug_disconnect_from_peerid(victim_nodeid)
            log.msg(" did disconnect")

            # then wait until something changes, which ought to be them
            # noticing the loss
            def _compare():
                return current_counter != origin_c.counter
            return self.poll(_compare)

        d.addCallback(_disconnect_somebody_else)

        # and wait for them to reconnect
        d.addCallback(lambda res: self.poll(_wait_for_all_connections))
        def _check2(res):
            log.msg("doing _check2")
            for c in clients:
                self.failUnlessEqual(len(c.get_all_connections()), NUMCLIENTS)
        d.addCallback(_check2)

        def _disconnect_yourself(res):
            # now disconnect a client's connection to itself
            current_counter = origin_c.counter
            victim_nodeid = b32decode(tubs[clients[0]].tubID.upper())
            log.msg(" disconnecting %s->%s" %
                    (tubs[origin_c].tubID,
                     idlib.shortnodeid_b2a(victim_nodeid)))
            origin_c.debug_disconnect_from_peerid(victim_nodeid)
            log.msg(" did disconnect from self")

            def _compare():
                return current_counter != origin_c.counter
            return self.poll(_compare)
        d.addCallback(_disconnect_yourself)

        d.addCallback(lambda res: self.poll(_wait_for_all_connections))
        def _check3(res):
            log.msg("doing _check3")
            for c in clients:
                self.failUnlessEqual(len(c.get_all_connections_for("storage")),
                                     NUMCLIENTS)
        d.addCallback(_check3)

        def _shutdown_introducer(res):
            # now shut down the introducer. We do this by shutting down the
            # tub it's using. Nobody's connections (to each other) should go
            # down. All clients should notice the loss, and no other errors
            # should occur.
            log.msg("shutting down the introducer")
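            # disowning the tub stops it: its listener closes and its
            # connections drop, which severs everyone's link to the
            # introducer but leaves the client-to-client connections alone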
            return self.central_tub.disownServiceParent()
        d.addCallback(_shutdown_introducer)
        def _wait_for_introducer_loss():
            for c in clients:
                if c.connected_to_introducer():
                    return False
            return True
        d.addCallback(lambda res: self.poll(_wait_for_introducer_loss))

        def _check4(res):
            log.msg("doing _check4")
            for c in clients:
                self.failUnlessEqual(len(c.get_all_connections_for("storage")),
                                     NUMCLIENTS)
                self.failIf(c.connected_to_introducer())
        d.addCallback(_check4)
        return d

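# a server that only advertises a hypothetical future protocol version;
# because its VERSION dict lacks the .../introducer/v1 key, a client that
# demands v1 support must reject it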
class TooNewServer(IntroducerService):
    VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v999":
                 { },
                "application-version": "greetings from the crazy future",
                }

class NonV1Server(SystemTestMixin, unittest.TestCase):
    # if the 1.3.0 client connects to a server that doesn't provide the 'v1'
    # protocol, it is supposed to report a useful InsufficientVersionError
    # instead of a weird exception.

    def test_failure(self):
        i = TooNewServer()
        i.setServiceParent(self.parent)
        self.introducer_furl = self.central_tub.registerReference(i)

        tub = Tub()
        tub.setServiceParent(self.parent)
        l = tub.listenOn("tcp:0")
        portnum = l.getPortnum()
        tub.setLocation("localhost:%d" % portnum)

        n = FakeNode()
        c = IntroducerClient(tub, self.introducer_furl,
                             "nickname-client", "version", "oldest")
        c.subscribe_to("storage")

        c.setServiceParent(self.parent)

        # now we wait for it to connect and notice the bad version

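        # poll until either the client records the version error or it
        # (incorrectly) succeeds in setting up a publisher connection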
        def _got_bad():
            return bool(c._introducer_error) or bool(c._publisher)
        d = self.poll(_got_bad)

        def _done(res):
            self.failUnless(c._introducer_error)
            self.failUnless(c._introducer_error.check(InsufficientVersionError))
        d.addCallback(_done)
        return d