1 from base64 import b32decode
5 from twisted.trial import unittest
6 from twisted.internet import defer
7 from twisted.python import log
9 from foolscap import Tub, Referenceable
10 from foolscap.eventual import fireEventually, flushEventualQueue
11 from twisted.application import service
12 from allmydata.interfaces import InsufficientVersionError
13 from allmydata.introducer.client import IntroducerClient
14 from allmydata.introducer.server import IntroducerService
15 from allmydata.introducer.common import make_index
16 # test compatibility with old introducer .tac files
17 from allmydata.introducer import IntroducerNode
18 from allmydata.introducer import old
19 from allmydata.util import idlib, pollmixin
20 import common_util as testutil
22 class FakeNode(Referenceable):
25 class LoggingMultiService(service.MultiService):
26 def log(self, msg, **kw):
29 class Node(testutil.SignalMixin, unittest.TestCase):
30 def test_loadable(self):
31 basedir = "introducer.IntroducerNode.test_loadable"
33 q = IntroducerNode(basedir)
34 d = fireEventually(None)
35 d.addCallback(lambda res: q.startService())
36 d.addCallback(lambda res: q.when_tub_ready())
37 d.addCallback(lambda res: q.stopService())
38 d.addCallback(flushEventualQueue)
43 self.parent = LoggingMultiService()
44 self.parent.startService()
46 log.msg("TestIntroducer.tearDown")
47 d = defer.succeed(None)
48 d.addCallback(lambda res: self.parent.stopService())
49 d.addCallback(flushEventualQueue)
52 class Introducer(ServiceMixin, unittest.TestCase, pollmixin.PollMixin):
54 def test_create(self):
55 ic = IntroducerClient(None, "introducer.furl", "my_nickname",
56 "my_version", "oldest_version")
58 def test_listen(self):
59 i = IntroducerService()
60 i.setServiceParent(self.parent)
62 def test_duplicate(self):
63 i = IntroducerService()
64 self.failUnlessEqual(len(i.get_announcements()), 0)
65 self.failUnlessEqual(len(i.get_subscribers()), 0)
66 furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36106,127.0.0.1:36106/gydnpigj2ja2qr2srq4ikjwnl7xfgbra"
67 furl2 = "pb://ttwwooyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36111,127.0.0.1:36106/ttwwoogj2ja2qr2srq4ikjwnl7xfgbra"
68 ann1 = (furl1, "storage", "RIStorage", "nick1", "ver23", "ver0")
69 ann1b = (furl1, "storage", "RIStorage", "nick1", "ver24", "ver0")
70 ann2 = (furl2, "storage", "RIStorage", "nick2", "ver30", "ver0")
71 i.remote_publish(ann1)
72 self.failUnlessEqual(len(i.get_announcements()), 1)
73 self.failUnlessEqual(len(i.get_subscribers()), 0)
74 i.remote_publish(ann2)
75 self.failUnlessEqual(len(i.get_announcements()), 2)
76 self.failUnlessEqual(len(i.get_subscribers()), 0)
77 i.remote_publish(ann1b)
78 self.failUnlessEqual(len(i.get_announcements()), 2)
79 self.failUnlessEqual(len(i.get_subscribers()), 0)
81 class SystemTestMixin(ServiceMixin, pollmixin.PollMixin):
84 ServiceMixin.setUp(self)
85 self.central_tub = tub = Tub()
86 #tub.setOption("logLocalFailures", True)
87 #tub.setOption("logRemoteFailures", True)
88 tub.setServiceParent(self.parent)
89 l = tub.listenOn("tcp:0")
90 portnum = l.getPortnum()
91 tub.setLocation("localhost:%d" % portnum)
93 class SystemTest(SystemTestMixin, unittest.TestCase):
95 def test_system(self):
96 i = IntroducerService()
97 i.setServiceParent(self.parent)
98 self.introducer_furl = self.central_tub.registerReference(i)
99 return self.do_system_test()
101 def test_system_oldserver(self):
102 i = old.IntroducerService_V1()
103 i.setServiceParent(self.parent)
104 self.introducer_furl = self.central_tub.registerReference(i)
105 return self.do_system_test()
107 def do_system_test(self):
110 # we have 5 clients who publish themselves, and an extra one does
111 # which not. When the connections are fully established, all six nodes
112 # should have 5 connections each.
116 for i in range(NUMCLIENTS+1):
118 #tub.setOption("logLocalFailures", True)
119 #tub.setOption("logRemoteFailures", True)
120 tub.setServiceParent(self.parent)
121 l = tub.listenOn("tcp:0")
122 portnum = l.getPortnum()
123 tub.setLocation("localhost:%d" % portnum)
126 log.msg("creating client %d: %s" % (i, tub.getShortTubID()))
127 client_class = IntroducerClient
129 client_class = old.IntroducerClient_V1
130 c = client_class(tub, self.introducer_furl,
131 "nickname-%d" % i, "version", "oldest")
133 node_furl = tub.registerReference(n)
134 c.publish(node_furl, "storage", "ri_name")
135 # the last one does not publish anything
137 c.subscribe_to("storage")
139 c.setServiceParent(self.parent)
143 def _wait_for_all_connections():
145 if len(c.get_all_connections()) < NUMCLIENTS:
148 d = self.poll(_wait_for_all_connections)
151 log.msg("doing _check1")
153 self.failUnless(c.connected_to_introducer())
154 self.failUnlessEqual(len(c.get_all_connections()), NUMCLIENTS)
155 self.failUnlessEqual(len(c.get_all_peerids()), NUMCLIENTS)
156 self.failUnlessEqual(len(c.get_all_connections_for("storage")),
158 nodeid0 = b32decode(tubs[clients[0]].tubID.upper())
159 self.failUnlessEqual(c.get_nickname_for_peerid(nodeid0),
161 d.addCallback(_check1)
163 origin_c = clients[0]
164 def _disconnect_somebody_else(res):
165 # now disconnect somebody's connection to someone else
166 current_counter = origin_c.counter
167 victim_nodeid = b32decode(tubs[clients[1]].tubID.upper())
168 log.msg(" disconnecting %s->%s" %
169 (tubs[origin_c].tubID,
170 idlib.shortnodeid_b2a(victim_nodeid)))
171 origin_c.debug_disconnect_from_peerid(victim_nodeid)
172 log.msg(" did disconnect")
174 # then wait until something changes, which ought to be them
177 return current_counter != origin_c.counter
178 return self.poll(_compare)
180 d.addCallback(_disconnect_somebody_else)
182 # and wait for them to reconnect
183 d.addCallback(lambda res: self.poll(_wait_for_all_connections))
185 log.msg("doing _check2")
187 self.failUnlessEqual(len(c.get_all_connections()), NUMCLIENTS)
188 d.addCallback(_check2)
190 def _disconnect_yourself(res):
191 # now disconnect somebody's connection to themselves.
192 current_counter = origin_c.counter
193 victim_nodeid = b32decode(tubs[clients[0]].tubID.upper())
194 log.msg(" disconnecting %s->%s" %
195 (tubs[origin_c].tubID,
196 idlib.shortnodeid_b2a(victim_nodeid)))
197 origin_c.debug_disconnect_from_peerid(victim_nodeid)
198 log.msg(" did disconnect from self")
201 return current_counter != origin_c.counter
202 return self.poll(_compare)
203 d.addCallback(_disconnect_yourself)
205 d.addCallback(lambda res: self.poll(_wait_for_all_connections))
207 log.msg("doing _check3")
209 self.failUnlessEqual(len(c.get_all_connections_for("storage")),
211 d.addCallback(_check3)
212 def _shutdown_introducer(res):
213 # now shut down the introducer. We do this by shutting down the
214 # tub it's using. Nobody's connections (to each other) should go
215 # down. All clients should notice the loss, and no other errors
217 log.msg("shutting down the introducer")
218 return self.central_tub.disownServiceParent()
219 d.addCallback(_shutdown_introducer)
220 def _wait_for_introducer_loss():
222 if c.connected_to_introducer():
225 d.addCallback(lambda res: self.poll(_wait_for_introducer_loss))
228 log.msg("doing _check4")
230 self.failUnlessEqual(len(c.get_all_connections_for("storage")),
232 self.failIf(c.connected_to_introducer())
233 d.addCallback(_check4)
236 class TooNewServer(IntroducerService):
237 VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v999":
239 "application-version": "greetings from the crazy future",
242 class NonV1Server(SystemTestMixin, unittest.TestCase):
243 # if the 1.3.0 client connects to a server that doesn't provide the 'v1'
244 # protocol, it is supposed to provide a useful error instead of a weird
247 def test_failure(self):
249 i.setServiceParent(self.parent)
250 self.introducer_furl = self.central_tub.registerReference(i)
253 tub.setServiceParent(self.parent)
254 l = tub.listenOn("tcp:0")
255 portnum = l.getPortnum()
256 tub.setLocation("localhost:%d" % portnum)
259 c = IntroducerClient(tub, self.introducer_furl,
260 "nickname-client", "version", "oldest")
261 c.subscribe_to("storage")
263 c.setServiceParent(self.parent)
265 # now we wait for it to connect and notice the bad version
268 return bool(c._introducer_error) or bool(c._publisher)
269 d = self.poll(_got_bad)
271 self.failUnless(c._introducer_error)
272 self.failUnless(c._introducer_error.check(InsufficientVersionError))
276 class Index(unittest.TestCase):
277 def test_make_index(self):
278 # make sure we have a working base64.b32decode. The one in
279 # python2.4.[01] was broken.
280 ann = ('pb://t5g7egomnnktbpydbuijt6zgtmw4oqi5@127.0.0.1:51857/hfzv36i',
281 'storage', 'RIStorageServer.tahoe.allmydata.com',
282 'plancha', 'allmydata-tahoe/1.4.1', '1.0.0')
283 (nodeid, service_name) = make_index(ann)
284 self.failUnlessEqual(nodeid, "\x9fM\xf2\x19\xcckU0\xbf\x03\r\x10\x99\xfb&\x9b-\xc7A\x1d")
285 self.failUnlessEqual(service_name, "storage")