1 from base64 import b32decode
5 from twisted.trial import unittest
6 from twisted.internet import defer
7 from twisted.python import log
9 from foolscap import Tub, Referenceable
10 from foolscap.eventual import fireEventually, flushEventualQueue
11 from twisted.application import service
12 from allmydata.interfaces import InsufficientVersionError
13 from allmydata.introducer.client import IntroducerClient
14 from allmydata.introducer.server import IntroducerService
15 # test compatibility with old introducer .tac files
16 from allmydata.introducer import IntroducerNode
17 from allmydata.introducer import old
18 from allmydata.util import idlib, pollmixin
19 import common_util as testutil
21 class FakeNode(Referenceable):
24 class LoggingMultiService(service.MultiService):
25 def log(self, msg, **kw):
28 class Node(testutil.SignalMixin, unittest.TestCase):
29 def test_loadable(self):
30 basedir = "introducer.IntroducerNode.test_loadable"
32 q = IntroducerNode(basedir)
33 d = fireEventually(None)
34 d.addCallback(lambda res: q.startService())
35 d.addCallback(lambda res: q.when_tub_ready())
36 d.addCallback(lambda res: q.stopService())
37 d.addCallback(flushEventualQueue)
42 self.parent = LoggingMultiService()
43 self.parent.startService()
45 log.msg("TestIntroducer.tearDown")
46 d = defer.succeed(None)
47 d.addCallback(lambda res: self.parent.stopService())
48 d.addCallback(flushEventualQueue)
class Introducer(ServiceMixin, unittest.TestCase, pollmixin.PollMixin):
    # Tests that construct an IntroducerClient or IntroducerService directly
    # and, for the service, inspect the lengths of its announcement and
    # subscriber collections. None of these tests creates a Tub.

    def test_create(self):
        # Constructing a client with no tub (None) must not raise. The
        # instance is not used any further.
        client = IntroducerClient(None, "introducer.furl", "my_nickname",
                                  "my_version", "oldest_version")

    def test_listen(self):
        # A freshly built service can be attached to self.parent.
        introducer = IntroducerService()
        introducer.setServiceParent(self.parent)

    def test_duplicate(self):
        # Publishing a second announcement for an already-announced FURL must
        # not add a new entry. Nothing here subscribes, so the subscriber
        # count is expected to stay at zero throughout.
        introducer = IntroducerService()

        def expect_counts(num_announcements, num_subscribers):
            self.failUnlessEqual(len(introducer.get_announcements()),
                                 num_announcements)
            self.failUnlessEqual(len(introducer.get_subscribers()),
                                 num_subscribers)

        expect_counts(0, 0)

        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36106,127.0.0.1:36106/gydnpigj2ja2qr2srq4ikjwnl7xfgbra"
        furl2 = "pb://ttwwooyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36111,127.0.0.1:36106/ttwwoogj2ja2qr2srq4ikjwnl7xfgbra"

        # "original" and "reissued" share furl1 and differ only in their
        # fifth element ("ver23" vs "ver24"); "other" uses furl2.
        original = (furl1, "storage", "RIStorage", "nick1", "ver23", "ver0")
        reissued = (furl1, "storage", "RIStorage", "nick1", "ver24", "ver0")
        other = (furl2, "storage", "RIStorage", "nick2", "ver30", "ver0")

        # Each step: (announcement to publish, announcement count expected
        # immediately afterwards).
        steps = [(original, 1), (other, 2), (reissued, 2)]
        for announcement, expected_total in steps:
            introducer.remote_publish(announcement)
            expect_counts(expected_total, 0)
80 class SystemTestMixin(ServiceMixin, pollmixin.PollMixin):
83 ServiceMixin.setUp(self)
84 self.central_tub = tub = Tub()
85 #tub.setOption("logLocalFailures", True)
86 #tub.setOption("logRemoteFailures", True)
87 tub.setServiceParent(self.parent)
88 l = tub.listenOn("tcp:0")
89 portnum = l.getPortnum()
90 tub.setLocation("localhost:%d" % portnum)
92 class SystemTest(SystemTestMixin, unittest.TestCase):
def test_system(self):
    # Run the shared do_system_test() scenario against an IntroducerService
    # that is parented to self.parent and registered on the central tub.
    # The resulting FURL is stored on self.introducer_furl before the
    # scenario starts.
    introducer = IntroducerService()
    introducer.setServiceParent(self.parent)
    furl = self.central_tub.registerReference(introducer)
    self.introducer_furl = furl
    return self.do_system_test()
def test_system_oldserver(self):
    # Same scenario as do_system_test() normally runs, but the introducer
    # registered on the central tub is old.IntroducerService_V1. The
    # resulting FURL is stored on self.introducer_furl before the scenario
    # starts.
    v1_introducer = old.IntroducerService_V1()
    v1_introducer.setServiceParent(self.parent)
    furl = self.central_tub.registerReference(v1_introducer)
    self.introducer_furl = furl
    return self.do_system_test()
106 def do_system_test(self):
109 # we have 5 clients who publish themselves, and an extra one which
110 # does not. When the connections are fully established, all six nodes
111 # should have 5 connections each.
115 for i in range(NUMCLIENTS+1):
117 #tub.setOption("logLocalFailures", True)
118 #tub.setOption("logRemoteFailures", True)
119 tub.setServiceParent(self.parent)
120 l = tub.listenOn("tcp:0")
121 portnum = l.getPortnum()
122 tub.setLocation("localhost:%d" % portnum)
125 log.msg("creating client %d: %s" % (i, tub.getShortTubID()))
126 client_class = IntroducerClient
128 client_class = old.IntroducerClient_V1
129 c = client_class(tub, self.introducer_furl,
130 "nickname-%d" % i, "version", "oldest")
132 node_furl = tub.registerReference(n)
133 c.publish(node_furl, "storage", "ri_name")
134 # the last one does not publish anything
136 c.subscribe_to("storage")
138 c.setServiceParent(self.parent)
142 def _wait_for_all_connections():
144 if len(c.get_all_connections()) < NUMCLIENTS:
147 d = self.poll(_wait_for_all_connections)
150 log.msg("doing _check1")
152 self.failUnless(c.connected_to_introducer())
153 self.failUnlessEqual(len(c.get_all_connections()), NUMCLIENTS)
154 self.failUnlessEqual(len(c.get_all_peerids()), NUMCLIENTS)
155 self.failUnlessEqual(len(c.get_all_connections_for("storage")),
157 nodeid0 = b32decode(tubs[clients[0]].tubID.upper())
158 self.failUnlessEqual(c.get_nickname_for_peerid(nodeid0),
160 d.addCallback(_check1)
162 origin_c = clients[0]
163 def _disconnect_somebody_else(res):
164 # now disconnect somebody's connection to someone else
165 current_counter = origin_c.counter
166 victim_nodeid = b32decode(tubs[clients[1]].tubID.upper())
167 log.msg(" disconnecting %s->%s" %
168 (tubs[origin_c].tubID,
169 idlib.shortnodeid_b2a(victim_nodeid)))
170 origin_c.debug_disconnect_from_peerid(victim_nodeid)
171 log.msg(" did disconnect")
173 # then wait until something changes, which ought to be them
176 return current_counter != origin_c.counter
177 return self.poll(_compare)
179 d.addCallback(_disconnect_somebody_else)
181 # and wait for them to reconnect
182 d.addCallback(lambda res: self.poll(_wait_for_all_connections))
184 log.msg("doing _check2")
186 self.failUnlessEqual(len(c.get_all_connections()), NUMCLIENTS)
187 d.addCallback(_check2)
189 def _disconnect_yourself(res):
190 # now disconnect somebody's connection to themselves.
191 current_counter = origin_c.counter
192 victim_nodeid = b32decode(tubs[clients[0]].tubID.upper())
193 log.msg(" disconnecting %s->%s" %
194 (tubs[origin_c].tubID,
195 idlib.shortnodeid_b2a(victim_nodeid)))
196 origin_c.debug_disconnect_from_peerid(victim_nodeid)
197 log.msg(" did disconnect from self")
200 return current_counter != origin_c.counter
201 return self.poll(_compare)
202 d.addCallback(_disconnect_yourself)
204 d.addCallback(lambda res: self.poll(_wait_for_all_connections))
206 log.msg("doing _check3")
208 self.failUnlessEqual(len(c.get_all_connections_for("storage")),
210 d.addCallback(_check3)
211 def _shutdown_introducer(res):
212 # now shut down the introducer. We do this by shutting down the
213 # tub it's using. Nobody's connections (to each other) should go
214 # down. All clients should notice the loss, and no other errors
216 log.msg("shutting down the introducer")
217 return self.central_tub.disownServiceParent()
218 d.addCallback(_shutdown_introducer)
219 def _wait_for_introducer_loss():
221 if c.connected_to_introducer():
224 d.addCallback(lambda res: self.poll(_wait_for_introducer_loss))
227 log.msg("doing _check4")
229 self.failUnlessEqual(len(c.get_all_connections_for("storage")),
231 self.failIf(c.connected_to_introducer())
232 d.addCallback(_check4)
235 class TooNewServer(IntroducerService):
236 VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v999":
238 "application-version": "greetings from the crazy future",
241 class NonV1Server(SystemTestMixin, unittest.TestCase):
242 # if the 1.3.0 client connects to a server that doesn't provide the 'v1'
243 # protocol, it is supposed to provide a useful error instead of a weird
246 def test_failure(self):
248 i.setServiceParent(self.parent)
249 self.introducer_furl = self.central_tub.registerReference(i)
252 tub.setServiceParent(self.parent)
253 l = tub.listenOn("tcp:0")
254 portnum = l.getPortnum()
255 tub.setLocation("localhost:%d" % portnum)
258 c = IntroducerClient(tub, self.introducer_furl,
259 "nickname-client", "version", "oldest")
260 c.subscribe_to("storage")
262 c.setServiceParent(self.parent)
264 # now we wait for it to connect and notice the bad version
267 return bool(c._introducer_error) or bool(c._publisher)
268 d = self.poll(_got_bad)
270 self.failUnless(c._introducer_error)
271 self.failUnless(c._introducer_error.check(InsufficientVersionError))