3 from base64 import b32decode
6 from twisted.trial import unittest
7 from twisted.internet import defer, address
8 from twisted.python import log
10 from foolscap.api import Tub, Referenceable, fireEventually, flushEventualQueue
11 from twisted.application import service
12 from allmydata.interfaces import InsufficientVersionError
13 from allmydata.introducer.client import IntroducerClient, \
14 WrapV2ClientInV1Interface
15 from allmydata.introducer.server import IntroducerService
16 from allmydata.introducer.common import get_tubid_string_from_ann, \
17 get_tubid_string, sign_to_foolscap, unsign_from_foolscap, \
19 from allmydata.introducer import old
20 # test compatibility with old introducer .tac files
21 from allmydata.introducer import IntroducerNode
22 from allmydata.web import introweb
23 from allmydata.util import pollmixin, keyutil, idlib
24 import allmydata.test.common_util as testutil
26 class LoggingMultiService(service.MultiService):
27 def log(self, msg, **kw):
30 class Node(testutil.SignalMixin, unittest.TestCase):
31 def test_loadable(self):
32 basedir = "introducer.IntroducerNode.test_loadable"
34 q = IntroducerNode(basedir)
35 d = fireEventually(None)
36 d.addCallback(lambda res: q.startService())
37 d.addCallback(lambda res: q.when_tub_ready())
38 d.addCallback(lambda res: q.stopService())
39 d.addCallback(flushEventualQueue)
44 self.parent = LoggingMultiService()
45 self.parent.startService()
47 log.msg("TestIntroducer.tearDown")
48 d = defer.succeed(None)
49 d.addCallback(lambda res: self.parent.stopService())
50 d.addCallback(flushEventualQueue)
53 class Introducer(ServiceMixin, unittest.TestCase, pollmixin.PollMixin):
55 def test_create(self):
56 ic = IntroducerClient(None, "introducer.furl", u"my_nickname",
57 "my_version", "oldest_version", {})
58 self.failUnless(isinstance(ic, IntroducerClient))
60 def test_listen(self):
61 i = IntroducerService()
62 i.setServiceParent(self.parent)
64 def test_duplicate_publish(self):
65 i = IntroducerService()
66 self.failUnlessEqual(len(i.get_announcements()), 0)
67 self.failUnlessEqual(len(i.get_subscribers()), 0)
68 furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36106,127.0.0.1:36106/gydnpigj2ja2qr2srq4ikjwnl7xfgbra"
69 furl2 = "pb://ttwwooyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36111,127.0.0.1:36106/ttwwoogj2ja2qr2srq4ikjwnl7xfgbra"
70 ann1 = (furl1, "storage", "RIStorage", "nick1", "ver23", "ver0")
71 ann1b = (furl1, "storage", "RIStorage", "nick1", "ver24", "ver0")
72 ann2 = (furl2, "storage", "RIStorage", "nick2", "ver30", "ver0")
73 i.remote_publish(ann1)
74 self.failUnlessEqual(len(i.get_announcements()), 1)
75 self.failUnlessEqual(len(i.get_subscribers()), 0)
76 i.remote_publish(ann2)
77 self.failUnlessEqual(len(i.get_announcements()), 2)
78 self.failUnlessEqual(len(i.get_subscribers()), 0)
79 i.remote_publish(ann1b)
80 self.failUnlessEqual(len(i.get_announcements()), 2)
81 self.failUnlessEqual(len(i.get_subscribers()), 0)
83 def test_id_collision(self):
84 # test replacement case where tubid equals a keyid (one should
85 # not replace the other)
86 i = IntroducerService()
87 ic = IntroducerClient(None,
88 "introducer.furl", u"my_nickname",
89 "my_version", "oldest_version", {})
90 sk_s, vk_s = keyutil.make_keypair()
91 sk, _ignored = keyutil.parse_privkey(sk_s)
92 keyid = keyutil.remove_prefix(vk_s, "pub-v0-")
93 furl1 = "pb://onug64tu@127.0.0.1:123/short" # base32("short")
94 ann_t = ic.create_announcement("storage", make_ann(furl1), sk)
95 i.remote_publish_v2(ann_t, Referenceable())
96 announcements = i.get_announcements()
97 self.failUnlessEqual(len(announcements), 1)
98 key1 = ("storage", "v0-"+keyid, None)
99 self.failUnlessEqual(announcements[0].index, key1)
100 ann1_out = announcements[0].announcement
101 self.failUnlessEqual(ann1_out["anonymous-storage-FURL"], furl1)
103 furl2 = "pb://%s@127.0.0.1:36106/swissnum" % keyid
104 ann2 = (furl2, "storage", "RIStorage", "nick1", "ver23", "ver0")
105 i.remote_publish(ann2)
106 announcements = i.get_announcements()
107 self.failUnlessEqual(len(announcements), 2)
108 key2 = ("storage", None, keyid)
109 wanted = [ad for ad in announcements if ad.index == key2]
110 self.failUnlessEqual(len(wanted), 1)
111 ann2_out = wanted[0].announcement
112 self.failUnlessEqual(ann2_out["anonymous-storage-FURL"], furl2)
116 ann = { "anonymous-storage-FURL": furl,
117 "permutation-seed-base32": get_tubid_string(furl) }
120 def make_ann_t(ic, furl, privkey):
121 return ic.create_announcement("storage", make_ann(furl), privkey)
123 class Client(unittest.TestCase):
124 def test_duplicate_receive_v1(self):
125 ic = IntroducerClient(None,
126 "introducer.furl", u"my_nickname",
127 "my_version", "oldest_version", {})
129 ic.subscribe_to("storage",
130 lambda key_s,ann: announcements.append(ann))
131 furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/gydnpigj2ja2qr2srq4ikjwnl7xfgbra"
132 ann1 = (furl1, "storage", "RIStorage", "nick1", "ver23", "ver0")
133 ann1b = (furl1, "storage", "RIStorage", "nick1", "ver24", "ver0")
134 ca = WrapV2ClientInV1Interface(ic)
136 ca.remote_announce([ann1])
139 self.failUnlessEqual(len(announcements), 1)
140 self.failUnlessEqual(announcements[0]["nickname"], u"nick1")
141 self.failUnlessEqual(announcements[0]["my-version"], "ver23")
142 self.failUnlessEqual(ic._debug_counts["inbound_announcement"], 1)
143 self.failUnlessEqual(ic._debug_counts["new_announcement"], 1)
144 self.failUnlessEqual(ic._debug_counts["update"], 0)
145 self.failUnlessEqual(ic._debug_counts["duplicate_announcement"], 0)
146 # now send a duplicate announcement: this should not notify clients
147 ca.remote_announce([ann1])
148 return fireEventually()
151 self.failUnlessEqual(len(announcements), 1)
152 self.failUnlessEqual(ic._debug_counts["inbound_announcement"], 2)
153 self.failUnlessEqual(ic._debug_counts["new_announcement"], 1)
154 self.failUnlessEqual(ic._debug_counts["update"], 0)
155 self.failUnlessEqual(ic._debug_counts["duplicate_announcement"], 1)
156 # and a replacement announcement: same FURL, new other stuff.
157 # Clients should be notified.
158 ca.remote_announce([ann1b])
159 return fireEventually()
160 d.addCallback(_then2)
162 self.failUnlessEqual(len(announcements), 2)
163 self.failUnlessEqual(ic._debug_counts["inbound_announcement"], 3)
164 self.failUnlessEqual(ic._debug_counts["new_announcement"], 1)
165 self.failUnlessEqual(ic._debug_counts["update"], 1)
166 self.failUnlessEqual(ic._debug_counts["duplicate_announcement"], 1)
167 # test that the other stuff changed
168 self.failUnlessEqual(announcements[-1]["nickname"], u"nick1")
169 self.failUnlessEqual(announcements[-1]["my-version"], "ver24")
170 d.addCallback(_then3)
173 def test_duplicate_receive_v2(self):
174 ic1 = IntroducerClient(None,
175 "introducer.furl", u"my_nickname",
176 "ver23", "oldest_version", {})
177 # we use a second client just to create a different-looking
179 ic2 = IntroducerClient(None,
180 "introducer.furl", u"my_nickname",
181 "ver24","oldest_version",{})
183 def _received(key_s, ann):
184 announcements.append( (key_s, ann) )
185 ic1.subscribe_to("storage", _received)
186 furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/gydnp"
187 furl1a = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:7777/gydnp"
188 furl2 = "pb://ttwwooyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/ttwwoo"
190 privkey_s, pubkey_vs = keyutil.make_keypair()
191 privkey, _ignored = keyutil.parse_privkey(privkey_s)
192 pubkey_s = keyutil.remove_prefix(pubkey_vs, "pub-")
195 # ann1a: ic1, furl1a (same SturdyRef, different connection hints)
199 self.ann1 = make_ann_t(ic1, furl1, privkey)
200 self.ann1a = make_ann_t(ic1, furl1a, privkey)
201 self.ann1b = make_ann_t(ic2, furl1, privkey)
202 self.ann2 = make_ann_t(ic2, furl2, privkey)
204 ic1.remote_announce_v2([self.ann1]) # queues eventual-send
207 self.failUnlessEqual(len(announcements), 1)
208 key_s,ann = announcements[0]
209 self.failUnlessEqual(key_s, pubkey_s)
210 self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1)
211 self.failUnlessEqual(ann["my-version"], "ver23")
212 d.addCallback(_then1)
214 # now send a duplicate announcement. This should not fire the
216 d.addCallback(lambda ign: ic1.remote_announce_v2([self.ann1]))
217 d.addCallback(fireEventually)
219 self.failUnlessEqual(len(announcements), 1)
220 d.addCallback(_then2)
222 # and a replacement announcement: same FURL, new other stuff. The
223 # subscriber *should* be fired.
224 d.addCallback(lambda ign: ic1.remote_announce_v2([self.ann1b]))
225 d.addCallback(fireEventually)
227 self.failUnlessEqual(len(announcements), 2)
228 key_s,ann = announcements[-1]
229 self.failUnlessEqual(key_s, pubkey_s)
230 self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1)
231 self.failUnlessEqual(ann["my-version"], "ver24")
232 d.addCallback(_then3)
234 # and a replacement announcement with a different FURL (it uses
235 # different connection hints)
236 d.addCallback(lambda ign: ic1.remote_announce_v2([self.ann1a]))
237 d.addCallback(fireEventually)
239 self.failUnlessEqual(len(announcements), 3)
240 key_s,ann = announcements[-1]
241 self.failUnlessEqual(key_s, pubkey_s)
242 self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1a)
243 self.failUnlessEqual(ann["my-version"], "ver23")
244 d.addCallback(_then4)
246 # now add a new subscription, which should be called with the
247 # backlog. The introducer only records one announcement per index, so
248 # the backlog will only have the latest message.
250 def _received2(key_s, ann):
251 announcements2.append( (key_s, ann) )
252 d.addCallback(lambda ign: ic1.subscribe_to("storage", _received2))
253 d.addCallback(fireEventually)
255 self.failUnlessEqual(len(announcements2), 1)
256 key_s,ann = announcements2[-1]
257 self.failUnlessEqual(key_s, pubkey_s)
258 self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1a)
259 self.failUnlessEqual(ann["my-version"], "ver23")
260 d.addCallback(_then5)
263 def test_id_collision(self):
264 # test replacement case where tubid equals a keyid (one should
265 # not replace the other)
266 ic = IntroducerClient(None,
267 "introducer.furl", u"my_nickname",
268 "my_version", "oldest_version", {})
270 ic.subscribe_to("storage",
271 lambda key_s,ann: announcements.append(ann))
272 sk_s, vk_s = keyutil.make_keypair()
273 sk, _ignored = keyutil.parse_privkey(sk_s)
274 keyid = keyutil.remove_prefix(vk_s, "pub-v0-")
275 furl1 = "pb://onug64tu@127.0.0.1:123/short" # base32("short")
276 furl2 = "pb://%s@127.0.0.1:36106/swissnum" % keyid
277 ann_t = ic.create_announcement("storage", make_ann(furl1), sk)
278 ic.remote_announce_v2([ann_t])
281 # first announcement has been processed
282 self.failUnlessEqual(len(announcements), 1)
283 self.failUnlessEqual(announcements[0]["anonymous-storage-FURL"],
285 # now submit a second one, with a tubid that happens to look just
286 # like the pubkey-based serverid we just processed. They should
288 ann2 = (furl2, "storage", "RIStorage", "nick1", "ver23", "ver0")
289 ca = WrapV2ClientInV1Interface(ic)
290 ca.remote_announce([ann2])
291 return fireEventually()
294 # if they overlapped, the second announcement would be ignored
295 self.failUnlessEqual(len(announcements), 2)
296 self.failUnlessEqual(announcements[1]["anonymous-storage-FURL"],
298 d.addCallback(_then2)
301 NICKNAME = u"n\u00EDickname-%s" # LATIN SMALL LETTER I WITH ACUTE
303 class SystemTestMixin(ServiceMixin, pollmixin.PollMixin):
305 def create_tub(self, portnum=0):
306 tubfile = os.path.join(self.basedir, "tub.pem")
307 self.central_tub = tub = Tub(certFile=tubfile)
308 #tub.setOption("logLocalFailures", True)
309 #tub.setOption("logRemoteFailures", True)
310 tub.setOption("expose-remote-exception-types", False)
311 tub.setServiceParent(self.parent)
312 l = tub.listenOn("tcp:%d" % portnum)
313 self.central_portnum = l.getPortnum()
315 assert self.central_portnum == portnum
316 tub.setLocation("localhost:%d" % self.central_portnum)
318 class Queue(SystemTestMixin, unittest.TestCase):
319 def test_queue_until_connected(self):
320 self.basedir = "introducer/QueueUntilConnected/queued"
321 os.makedirs(self.basedir)
323 introducer = IntroducerService()
324 introducer.setServiceParent(self.parent)
325 iff = os.path.join(self.basedir, "introducer.furl")
326 ifurl = self.central_tub.registerReference(introducer, furlFile=iff)
328 tub2.setServiceParent(self.parent)
329 c = IntroducerClient(tub2, ifurl,
330 u"nickname", "version", "oldest", {})
331 furl1 = "pb://onug64tu@127.0.0.1:123/short" # base32("short")
332 sk_s, vk_s = keyutil.make_keypair()
333 sk, _ignored = keyutil.parse_privkey(sk_s)
335 d = introducer.disownServiceParent()
337 # now that the introducer server is offline, create a client and
338 # publish some messages
339 c.setServiceParent(self.parent) # this starts the reconnector
340 c.publish("storage", make_ann(furl1), sk)
342 introducer.setServiceParent(self.parent) # restart the server
343 # now wait for the messages to be delivered
344 def _got_announcement():
345 return bool(introducer.get_announcements())
346 return self.poll(_got_announcement)
347 d.addCallback(_offline)
349 v = introducer.get_announcements()[0]
350 furl = v.announcement["anonymous-storage-FURL"]
351 self.failUnlessEqual(furl, furl1)
354 # now let the ack get back
355 def _wait_until_idle(ign):
357 if c._debug_outstanding:
359 if introducer._debug_outstanding:
362 return self.poll(_idle)
363 d.addCallback(_wait_until_idle)
368 class SystemTest(SystemTestMixin, unittest.TestCase):
370 def do_system_test(self, server_version):
372 if server_version == V1:
373 introducer = old.IntroducerService_v1()
375 introducer = IntroducerService()
376 introducer.setServiceParent(self.parent)
377 iff = os.path.join(self.basedir, "introducer.furl")
378 tub = self.central_tub
379 ifurl = self.central_tub.registerReference(introducer, furlFile=iff)
380 self.introducer_furl = ifurl
382 # we have 5 clients who publish themselves as storage servers, and a
383 # sixth which does which not. All 6 clients subscriber to hear about
384 # storage. When the connections are fully established, all six nodes
385 # should have 5 connections each.
391 received_announcements = {}
392 subscribing_clients = []
393 publishing_clients = []
394 printable_serverids = {}
395 self.the_introducer = introducer
397 expected_announcements = [0 for c in range(NUM_CLIENTS)]
399 for i in range(NUM_CLIENTS):
401 #tub.setOption("logLocalFailures", True)
402 #tub.setOption("logRemoteFailures", True)
403 tub.setOption("expose-remote-exception-types", False)
404 tub.setServiceParent(self.parent)
405 l = tub.listenOn("tcp:0")
406 portnum = l.getPortnum()
407 tub.setLocation("localhost:%d" % portnum)
409 log.msg("creating client %d: %s" % (i, tub.getShortTubID()))
411 c = old.IntroducerClient_v1(tub, self.introducer_furl,
415 c = IntroducerClient(tub, self.introducer_furl,
418 {"component": "component-v1"})
419 received_announcements[c] = {}
420 def got(key_s_or_tubid, ann, announcements, i):
422 index = get_tubid_string_from_ann(ann)
424 index = key_s_or_tubid or get_tubid_string_from_ann(ann)
425 announcements[index] = ann
426 c.subscribe_to("storage", got, received_announcements[c], i)
427 subscribing_clients.append(c)
428 expected_announcements[i] += 1 # all expect a 'storage' announcement
430 node_furl = tub.registerReference(Referenceable())
433 c.publish(node_furl, "storage", "ri_name")
434 printable_serverids[i] = get_tubid_string(node_furl)
436 # sign the announcement
437 privkey_s, pubkey_s = keyutil.make_keypair()
438 privkey, _ignored = keyutil.parse_privkey(privkey_s)
439 privkeys[c] = privkey
440 c.publish("storage", make_ann(node_furl), privkey)
441 if server_version == V1:
442 printable_serverids[i] = get_tubid_string(node_furl)
444 assert pubkey_s.startswith("pub-")
445 printable_serverids[i] = pubkey_s[len("pub-"):]
447 c.publish("storage", make_ann(node_furl))
448 printable_serverids[i] = get_tubid_string(node_furl)
449 publishing_clients.append(c)
451 # the last one does not publish anything
455 # users of the V1 client were required to publish a
456 # 'stub_client' record (somewhat after they published the
457 # 'storage' record), so the introducer could see their
458 # version. Match that behavior.
459 c.publish(node_furl, "stub_client", "stub_ri_name")
462 # also publish something that nobody cares about
463 boring_furl = tub.registerReference(Referenceable())
464 c.publish("boring", make_ann(boring_furl))
466 c.setServiceParent(self.parent)
471 def _wait_for_connected(ign):
474 if not c.connected_to_introducer():
477 return self.poll(_connected)
479 # we watch the clients to determine when the system has settled down.
480 # Then we can look inside the server to assert things about its
483 def _wait_for_expected_announcements(ign):
484 def _got_expected_announcements():
485 for i,c in enumerate(subscribing_clients):
486 if len(received_announcements[c]) < expected_announcements[i]:
489 return self.poll(_got_expected_announcements)
491 # before shutting down any Tub, we'd like to know that there are no
492 # messages outstanding
494 def _wait_until_idle(ign):
496 for c in subscribing_clients + publishing_clients:
497 if c._debug_outstanding:
499 if self.the_introducer._debug_outstanding:
502 return self.poll(_idle)
504 d = defer.succeed(None)
505 d.addCallback(_wait_for_connected)
506 d.addCallback(_wait_for_expected_announcements)
507 d.addCallback(_wait_until_idle)
510 log.msg("doing _check1")
511 dc = self.the_introducer._debug_counts
512 if server_version == V1:
513 # each storage server publishes a record, and (after its
514 # 'subscribe' has been ACKed) also publishes a "stub_client".
515 # The non-storage client (which subscribes) also publishes a
516 # stub_client. There is also one "boring" service. The number
517 # of messages is higher, because the stub_clients aren't
518 # published until after we get the 'subscribe' ack (since we
519 # don't realize that we're dealing with a v1 server [which
520 # needs stub_clients] until then), and the act of publishing
521 # the stub_client causes us to re-send all previous
523 self.failUnlessEqual(dc["inbound_message"] - dc["inbound_duplicate"],
524 NUM_STORAGE + NUM_CLIENTS + 1)
526 # each storage server publishes a record. There is also one
527 # "stub_client" and one "boring"
528 self.failUnlessEqual(dc["inbound_message"], NUM_STORAGE+2)
529 self.failUnlessEqual(dc["inbound_duplicate"], 0)
530 self.failUnlessEqual(dc["inbound_update"], 0)
531 self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS)
532 # the number of outbound messages is tricky.. I think it depends
533 # upon a race between the publish and the subscribe messages.
534 self.failUnless(dc["outbound_message"] > 0)
535 # each client subscribes to "storage", and each server publishes
536 self.failUnlessEqual(dc["outbound_announcements"],
537 NUM_STORAGE*NUM_CLIENTS)
539 for c in subscribing_clients:
540 cdc = c._debug_counts
541 self.failUnless(cdc["inbound_message"])
542 self.failUnlessEqual(cdc["inbound_announcement"],
544 self.failUnlessEqual(cdc["wrong_service"], 0)
545 self.failUnlessEqual(cdc["duplicate_announcement"], 0)
546 self.failUnlessEqual(cdc["update"], 0)
547 self.failUnlessEqual(cdc["new_announcement"],
549 anns = received_announcements[c]
550 self.failUnlessEqual(len(anns), NUM_STORAGE)
552 nodeid0 = tubs[clients[0]].tubID
554 nick = ann["nickname"]
555 self.failUnlessEqual(type(nick), unicode)
556 self.failUnlessEqual(nick, NICKNAME % "0")
557 if server_version == V1:
558 for c in publishing_clients:
559 cdc = c._debug_counts
560 expected = 1 # storage
562 expected += 1 # boring
563 if c is not clients[0]:
564 # the v2 client tries to call publish_v2, which fails
565 # because the server is v1. It then re-sends
566 # everything it has so far, plus a stub_client record
567 expected = 2*expected + 1
569 # we always tell v1 client to send stub_client
571 self.failUnlessEqual(cdc["outbound_message"], expected)
573 for c in publishing_clients:
574 cdc = c._debug_counts
576 if c in [clients[0], # stub_client
580 self.failUnlessEqual(cdc["outbound_message"], expected)
581 # now check the web status, make sure it renders without error
582 ir = introweb.IntroducerRoot(self.parent)
583 self.parent.nodeid = "NODEID"
584 text = ir.renderSynchronously().decode("utf-8")
585 self.failUnlessIn(NICKNAME % "0", text) # the v1 client
586 self.failUnlessIn(NICKNAME % "1", text) # a v2 client
587 for i in range(NUM_STORAGE):
588 self.failUnlessIn(printable_serverids[i], text,
589 (i,printable_serverids[i],text))
590 # make sure there isn't a double-base32ed string too
591 self.failIfIn(idlib.nodeid_b2a(printable_serverids[i]), text,
592 (i,printable_serverids[i],text))
593 log.msg("_check1 done")
594 d.addCallback(_check1)
596 # force an introducer reconnect, by shutting down the Tub it's using
597 # and starting a new Tub (with the old introducer). Everybody should
598 # reconnect and republish, but the introducer should ignore the
599 # republishes as duplicates. However, because the server doesn't know
600 # what each client does and does not know, it will send them a copy
601 # of the current announcement table anyway.
603 d.addCallback(lambda _ign: log.msg("shutting down introducer's Tub"))
604 d.addCallback(lambda _ign: self.central_tub.disownServiceParent())
606 def _wait_for_introducer_loss(ign):
607 def _introducer_lost():
609 if c.connected_to_introducer():
612 return self.poll(_introducer_lost)
613 d.addCallback(_wait_for_introducer_loss)
615 def _restart_introducer_tub(_ign):
616 log.msg("restarting introducer's Tub")
618 for i in range(NUM_CLIENTS):
619 c = subscribing_clients[i]
620 for k in c._debug_counts:
621 c._debug_counts[k] = 0
622 for k in self.the_introducer._debug_counts:
623 self.the_introducer._debug_counts[k] = 0
624 expected_announcements[i] += 1 # new 'storage' for everyone
625 self.create_tub(self.central_portnum)
626 newfurl = self.central_tub.registerReference(self.the_introducer,
628 assert newfurl == self.introducer_furl
629 d.addCallback(_restart_introducer_tub)
631 d.addCallback(_wait_for_connected)
632 d.addCallback(_wait_for_expected_announcements)
633 d.addCallback(_wait_until_idle)
634 d.addCallback(lambda _ign: log.msg(" reconnected"))
636 # TODO: publish something while the introducer is offline, then
637 # confirm it gets delivered when the connection is reestablished
639 log.msg("doing _check2")
640 # assert that the introducer sent out new messages, one per
642 dc = self.the_introducer._debug_counts
643 self.failUnlessEqual(dc["outbound_announcements"],
644 NUM_STORAGE*NUM_CLIENTS)
645 self.failUnless(dc["outbound_message"] > 0)
646 self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS)
647 for c in subscribing_clients:
648 cdc = c._debug_counts
649 self.failUnlessEqual(cdc["inbound_message"], 1)
650 self.failUnlessEqual(cdc["inbound_announcement"], NUM_STORAGE)
651 self.failUnlessEqual(cdc["new_announcement"], 0)
652 self.failUnlessEqual(cdc["wrong_service"], 0)
653 self.failUnlessEqual(cdc["duplicate_announcement"], NUM_STORAGE)
654 d.addCallback(_check2)
656 # Then force an introducer restart, by shutting down the Tub,
657 # destroying the old introducer, and starting a new Tub+Introducer.
658 # Everybody should reconnect and republish, and the (new) introducer
659 # will distribute the new announcements, but the clients should
660 # ignore the republishes as duplicates.
662 d.addCallback(lambda _ign: log.msg("shutting down introducer"))
663 d.addCallback(lambda _ign: self.central_tub.disownServiceParent())
664 d.addCallback(_wait_for_introducer_loss)
665 d.addCallback(lambda _ign: log.msg("introducer lost"))
667 def _restart_introducer(_ign):
668 log.msg("restarting introducer")
669 self.create_tub(self.central_portnum)
671 for i in range(NUM_CLIENTS):
672 c = subscribing_clients[i]
673 for k in c._debug_counts:
674 c._debug_counts[k] = 0
675 expected_announcements[i] += 1 # new 'storage' for everyone
676 if server_version == V1:
677 introducer = old.IntroducerService_v1()
679 introducer = IntroducerService()
680 self.the_introducer = introducer
681 newfurl = self.central_tub.registerReference(self.the_introducer,
683 assert newfurl == self.introducer_furl
684 d.addCallback(_restart_introducer)
686 d.addCallback(_wait_for_connected)
687 d.addCallback(_wait_for_expected_announcements)
688 d.addCallback(_wait_until_idle)
691 log.msg("doing _check3")
692 dc = self.the_introducer._debug_counts
693 self.failUnlessEqual(dc["outbound_announcements"],
694 NUM_STORAGE*NUM_CLIENTS)
695 self.failUnless(dc["outbound_message"] > 0)
696 self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS)
697 for c in subscribing_clients:
698 cdc = c._debug_counts
699 self.failUnless(cdc["inbound_message"] > 0)
700 self.failUnlessEqual(cdc["inbound_announcement"], NUM_STORAGE)
701 self.failUnlessEqual(cdc["new_announcement"], 0)
702 self.failUnlessEqual(cdc["wrong_service"], 0)
703 self.failUnlessEqual(cdc["duplicate_announcement"], NUM_STORAGE)
705 d.addCallback(_check3)
709 def test_system_v2_server(self):
710 self.basedir = "introducer/SystemTest/system_v2_server"
711 os.makedirs(self.basedir)
712 return self.do_system_test(V2)
713 test_system_v2_server.timeout = 480
714 # occasionally takes longer than 350s on "draco"
716 def test_system_v1_server(self):
717 self.basedir = "introducer/SystemTest/system_v1_server"
718 os.makedirs(self.basedir)
719 return self.do_system_test(V1)
720 test_system_v1_server.timeout = 480
721 # occasionally takes longer than 350s on "draco"
723 class FakeRemoteReference:
724 def notifyOnDisconnect(self, *args, **kwargs): pass
725 def getRemoteTubID(self): return "62ubehyunnyhzs7r6vdonnm2hpi52w6y"
726 def getLocationHints(self): return [("ipv4", "here.example.com", "1234"),
727 ("ipv4", "there.example.com", "2345")]
728 def getPeer(self): return address.IPv4Address("TCP", "remote.example.com",
731 class ClientInfo(unittest.TestCase):
732 def test_client_v2(self):
733 introducer = IntroducerService()
734 tub = introducer_furl = None
735 app_versions = {"whizzy": "fizzy"}
736 client_v2 = IntroducerClient(tub, introducer_furl, NICKNAME % u"v2",
737 "my_version", "oldest", app_versions)
738 #furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
739 #ann_s = make_ann_t(client_v2, furl1, None)
740 #introducer.remote_publish_v2(ann_s, Referenceable())
741 subscriber = FakeRemoteReference()
742 introducer.remote_subscribe_v2(subscriber, "storage",
743 client_v2._my_subscriber_info)
744 subs = introducer.get_subscribers()
745 self.failUnlessEqual(len(subs), 1)
747 self.failUnlessEqual(s0.service_name, "storage")
748 self.failUnlessEqual(s0.app_versions, app_versions)
749 self.failUnlessEqual(s0.nickname, NICKNAME % u"v2")
750 self.failUnlessEqual(s0.version, "my_version")
752 def test_client_v1(self):
753 introducer = IntroducerService()
754 subscriber = FakeRemoteReference()
755 introducer.remote_subscribe(subscriber, "storage")
756 # the v1 subscribe interface had no subscriber_info: that was usually
757 # sent in a separate stub_client pseudo-announcement
758 subs = introducer.get_subscribers()
759 self.failUnlessEqual(len(subs), 1)
761 self.failUnlessEqual(s0.nickname, u"?") # not known yet
762 self.failUnlessEqual(s0.service_name, "storage")
764 # now submit the stub_client announcement
765 furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
766 ann = (furl1, "stub_client", "RIStubClient",
767 (NICKNAME % u"v1").encode("utf-8"), "my_version", "oldest")
768 introducer.remote_publish(ann)
769 # the server should correlate the two
770 subs = introducer.get_subscribers()
771 self.failUnlessEqual(len(subs), 1)
773 self.failUnlessEqual(s0.service_name, "storage")
774 # v1 announcements do not contain app-versions
775 self.failUnlessEqual(s0.app_versions, {})
776 self.failUnlessEqual(s0.nickname, NICKNAME % u"v1")
777 self.failUnlessEqual(s0.version, "my_version")
779 # a subscription that arrives after the stub_client announcement
780 # should be correlated too
781 subscriber2 = FakeRemoteReference()
782 introducer.remote_subscribe(subscriber2, "thing2")
784 subs = introducer.get_subscribers()
785 self.failUnlessEqual(len(subs), 2)
786 s0 = [s for s in subs if s.service_name == "thing2"][0]
787 # v1 announcements do not contain app-versions
788 self.failUnlessEqual(s0.app_versions, {})
789 self.failUnlessEqual(s0.nickname, NICKNAME % u"v1")
790 self.failUnlessEqual(s0.version, "my_version")
792 class Announcements(unittest.TestCase):
793 def test_client_v2_unsigned(self):
794 introducer = IntroducerService()
795 tub = introducer_furl = None
796 app_versions = {"whizzy": "fizzy"}
797 client_v2 = IntroducerClient(tub, introducer_furl, u"nick-v2",
798 "my_version", "oldest", app_versions)
799 furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
800 tubid = "62ubehyunnyhzs7r6vdonnm2hpi52w6y"
801 ann_s0 = make_ann_t(client_v2, furl1, None)
802 canary0 = Referenceable()
803 introducer.remote_publish_v2(ann_s0, canary0)
804 a = introducer.get_announcements()
805 self.failUnlessEqual(len(a), 1)
806 self.failUnlessIdentical(a[0].canary, canary0)
807 self.failUnlessEqual(a[0].index, ("storage", None, tubid))
808 self.failUnlessEqual(a[0].announcement["app-versions"], app_versions)
809 self.failUnlessEqual(a[0].nickname, u"nick-v2")
810 self.failUnlessEqual(a[0].service_name, "storage")
811 self.failUnlessEqual(a[0].version, "my_version")
812 self.failUnlessEqual(a[0].announcement["anonymous-storage-FURL"], furl1)
814 def test_client_v2_signed(self):
815 introducer = IntroducerService()
816 tub = introducer_furl = None
817 app_versions = {"whizzy": "fizzy"}
818 client_v2 = IntroducerClient(tub, introducer_furl, u"nick-v2",
819 "my_version", "oldest", app_versions)
820 furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
821 sk_s, vk_s = keyutil.make_keypair()
822 sk, _ignored = keyutil.parse_privkey(sk_s)
823 pks = keyutil.remove_prefix(vk_s, "pub-")
824 ann_t0 = make_ann_t(client_v2, furl1, sk)
825 canary0 = Referenceable()
826 introducer.remote_publish_v2(ann_t0, canary0)
827 a = introducer.get_announcements()
828 self.failUnlessEqual(len(a), 1)
829 self.failUnlessIdentical(a[0].canary, canary0)
830 self.failUnlessEqual(a[0].index, ("storage", pks, None))
831 self.failUnlessEqual(a[0].announcement["app-versions"], app_versions)
832 self.failUnlessEqual(a[0].nickname, u"nick-v2")
833 self.failUnlessEqual(a[0].service_name, "storage")
834 self.failUnlessEqual(a[0].version, "my_version")
835 self.failUnlessEqual(a[0].announcement["anonymous-storage-FURL"], furl1)
837 def test_client_v1(self):
838 introducer = IntroducerService()
840 furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
841 tubid = "62ubehyunnyhzs7r6vdonnm2hpi52w6y"
842 ann = (furl1, "storage", "RIStorage",
843 u"nick-v1".encode("utf-8"), "my_version", "oldest")
844 introducer.remote_publish(ann)
846 a = introducer.get_announcements()
847 self.failUnlessEqual(len(a), 1)
848 self.failUnlessEqual(a[0].index, ("storage", None, tubid))
849 self.failUnlessEqual(a[0].canary, None)
850 self.failUnlessEqual(a[0].announcement["app-versions"], {})
851 self.failUnlessEqual(a[0].nickname, u"nick-v1".encode("utf-8"))
852 self.failUnlessEqual(a[0].service_name, "storage")
853 self.failUnlessEqual(a[0].version, "my_version")
854 self.failUnlessEqual(a[0].announcement["anonymous-storage-FURL"], furl1)
857 class TooNewServer(IntroducerService):
858 VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v999":
860 "application-version": "greetings from the crazy future",
863 class NonV1Server(SystemTestMixin, unittest.TestCase):
864 # if the 1.3.0 client connects to a server that doesn't provide the 'v1'
865 # protocol, it is supposed to provide a useful error instead of a weird
868 def test_failure(self):
869 self.basedir = "introducer/NonV1Server/failure"
870 os.makedirs(self.basedir)
873 i.setServiceParent(self.parent)
874 self.introducer_furl = self.central_tub.registerReference(i)
877 tub.setOption("expose-remote-exception-types", False)
878 tub.setServiceParent(self.parent)
879 l = tub.listenOn("tcp:0")
880 portnum = l.getPortnum()
881 tub.setLocation("localhost:%d" % portnum)
883 c = IntroducerClient(tub, self.introducer_furl,
884 u"nickname-client", "version", "oldest", {})
887 announcements[key_s] = ann
888 c.subscribe_to("storage", got)
890 c.setServiceParent(self.parent)
892 # now we wait for it to connect and notice the bad version
895 return bool(c._introducer_error) or bool(c._publisher)
896 d = self.poll(_got_bad)
898 self.failUnless(c._introducer_error)
899 self.failUnless(c._introducer_error.check(InsufficientVersionError),
904 class DecodeFurl(unittest.TestCase):
905 def test_decode(self):
906 # make sure we have a working base64.b32decode. The one in
907 # python2.4.[01] was broken.
908 furl = 'pb://t5g7egomnnktbpydbuijt6zgtmw4oqi5@127.0.0.1:51857/hfzv36i'
909 m = re.match(r'pb://(\w+)@', furl)
911 nodeid = b32decode(m.group(1).upper())
912 self.failUnlessEqual(nodeid, "\x9fM\xf2\x19\xcckU0\xbf\x03\r\x10\x99\xfb&\x9b-\xc7A\x1d")
914 class Signatures(unittest.TestCase):
916 ann = {"key1": "value1"}
917 sk_s,vk_s = keyutil.make_keypair()
918 sk,ignored = keyutil.parse_privkey(sk_s)
919 ann_t = sign_to_foolscap(ann, sk)
920 (msg, sig, key) = ann_t
921 self.failUnlessEqual(type(msg), type("".encode("utf-8"))) # bytes
922 self.failUnlessEqual(simplejson.loads(msg.decode("utf-8")), ann)
923 self.failUnless(sig.startswith("v0-"))
924 self.failUnless(key.startswith("v0-"))
925 (ann2,key2) = unsign_from_foolscap(ann_t)
926 self.failUnlessEqual(ann2, ann)
927 self.failUnlessEqual("pub-"+key2, vk_s)
930 bad_ann = {"key1": "value2"}
931 bad_msg = simplejson.dumps(bad_ann).encode("utf-8")
932 self.failUnlessRaises(keyutil.BadSignatureError,
933 unsign_from_foolscap, (bad_msg,sig,key))
934 # sneaky bad signature should be ignored
935 (ann2,key2) = unsign_from_foolscap( (bad_msg,None,key) )
936 self.failUnlessEqual(key2, None)
937 self.failUnlessEqual(ann2, bad_ann)
939 # unrecognized signatures
940 self.failUnlessRaises(UnknownKeyError,
941 unsign_from_foolscap, (bad_msg,"v999-sig",key))
942 self.failUnlessRaises(UnknownKeyError,
943 unsign_from_foolscap, (bad_msg,sig,"v999-key"))
946 # add tests of StorageFarmBroker: if it receives duplicate announcements, it
947 # should leave the Reconnector in place, also if it receives
948 # same-FURL-different-misc, but if it receives same-nodeid-different-FURL, it
949 # should tear down the Reconnector and make a new one. This behavior used to
950 # live in the IntroducerClient, and thus used to be tested by test_introducer
952 # copying more tests from old branch:
954 # then also add Upgrade test