1 from base64 import b32decode
5 from twisted.trial import unittest
6 from twisted.internet import defer
7 from twisted.python import log
9 from foolscap.api import Tub, Referenceable, fireEventually, flushEventualQueue
10 from twisted.application import service
11 from allmydata.interfaces import InsufficientVersionError
12 from allmydata.introducer.client import IntroducerClient
13 from allmydata.introducer.server import IntroducerService
14 from allmydata.introducer.common import make_index
15 # test compatibility with old introducer .tac files
16 from allmydata.introducer import IntroducerNode
17 from allmydata.introducer import old
18 from allmydata.util import idlib, pollmixin
19 import common_util as testutil
21 class FakeNode(Referenceable):
24 class LoggingMultiService(service.MultiService):
25 def log(self, msg, **kw):
28 class Node(testutil.SignalMixin, unittest.TestCase):
29 def test_loadable(self):
30 basedir = "introducer.IntroducerNode.test_loadable"
32 q = IntroducerNode(basedir)
33 d = fireEventually(None)
34 d.addCallback(lambda res: q.startService())
35 d.addCallback(lambda res: q.when_tub_ready())
36 d.addCallback(lambda res: q.stopService())
37 d.addCallback(flushEventualQueue)
42 self.parent = LoggingMultiService()
43 self.parent.startService()
45 log.msg("TestIntroducer.tearDown")
46 d = defer.succeed(None)
47 d.addCallback(lambda res: self.parent.stopService())
48 d.addCallback(flushEventualQueue)
class Introducer(ServiceMixin, unittest.TestCase, pollmixin.PollMixin):
    """Unit tests for building an IntroducerClient and IntroducerService."""

    def test_create(self):
        """Construct an IntroducerClient with no tub and placeholder values."""
        client = IntroducerClient(None, "introducer.furl", "my_nickname",
                                  "my_version", "oldest_version")

    def test_listen(self):
        """Attach a fresh IntroducerService to self.parent."""
        introducer = IntroducerService()
        introducer.setServiceParent(self.parent)

    def test_duplicate(self):
        """Publish two announcements for one furl and check only one is kept.

        Three announcements are published: one for furl1, one for furl2, and
        then a second one for furl1 that differs only in its version field.
        The announcement count is expected to be 1, 2, and then still 2; the
        subscriber count is expected to stay at 0 throughout.
        """
        introducer = IntroducerService()

        def _expect_counts(announcements):
            # Same two assertions, in the same order, after every step.
            self.failUnlessEqual(len(introducer.get_announcements()),
                                 announcements)
            self.failUnlessEqual(len(introducer.get_subscribers()), 0)

        _expect_counts(0)

        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36106,127.0.0.1:36106/gydnpigj2ja2qr2srq4ikjwnl7xfgbra"
        furl2 = "pb://ttwwooyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36111,127.0.0.1:36106/ttwwoogj2ja2qr2srq4ikjwnl7xfgbra"
        first = (furl1, "storage", "RIStorage", "nick1", "ver23", "ver0")
        # Same furl as `first`; only the version field changes.
        first_again = (furl1, "storage", "RIStorage", "nick1", "ver24", "ver0")
        second = (furl2, "storage", "RIStorage", "nick2", "ver30", "ver0")

        introducer.remote_publish(first)
        _expect_counts(1)

        introducer.remote_publish(second)
        _expect_counts(2)

        # Re-publishing for furl1 must not grow the announcement set.
        introducer.remote_publish(first_again)
        _expect_counts(2)
80 class SystemTestMixin(ServiceMixin, pollmixin.PollMixin):
83 ServiceMixin.setUp(self)
84 self.central_tub = tub = Tub()
85 #tub.setOption("logLocalFailures", True)
86 #tub.setOption("logRemoteFailures", True)
87 tub.setOption("expose-remote-exception-types", False)
88 tub.setServiceParent(self.parent)
89 l = tub.listenOn("tcp:0")
90 portnum = l.getPortnum()
91 tub.setLocation("localhost:%d" % portnum)
93 class SystemTest(SystemTestMixin, unittest.TestCase):
def test_system(self):
    """Run the shared system test against an IntroducerService.

    The service is parented to self.parent and registered on the central
    tub; the resulting furl is stored in self.introducer_furl before
    do_system_test() is invoked. Returns whatever do_system_test() returns.
    """
    introducer = IntroducerService()
    introducer.setServiceParent(self.parent)
    furl = self.central_tub.registerReference(introducer)
    self.introducer_furl = furl
    return self.do_system_test()
def test_system_oldserver(self):
    """Run the shared system test against old.IntroducerService_V1.

    The V1 service is parented to self.parent and registered on the central
    tub; the resulting furl is stored in self.introducer_furl before
    do_system_test() is invoked. Returns whatever do_system_test() returns.
    """
    old_introducer = old.IntroducerService_V1()
    old_introducer.setServiceParent(self.parent)
    furl = self.central_tub.registerReference(old_introducer)
    self.introducer_furl = furl
    return self.do_system_test()
107 def do_system_test(self):
110 # we have 5 clients who publish themselves, and an extra one which
111 # does not. When the connections are fully established, all six nodes
112 # should have 5 connections each.
116 for i in range(NUMCLIENTS+1):
118 #tub.setOption("logLocalFailures", True)
119 #tub.setOption("logRemoteFailures", True)
120 tub.setOption("expose-remote-exception-types", False)
121 tub.setServiceParent(self.parent)
122 l = tub.listenOn("tcp:0")
123 portnum = l.getPortnum()
124 tub.setLocation("localhost:%d" % portnum)
127 log.msg("creating client %d: %s" % (i, tub.getShortTubID()))
128 client_class = IntroducerClient
130 client_class = old.IntroducerClient_V1
131 c = client_class(tub, self.introducer_furl,
132 "nickname-%d" % i, "version", "oldest")
134 node_furl = tub.registerReference(n)
135 c.publish(node_furl, "storage", "ri_name")
136 # the last one does not publish anything
138 c.subscribe_to("storage")
140 c.setServiceParent(self.parent)
144 def _wait_for_all_connections():
146 if len(c.get_all_connections()) < NUMCLIENTS:
149 d = self.poll(_wait_for_all_connections)
152 log.msg("doing _check1")
154 self.failUnless(c.connected_to_introducer())
155 self.failUnlessEqual(len(c.get_all_connections()), NUMCLIENTS)
156 self.failUnlessEqual(len(c.get_all_peerids()), NUMCLIENTS)
157 self.failUnlessEqual(len(c.get_all_connections_for("storage")),
159 nodeid0 = b32decode(tubs[clients[0]].tubID.upper())
160 self.failUnlessEqual(c.get_nickname_for_peerid(nodeid0),
162 d.addCallback(_check1)
164 origin_c = clients[0]
165 def _disconnect_somebody_else(res):
166 # now disconnect somebody's connection to someone else
167 current_counter = origin_c.counter
168 victim_nodeid = b32decode(tubs[clients[1]].tubID.upper())
169 log.msg(" disconnecting %s->%s" %
170 (tubs[origin_c].tubID,
171 idlib.shortnodeid_b2a(victim_nodeid)))
172 origin_c.debug_disconnect_from_peerid(victim_nodeid)
173 log.msg(" did disconnect")
175 # then wait until something changes, which ought to be them
178 return current_counter != origin_c.counter
179 return self.poll(_compare)
181 d.addCallback(_disconnect_somebody_else)
183 # and wait for them to reconnect
184 d.addCallback(lambda res: self.poll(_wait_for_all_connections))
186 log.msg("doing _check2")
188 self.failUnlessEqual(len(c.get_all_connections()), NUMCLIENTS)
189 d.addCallback(_check2)
191 def _disconnect_yourself(res):
192 # now disconnect somebody's connection to themselves.
193 current_counter = origin_c.counter
194 victim_nodeid = b32decode(tubs[clients[0]].tubID.upper())
195 log.msg(" disconnecting %s->%s" %
196 (tubs[origin_c].tubID,
197 idlib.shortnodeid_b2a(victim_nodeid)))
198 origin_c.debug_disconnect_from_peerid(victim_nodeid)
199 log.msg(" did disconnect from self")
202 return current_counter != origin_c.counter
203 return self.poll(_compare)
204 d.addCallback(_disconnect_yourself)
206 d.addCallback(lambda res: self.poll(_wait_for_all_connections))
208 log.msg("doing _check3")
210 self.failUnlessEqual(len(c.get_all_connections_for("storage")),
212 d.addCallback(_check3)
213 def _shutdown_introducer(res):
214 # now shut down the introducer. We do this by shutting down the
215 # tub it's using. Nobody's connections (to each other) should go
216 # down. All clients should notice the loss, and no other errors
218 log.msg("shutting down the introducer")
219 return self.central_tub.disownServiceParent()
220 d.addCallback(_shutdown_introducer)
221 def _wait_for_introducer_loss():
223 if c.connected_to_introducer():
226 d.addCallback(lambda res: self.poll(_wait_for_introducer_loss))
229 log.msg("doing _check4")
231 self.failUnlessEqual(len(c.get_all_connections_for("storage")),
233 self.failIf(c.connected_to_introducer())
234 d.addCallback(_check4)
237 class TooNewServer(IntroducerService):
238 VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v999":
240 "application-version": "greetings from the crazy future",
243 class NonV1Server(SystemTestMixin, unittest.TestCase):
244 # if the 1.3.0 client connects to a server that doesn't provide the 'v1'
245 # protocol, it is supposed to provide a useful error instead of a weird
248 def test_failure(self):
250 i.setServiceParent(self.parent)
251 self.introducer_furl = self.central_tub.registerReference(i)
254 tub.setOption("expose-remote-exception-types", False)
255 tub.setServiceParent(self.parent)
256 l = tub.listenOn("tcp:0")
257 portnum = l.getPortnum()
258 tub.setLocation("localhost:%d" % portnum)
261 c = IntroducerClient(tub, self.introducer_furl,
262 "nickname-client", "version", "oldest")
263 c.subscribe_to("storage")
265 c.setServiceParent(self.parent)
267 # now we wait for it to connect and notice the bad version
270 return bool(c._introducer_error) or bool(c._publisher)
271 d = self.poll(_got_bad)
273 self.failUnless(c._introducer_error)
274 self.failUnless(c._introducer_error.check(InsufficientVersionError))
class Index(unittest.TestCase):
    """Checks the (nodeid, service_name) pair produced by make_index()."""

    def test_make_index(self):
        """Compare make_index() output for a fixed announcement tuple."""
        # Make sure we have a working base64.b32decode: the one shipped in
        # python2.4.[01] was broken.
        furl = 'pb://t5g7egomnnktbpydbuijt6zgtmw4oqi5@127.0.0.1:51857/hfzv36i'
        announcement = (
            furl,
            'storage',
            'RIStorageServer.tahoe.allmydata.com',
            'plancha',
            'allmydata-tahoe/1.4.1',
            '1.0.0',
        )
        expected_nodeid = "\x9fM\xf2\x19\xcckU0\xbf\x03\r\x10\x99\xfb&\x9b-\xc7A\x1d"

        (nodeid, service_name) = make_index(announcement)

        self.failUnlessEqual(nodeid, expected_nodeid)
        self.failUnlessEqual(service_name, "storage")