1 from base64 import b32decode
5 from twisted.trial import unittest
6 from twisted.internet import defer
7 from twisted.python import log
9 from foolscap.api import Tub, Referenceable, fireEventually, flushEventualQueue
10 from twisted.application import service
11 from allmydata.interfaces import InsufficientVersionError
12 from allmydata.introducer.client import IntroducerClient
13 from allmydata.introducer.server import IntroducerService
14 # test compatibility with old introducer .tac files
15 from allmydata.introducer import IntroducerNode
16 from allmydata.introducer import old
17 from allmydata.util import pollmixin
18 import common_util as testutil
class LoggingMultiService(service.MultiService):
    # Service container used as the tests' parent; absorbs log() calls made
    # by child services.
    def log(self, msg, **kw):
        """Log hook for child services. NOTE(review): the body is elided in
        this excerpt — presumably forwards to twisted.python.log; confirm
        against the full source file."""
class Node(testutil.SignalMixin, unittest.TestCase):
    # Exercises the IntroducerNode wrapper kept for compatibility with old
    # introducer .tac files (see the import comment at the top of the file).
    def test_loadable(self):
        """Create an IntroducerNode and drive it through start/ready/stop."""
        basedir = "introducer.IntroducerNode.test_loadable"
        # NOTE(review): a line is elided here in this excerpt — presumably it
        # creates basedir on disk before the node is constructed; confirm.
        q = IntroducerNode(basedir)
        # run the whole lifecycle via foolscap's eventual-send queue so each
        # step happens in a fresh reactor turn
        d = fireEventually(None)
        d.addCallback(lambda res: q.startService())
        d.addCallback(lambda res: q.when_tub_ready())
        d.addCallback(lambda res: q.stopService())
        d.addCallback(flushEventualQueue)
        # NOTE(review): the trailing 'return d' is not visible in this excerpt.
        # NOTE(review): these lines are the interior of ServiceMixin's
        # setUp/tearDown — the class header and 'def' lines are elided in
        # this excerpt.
        # (setUp) parent MultiService that every test service attaches to
        self.parent = LoggingMultiService()
        self.parent.startService()
        # (tearDown) stop all child services, then drain foolscap's
        # eventual-send queue so trial sees a clean reactor
        log.msg("TestIntroducer.tearDown")
        d = defer.succeed(None)
        d.addCallback(lambda res: self.parent.stopService())
        d.addCallback(flushEventualQueue)
        # NOTE(review): 'return d' presumably follows — elided in this excerpt.
class Introducer(ServiceMixin, unittest.TestCase, pollmixin.PollMixin):
    # Direct (non-network) tests of IntroducerClient / IntroducerService.

    def test_create(self):
        """IntroducerClient can be constructed without a Tub; the
        construction itself is the assertion."""
        ic = IntroducerClient(None, "introducer.furl", u"my_nickname",
                              "my_version", "oldest_version")

    def test_listen(self):
        """IntroducerService can be started under the test's MultiService."""
        i = IntroducerService()
        i.setServiceParent(self.parent)
57 def test_duplicate(self):
58 i = IntroducerService()
59 self.failUnlessEqual(len(i.get_announcements()), 0)
60 self.failUnlessEqual(len(i.get_subscribers()), 0)
61 furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36106,127.0.0.1:36106/gydnpigj2ja2qr2srq4ikjwnl7xfgbra"
62 furl2 = "pb://ttwwooyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36111,127.0.0.1:36106/ttwwoogj2ja2qr2srq4ikjwnl7xfgbra"
63 ann1 = (furl1, "storage", "RIStorage", "nick1", "ver23", "ver0")
64 ann1b = (furl1, "storage", "RIStorage", "nick1", "ver24", "ver0")
65 ann2 = (furl2, "storage", "RIStorage", "nick2", "ver30", "ver0")
66 i.remote_publish(ann1)
67 self.failUnlessEqual(len(i.get_announcements()), 1)
68 self.failUnlessEqual(len(i.get_subscribers()), 0)
69 i.remote_publish(ann2)
70 self.failUnlessEqual(len(i.get_announcements()), 2)
71 self.failUnlessEqual(len(i.get_subscribers()), 0)
72 i.remote_publish(ann1b)
73 self.failUnlessEqual(len(i.get_announcements()), 2)
74 self.failUnlessEqual(len(i.get_subscribers()), 0)
class SystemTestMixin(ServiceMixin, pollmixin.PollMixin):

    def create_tub(self, portnum=0):
        """Create self.central_tub listening on TCP 'portnum' (0 means an
        ephemeral port), recording the actual port in self.central_portnum
        and advertising a localhost location."""
        tubfile = os.path.join(self.basedir, "tub.pem")
        self.central_tub = tub = Tub(certFile=tubfile)
        #tub.setOption("logLocalFailures", True)
        #tub.setOption("logRemoteFailures", True)
        tub.setOption("expose-remote-exception-types", False)
        tub.setServiceParent(self.parent)
        l = tub.listenOn("tcp:%d" % portnum)
        self.central_portnum = l.getPortnum()
        # NOTE(review): a line is elided here in this excerpt — the assert
        # below presumably runs only when an explicit (nonzero) portnum was
        # requested; confirm against the full source.
        assert self.central_portnum == portnum
        tub.setLocation("localhost:%d" % self.central_portnum)
class SystemTest(SystemTestMixin, unittest.TestCase):

    def test_system(self):
        """Run the full publish/subscribe system test against the current
        IntroducerService implementation."""
        self.basedir = "introducer/SystemTest/system"
        os.makedirs(self.basedir)
        return self.do_system_test(IntroducerService)
    test_system.timeout = 480 # occasionally takes longer than 350s on "draco"
    def do_system_test(self, create_introducer):
        """End-to-end publish/subscribe test: start an introducer, attach a
        set of publishing and subscribing clients over real (localhost)
        Tubs, then bounce first the introducer's Tub and finally the
        introducer itself, checking the _debug_counts bookkeeping on both
        sides after each phase.

        NOTE(review): this excerpt is elided — several statements (inner
        'def' lines for the _check* callbacks, call continuations, return
        statements) are missing from view; the inline notes below hedge
        wherever the surrounding code is not visible.
        """
        introducer = create_introducer()
        introducer.setServiceParent(self.parent)
        iff = os.path.join(self.basedir, "introducer.furl")
        tub = self.central_tub
        ifurl = self.central_tub.registerReference(introducer, furlFile=iff)
        self.introducer_furl = ifurl

        # we have 5 clients who publish themselves, and an extra one which
        # does not. When the connections are fully established, all six
        # nodes should have 5 connections each.
        received_announcements = {}
        # NOTE(review): NUMCLIENTS is bound on a line elided from this
        # excerpt — presumably NUMCLIENTS = 5 per the comment above; confirm.
        NUM_SERVERS = NUMCLIENTS
        subscribing_clients = []
        publishing_clients = []

        for i in range(NUMCLIENTS+1):
            # NOTE(review): the per-client Tub construction line is elided
            # in this excerpt.
            #tub.setOption("logLocalFailures", True)
            #tub.setOption("logRemoteFailures", True)
            tub.setOption("expose-remote-exception-types", False)
            tub.setServiceParent(self.parent)
            l = tub.listenOn("tcp:0")
            portnum = l.getPortnum()
            tub.setLocation("localhost:%d" % portnum)

            log.msg("creating client %d: %s" % (i, tub.getShortTubID()))
            # NOTE(review): constructor continuation line(s) elided here.
            c = IntroducerClient(tub, self.introducer_furl, u"nickname-%d" % i,
            received_announcements[c] = ra = {}
            # pass the per-client dict as an extra subscribe_to() argument so
            # each client records announcements into its own dict (avoids the
            # late-binding-closure trap in this loop)
            def got(serverid, ann_d, announcements):
                announcements[serverid] = ann_d
            c.subscribe_to("storage", got, received_announcements[c])
            subscribing_clients.append(c)
            node_furl = tub.registerReference(Referenceable())
            c.publish(node_furl, "storage", "ri_name")
            publishing_clients.append(c)
            # the last one does not publish anything
            c.setServiceParent(self.parent)

        def _wait_for_all_connections():
            # poll predicate: done once every subscriber has heard about all
            # NUM_SERVERS servers (return statements elided in this excerpt)
            for c in subscribing_clients:
                if len(received_announcements[c]) < NUM_SERVERS:
        d = self.poll(_wait_for_all_connections)

        # NOTE(review): the 'def _check1(res):' line is elided; the
        # following block is its body.
            log.msg("doing _check1")
            dc = introducer._debug_counts
            self.failUnlessEqual(dc["inbound_message"], NUM_SERVERS)
            self.failUnlessEqual(dc["inbound_duplicate"], 0)
            self.failUnlessEqual(dc["inbound_update"], 0)
            self.failUnless(dc["outbound_message"])
            self.failUnless(c.connected_to_introducer())
            for c in subscribing_clients:
                cdc = c._debug_counts
                self.failUnless(cdc["inbound_message"])
                # NOTE(review): continuation lines of the next two asserts
                # are elided in this excerpt.
                self.failUnlessEqual(cdc["inbound_announcement"],
                self.failUnlessEqual(cdc["wrong_service"], 0)
                self.failUnlessEqual(cdc["duplicate_announcement"], 0)
                self.failUnlessEqual(cdc["update"], 0)
                self.failUnlessEqual(cdc["new_announcement"],
                anns = received_announcements[c]
                self.failUnlessEqual(len(anns), NUM_SERVERS)
            # NOTE(review): 'tubs' and 'clients' are bound on lines elided
            # from this excerpt.
            nodeid0 = b32decode(tubs[clients[0]].tubID.upper())
            ann_d = anns[nodeid0]
            nick = ann_d["nickname"]
            self.failUnlessEqual(type(nick), unicode)
            self.failUnlessEqual(nick, u"nickname-0")
            for c in publishing_clients:
                cdc = c._debug_counts
                self.failUnlessEqual(cdc["outbound_message"], 1)
        d.addCallback(_check1)

        # force an introducer reconnect, by shutting down the Tub it's using
        # and starting a new Tub (with the old introducer). Everybody should
        # reconnect and republish, but the introducer should ignore the
        # republishes as duplicates. However, because the server doesn't know
        # what each client does and does not know, it will send them a copy
        # of the current announcement table anyway.

        d.addCallback(lambda _ign: log.msg("shutting down introducer's Tub"))
        d.addCallback(lambda _ign: self.central_tub.disownServiceParent())

        def _wait_for_introducer_loss():
            # poll predicate: done once no subscriber is still connected
            # (loop header and return statements elided in this excerpt)
            if c.connected_to_introducer():
        d.addCallback(lambda res: self.poll(_wait_for_introducer_loss))

        def _restart_introducer_tub(_ign):
            log.msg("restarting introducer's Tub")
            # note: old.Server doesn't have this count
            dc = introducer._debug_counts
            self.expected_count = dc["inbound_message"] + NUM_SERVERS
            self.expected_subscribe_count = dc["inbound_subscribe"] + NUMCLIENTS+1
            introducer._debug0 = dc["outbound_message"]
            for c in subscribing_clients:
                cdc = c._debug_counts
                c._debug0 = cdc["inbound_message"]
            # re-listen on the same port so the old FURL remains valid
            self.create_tub(self.central_portnum)
            # NOTE(review): registerReference continuation line elided here.
            newfurl = self.central_tub.registerReference(introducer,
            assert newfurl == self.introducer_furl
        d.addCallback(_restart_introducer_tub)

        def _wait_for_introducer_reconnect():
            # Poll until:
            # all clients are connected
            # the introducer has received publish messages from all of them
            # the introducer has received subscribe messages from all of them
            # the introducer has sent (duplicate) announcements to all of them
            # all clients have received (duplicate) announcements
            # NOTE(review): loop headers and return statements of this
            # predicate are elided in this excerpt.
            dc = introducer._debug_counts
            if not c.connected_to_introducer():
            if dc["inbound_message"] < self.expected_count:
            if dc["inbound_subscribe"] < self.expected_subscribe_count:
            for c in subscribing_clients:
                cdc = c._debug_counts
                if cdc["inbound_message"] < c._debug0+1:
        d.addCallback(lambda res: self.poll(_wait_for_introducer_reconnect))

        # NOTE(review): the 'def _check2(res):' line is elided; the
        # following block is its body.
            log.msg("doing _check2")
            # assert that the introducer sent out new messages, one per
            # (comment continuation elided in this excerpt)
            dc = introducer._debug_counts
            self.failUnlessEqual(dc["inbound_message"], 2*NUM_SERVERS)
            self.failUnlessEqual(dc["inbound_duplicate"], NUM_SERVERS)
            self.failUnlessEqual(dc["inbound_update"], 0)
            self.failUnlessEqual(dc["outbound_message"],
                                 introducer._debug0 + len(subscribing_clients))
            self.failUnless(c.connected_to_introducer())
            for c in subscribing_clients:
                cdc = c._debug_counts
                self.failUnlessEqual(cdc["duplicate_announcement"], NUM_SERVERS)
        d.addCallback(_check2)

        # Then force an introducer restart, by shutting down the Tub,
        # destroying the old introducer, and starting a new Tub+Introducer.
        # Everybody should reconnect and republish, and the (new) introducer
        # will distribute the new announcements, but the clients should
        # ignore the republishes as duplicates.

        d.addCallback(lambda _ign: log.msg("shutting down introducer"))
        d.addCallback(lambda _ign: self.central_tub.disownServiceParent())
        d.addCallback(lambda res: self.poll(_wait_for_introducer_loss))

        def _restart_introducer(_ign):
            log.msg("restarting introducer")
            self.create_tub(self.central_portnum)
            for c in subscribing_clients:
                # record some counters for later comparison. Stash the values
                # on the client itself, because I'm lazy.
                cdc = c._debug_counts
                c._debug1 = cdc["inbound_announcement"]
                c._debug2 = cdc["inbound_message"]
                c._debug3 = cdc["new_announcement"]
            newintroducer = create_introducer()
            self.expected_message_count = NUM_SERVERS
            self.expected_announcement_count = NUM_SERVERS*len(subscribing_clients)
            self.expected_subscribe_count = len(subscribing_clients)
            # NOTE(review): registerReference continuation line elided here.
            newfurl = self.central_tub.registerReference(newintroducer,
            assert newfurl == self.introducer_furl
        d.addCallback(_restart_introducer)

        def _wait_for_introducer_reconnect2():
            # Poll until:
            # all clients are connected
            # the introducer has received publish messages from all of them
            # the introducer has received subscribe messages from all of them
            # the introducer has sent announcements for everybody to everybody
            # all clients have received all the (duplicate) announcements
            # at that point, the system should be quiescent
            # NOTE(review): loop headers and return statements of this
            # predicate are elided in this excerpt.
            dc = introducer._debug_counts
            if not c.connected_to_introducer():
            if dc["inbound_message"] < self.expected_message_count:
            if dc["outbound_announcements"] < self.expected_announcement_count:
            if dc["inbound_subscribe"] < self.expected_subscribe_count:
            for c in subscribing_clients:
                cdc = c._debug_counts
                if cdc["inbound_announcement"] < c._debug1+NUM_SERVERS:
        d.addCallback(lambda res: self.poll(_wait_for_introducer_reconnect2))

        # NOTE(review): the 'def _check3(res):' line is elided; the
        # following block is its body.
            log.msg("doing _check3")
            self.failUnless(c.connected_to_introducer())
            for c in subscribing_clients:
                cdc = c._debug_counts
                self.failUnless(cdc["inbound_announcement"] > c._debug1)
                self.failUnless(cdc["inbound_message"] > c._debug2)
                # there should have been no new announcements
                self.failUnlessEqual(cdc["new_announcement"], c._debug3)
                # and the right number of duplicate ones. There were
                # NUM_SERVERS from the servertub restart, and there should be
                # another NUM_SERVERS now
                # NOTE(review): continuation line of this assert is elided.
                self.failUnlessEqual(cdc["duplicate_announcement"],
        d.addCallback(_check3)
class TooNewServer(IntroducerService):
    # Introducer variant that advertises only a far-future protocol version,
    # used by NonV1Server to provoke a version-mismatch error in v1 clients.
    # NOTE(review): parts of this VERSION dict (the value for the v999 key
    # and the closing brace) are elided in this excerpt.
    VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v999":
                "application-version": "greetings from the crazy future",
class NonV1Server(SystemTestMixin, unittest.TestCase):
    # if the 1.3.0 client connects to a server that doesn't provide the 'v1'
    # protocol, it is supposed to provide a useful error instead of a weird
    # NOTE(review): the tail of this comment is elided in this excerpt —
    # presumably "exception."

    def test_failure(self):
        """Connect a v1 client to a TooNewServer and verify that the client
        records an InsufficientVersionError rather than crashing.

        NOTE(review): several lines of this method (creation of the server
        'i' and its Tub, the client Tub, the '_got_bad'/'_done' callback
        headers, and return statements) are elided in this excerpt.
        """
        self.basedir = "introducer/NonV1Server/failure"
        os.makedirs(self.basedir)
        # NOTE(review): 'i' (presumably a TooNewServer) is constructed on an
        # elided line.
        i.setServiceParent(self.parent)
        self.introducer_furl = self.central_tub.registerReference(i)
        # NOTE(review): the client Tub construction line is elided here.
        tub.setOption("expose-remote-exception-types", False)
        tub.setServiceParent(self.parent)
        l = tub.listenOn("tcp:0")
        portnum = l.getPortnum()
        tub.setLocation("localhost:%d" % portnum)

        c = IntroducerClient(tub, self.introducer_furl,
                             u"nickname-client", "version", "oldest")
        def got(serverid, ann_d):
            announcements[serverid] = ann_d
        c.subscribe_to("storage", got)

        c.setServiceParent(self.parent)

        # now we wait for it to connect and notice the bad version
        # NOTE(review): the 'def _got_bad():' line is elided; the next line
        # is its body.
            return bool(c._introducer_error) or bool(c._publisher)
        d = self.poll(_got_bad)
        # NOTE(review): a callback header ('def _done(res):' or similar) is
        # elided; the following asserts are its body.
            self.failUnless(c._introducer_error)
            self.failUnless(c._introducer_error.check(InsufficientVersionError))
class Index(unittest.TestCase):
    def test_make_index(self):
        """Guard against the broken base64.b32decode shipped with
        python2.4.0/2.4.1: index a known announcement and compare the
        decoded nodeid against a precomputed byte string."""
        announcement = ('pb://t5g7egomnnktbpydbuijt6zgtmw4oqi5@127.0.0.1:51857/hfzv36i',
                        'storage', 'RIStorageServer.tahoe.allmydata.com',
                        'plancha', 'allmydata-tahoe/1.4.1', '1.0.0')
        nodeid, service_name = old.make_index(announcement)
        # expected value precomputed with a known-good b32decode
        expected_nodeid = "\x9fM\xf2\x19\xcckU0\xbf\x03\r\x10\x99\xfb&\x9b-\xc7A\x1d"
        self.failUnlessEqual(nodeid, expected_nodeid)
        self.failUnlessEqual(service_name, "storage")