3 from base64 import b32decode
6 from twisted.trial import unittest
7 from twisted.internet import defer
8 from twisted.python import log
10 from foolscap.api import Tub, Referenceable, fireEventually, flushEventualQueue
11 from twisted.application import service
12 from allmydata.interfaces import InsufficientVersionError
13 from allmydata.introducer.client import IntroducerClient, \
14 WrapV2ClientInV1Interface
15 from allmydata.introducer.server import IntroducerService
16 from allmydata.introducer.common import get_tubid_string_from_ann, \
17 get_tubid_string, sign_to_foolscap, unsign_from_foolscap, \
19 from allmydata.introducer import old
20 # test compatibility with old introducer .tac files
21 from allmydata.introducer import IntroducerNode
22 from allmydata.util import pollmixin, keyutil
23 import allmydata.test.common_util as testutil
25 class LoggingMultiService(service.MultiService):
26 def log(self, msg, **kw):
29 class Node(testutil.SignalMixin, unittest.TestCase):
30 def test_loadable(self):
31 basedir = "introducer.IntroducerNode.test_loadable"
33 q = IntroducerNode(basedir)
34 d = fireEventually(None)
35 d.addCallback(lambda res: q.startService())
36 d.addCallback(lambda res: q.when_tub_ready())
37 d.addCallback(lambda res: q.stopService())
38 d.addCallback(flushEventualQueue)
43 self.parent = LoggingMultiService()
44 self.parent.startService()
46 log.msg("TestIntroducer.tearDown")
47 d = defer.succeed(None)
48 d.addCallback(lambda res: self.parent.stopService())
49 d.addCallback(flushEventualQueue)
52 class Introducer(ServiceMixin, unittest.TestCase, pollmixin.PollMixin):
54 def test_create(self):
55 ic = IntroducerClient(None, "introducer.furl", u"my_nickname",
56 "my_version", "oldest_version", {})
57 self.failUnless(isinstance(ic, IntroducerClient))
59 def test_listen(self):
60 i = IntroducerService()
61 i.setServiceParent(self.parent)
63 def test_duplicate_publish(self):
64 i = IntroducerService()
65 self.failUnlessEqual(len(i.get_announcements()), 0)
66 self.failUnlessEqual(len(i.get_subscribers()), 0)
67 furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36106,127.0.0.1:36106/gydnpigj2ja2qr2srq4ikjwnl7xfgbra"
68 furl2 = "pb://ttwwooyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36111,127.0.0.1:36106/ttwwoogj2ja2qr2srq4ikjwnl7xfgbra"
69 ann1 = (furl1, "storage", "RIStorage", "nick1", "ver23", "ver0")
70 ann1b = (furl1, "storage", "RIStorage", "nick1", "ver24", "ver0")
71 ann2 = (furl2, "storage", "RIStorage", "nick2", "ver30", "ver0")
72 i.remote_publish(ann1)
73 self.failUnlessEqual(len(i.get_announcements()), 1)
74 self.failUnlessEqual(len(i.get_subscribers()), 0)
75 i.remote_publish(ann2)
76 self.failUnlessEqual(len(i.get_announcements()), 2)
77 self.failUnlessEqual(len(i.get_subscribers()), 0)
78 i.remote_publish(ann1b)
79 self.failUnlessEqual(len(i.get_announcements()), 2)
80 self.failUnlessEqual(len(i.get_subscribers()), 0)
82 def test_id_collision(self):
83 # test replacement case where tubid equals a keyid (one should
84 # not replace the other)
85 i = IntroducerService()
86 ic = IntroducerClient(None,
87 "introducer.furl", u"my_nickname",
88 "my_version", "oldest_version", {})
89 sk_s, vk_s = keyutil.make_keypair()
90 sk, _ignored = keyutil.parse_privkey(sk_s)
91 keyid = keyutil.remove_prefix(vk_s, "pub-v0-")
92 furl1 = "pb://onug64tu@127.0.0.1:123/short" # base32("short")
93 ann_t = ic.create_announcement("storage", make_ann(furl1), sk)
94 i.remote_publish_v2(ann_t, Referenceable())
95 announcements = i.get_announcements()
96 self.failUnlessEqual(len(announcements), 1)
97 key1 = ("storage", "v0-"+keyid, None)
98 self.failUnless(key1 in announcements)
99 (ign, ign, ann1_out, ign) = announcements[key1]
100 self.failUnlessEqual(ann1_out["anonymous-storage-FURL"], furl1)
102 furl2 = "pb://%s@127.0.0.1:36106/swissnum" % keyid
103 ann2 = (furl2, "storage", "RIStorage", "nick1", "ver23", "ver0")
104 i.remote_publish(ann2)
105 self.failUnlessEqual(len(announcements), 2)
106 key2 = ("storage", None, keyid)
107 self.failUnless(key2 in announcements)
108 (ign, ign, ann2_out, ign) = announcements[key2]
109 self.failUnlessEqual(ann2_out["anonymous-storage-FURL"], furl2)
113 ann = { "anonymous-storage-FURL": furl,
114 "permutation-seed-base32": get_tubid_string(furl) }
117 def make_ann_t(ic, furl, privkey):
118 return ic.create_announcement("storage", make_ann(furl), privkey)
120 class Client(unittest.TestCase):
121 def test_duplicate_receive_v1(self):
122 ic = IntroducerClient(None,
123 "introducer.furl", u"my_nickname",
124 "my_version", "oldest_version", {})
126 ic.subscribe_to("storage",
127 lambda key_s,ann: announcements.append(ann))
128 furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/gydnpigj2ja2qr2srq4ikjwnl7xfgbra"
129 ann1 = (furl1, "storage", "RIStorage", "nick1", "ver23", "ver0")
130 ann1b = (furl1, "storage", "RIStorage", "nick1", "ver24", "ver0")
131 ca = WrapV2ClientInV1Interface(ic)
133 ca.remote_announce([ann1])
136 self.failUnlessEqual(len(announcements), 1)
137 self.failUnlessEqual(announcements[0]["nickname"], u"nick1")
138 self.failUnlessEqual(announcements[0]["my-version"], "ver23")
139 self.failUnlessEqual(ic._debug_counts["inbound_announcement"], 1)
140 self.failUnlessEqual(ic._debug_counts["new_announcement"], 1)
141 self.failUnlessEqual(ic._debug_counts["update"], 0)
142 self.failUnlessEqual(ic._debug_counts["duplicate_announcement"], 0)
143 # now send a duplicate announcement: this should not notify clients
144 ca.remote_announce([ann1])
145 return fireEventually()
148 self.failUnlessEqual(len(announcements), 1)
149 self.failUnlessEqual(ic._debug_counts["inbound_announcement"], 2)
150 self.failUnlessEqual(ic._debug_counts["new_announcement"], 1)
151 self.failUnlessEqual(ic._debug_counts["update"], 0)
152 self.failUnlessEqual(ic._debug_counts["duplicate_announcement"], 1)
153 # and a replacement announcement: same FURL, new other stuff.
154 # Clients should be notified.
155 ca.remote_announce([ann1b])
156 return fireEventually()
157 d.addCallback(_then2)
159 self.failUnlessEqual(len(announcements), 2)
160 self.failUnlessEqual(ic._debug_counts["inbound_announcement"], 3)
161 self.failUnlessEqual(ic._debug_counts["new_announcement"], 1)
162 self.failUnlessEqual(ic._debug_counts["update"], 1)
163 self.failUnlessEqual(ic._debug_counts["duplicate_announcement"], 1)
164 # test that the other stuff changed
165 self.failUnlessEqual(announcements[-1]["nickname"], u"nick1")
166 self.failUnlessEqual(announcements[-1]["my-version"], "ver24")
167 d.addCallback(_then3)
170 def test_duplicate_receive_v2(self):
171 ic1 = IntroducerClient(None,
172 "introducer.furl", u"my_nickname",
173 "ver23", "oldest_version", {})
174 # we use a second client just to create a different-looking
176 ic2 = IntroducerClient(None,
177 "introducer.furl", u"my_nickname",
178 "ver24","oldest_version",{})
180 def _received(key_s, ann):
181 announcements.append( (key_s, ann) )
182 ic1.subscribe_to("storage", _received)
183 furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/gydnp"
184 furl1a = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:7777/gydnp"
185 furl2 = "pb://ttwwooyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/ttwwoo"
187 privkey_s, pubkey_vs = keyutil.make_keypair()
188 privkey, _ignored = keyutil.parse_privkey(privkey_s)
189 pubkey_s = keyutil.remove_prefix(pubkey_vs, "pub-")
192 # ann1a: ic1, furl1a (same SturdyRef, different connection hints)
196 self.ann1 = make_ann_t(ic1, furl1, privkey)
197 self.ann1a = make_ann_t(ic1, furl1a, privkey)
198 self.ann1b = make_ann_t(ic2, furl1, privkey)
199 self.ann2 = make_ann_t(ic2, furl2, privkey)
201 ic1.remote_announce_v2([self.ann1]) # queues eventual-send
204 self.failUnlessEqual(len(announcements), 1)
205 key_s,ann = announcements[0]
206 self.failUnlessEqual(key_s, pubkey_s)
207 self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1)
208 self.failUnlessEqual(ann["my-version"], "ver23")
209 d.addCallback(_then1)
211 # now send a duplicate announcement. This should not fire the
213 d.addCallback(lambda ign: ic1.remote_announce_v2([self.ann1]))
214 d.addCallback(fireEventually)
216 self.failUnlessEqual(len(announcements), 1)
217 d.addCallback(_then2)
219 # and a replacement announcement: same FURL, new other stuff. The
220 # subscriber *should* be fired.
221 d.addCallback(lambda ign: ic1.remote_announce_v2([self.ann1b]))
222 d.addCallback(fireEventually)
224 self.failUnlessEqual(len(announcements), 2)
225 key_s,ann = announcements[-1]
226 self.failUnlessEqual(key_s, pubkey_s)
227 self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1)
228 self.failUnlessEqual(ann["my-version"], "ver24")
229 d.addCallback(_then3)
231 # and a replacement announcement with a different FURL (it uses
232 # different connection hints)
233 d.addCallback(lambda ign: ic1.remote_announce_v2([self.ann1a]))
234 d.addCallback(fireEventually)
236 self.failUnlessEqual(len(announcements), 3)
237 key_s,ann = announcements[-1]
238 self.failUnlessEqual(key_s, pubkey_s)
239 self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1a)
240 self.failUnlessEqual(ann["my-version"], "ver23")
241 d.addCallback(_then4)
243 # now add a new subscription, which should be called with the
244 # backlog. The introducer only records one announcement per index, so
245 # the backlog will only have the latest message.
247 def _received2(key_s, ann):
248 announcements2.append( (key_s, ann) )
249 d.addCallback(lambda ign: ic1.subscribe_to("storage", _received2))
250 d.addCallback(fireEventually)
252 self.failUnlessEqual(len(announcements2), 1)
253 key_s,ann = announcements2[-1]
254 self.failUnlessEqual(key_s, pubkey_s)
255 self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1a)
256 self.failUnlessEqual(ann["my-version"], "ver23")
257 d.addCallback(_then5)
260 def test_id_collision(self):
261 # test replacement case where tubid equals a keyid (one should
262 # not replace the other)
263 ic = IntroducerClient(None,
264 "introducer.furl", u"my_nickname",
265 "my_version", "oldest_version", {})
267 ic.subscribe_to("storage",
268 lambda key_s,ann: announcements.append(ann))
269 sk_s, vk_s = keyutil.make_keypair()
270 sk, _ignored = keyutil.parse_privkey(sk_s)
271 keyid = keyutil.remove_prefix(vk_s, "pub-v0-")
272 furl1 = "pb://onug64tu@127.0.0.1:123/short" # base32("short")
273 furl2 = "pb://%s@127.0.0.1:36106/swissnum" % keyid
274 ann_t = ic.create_announcement("storage", make_ann(furl1), sk)
275 ic.remote_announce_v2([ann_t])
278 # first announcement has been processed
279 self.failUnlessEqual(len(announcements), 1)
280 self.failUnlessEqual(announcements[0]["anonymous-storage-FURL"],
282 # now submit a second one, with a tubid that happens to look just
283 # like the pubkey-based serverid we just processed. They should
285 ann2 = (furl2, "storage", "RIStorage", "nick1", "ver23", "ver0")
286 ca = WrapV2ClientInV1Interface(ic)
287 ca.remote_announce([ann2])
288 return fireEventually()
291 # if they overlapped, the second announcement would be ignored
292 self.failUnlessEqual(len(announcements), 2)
293 self.failUnlessEqual(announcements[1]["anonymous-storage-FURL"],
295 d.addCallback(_then2)
299 class SystemTestMixin(ServiceMixin, pollmixin.PollMixin):
301 def create_tub(self, portnum=0):
302 tubfile = os.path.join(self.basedir, "tub.pem")
303 self.central_tub = tub = Tub(certFile=tubfile)
304 #tub.setOption("logLocalFailures", True)
305 #tub.setOption("logRemoteFailures", True)
306 tub.setOption("expose-remote-exception-types", False)
307 tub.setServiceParent(self.parent)
308 l = tub.listenOn("tcp:%d" % portnum)
309 self.central_portnum = l.getPortnum()
311 assert self.central_portnum == portnum
312 tub.setLocation("localhost:%d" % self.central_portnum)
314 class Queue(SystemTestMixin, unittest.TestCase):
315 def test_queue_until_connected(self):
316 self.basedir = "introducer/QueueUntilConnected/queued"
317 os.makedirs(self.basedir)
319 introducer = IntroducerService()
320 introducer.setServiceParent(self.parent)
321 iff = os.path.join(self.basedir, "introducer.furl")
322 ifurl = self.central_tub.registerReference(introducer, furlFile=iff)
324 tub2.setServiceParent(self.parent)
325 c = IntroducerClient(tub2, ifurl,
326 u"nickname", "version", "oldest", {})
327 furl1 = "pb://onug64tu@127.0.0.1:123/short" # base32("short")
328 sk_s, vk_s = keyutil.make_keypair()
329 sk, _ignored = keyutil.parse_privkey(sk_s)
331 d = introducer.disownServiceParent()
333 # now that the introducer server is offline, create a client and
334 # publish some messages
335 c.setServiceParent(self.parent) # this starts the reconnector
336 c.publish("storage", make_ann(furl1), sk)
338 introducer.setServiceParent(self.parent) # restart the server
339 # now wait for the messages to be delivered
340 def _got_announcement():
341 return bool(introducer.get_announcements())
342 return self.poll(_got_announcement)
343 d.addCallback(_offline)
345 v = list(introducer.get_announcements().values())[0]
346 (ign, ign, ann1_out, ign) = v
347 self.failUnlessEqual(ann1_out["anonymous-storage-FURL"], furl1)
350 # now let the ack get back
351 def _wait_until_idle(ign):
353 if c._debug_outstanding:
355 if introducer._debug_outstanding:
358 return self.poll(_idle)
359 d.addCallback(_wait_until_idle)
364 class SystemTest(SystemTestMixin, unittest.TestCase):
366 def do_system_test(self, server_version):
368 if server_version == V1:
369 introducer = old.IntroducerService_v1()
371 introducer = IntroducerService()
372 introducer.setServiceParent(self.parent)
373 iff = os.path.join(self.basedir, "introducer.furl")
374 tub = self.central_tub
375 ifurl = self.central_tub.registerReference(introducer, furlFile=iff)
376 self.introducer_furl = ifurl
378 # we have 5 clients who publish themselves as storage servers, and a
379 # sixth which does which not. All 6 clients subscriber to hear about
380 # storage. When the connections are fully established, all six nodes
381 # should have 5 connections each.
387 received_announcements = {}
388 subscribing_clients = []
389 publishing_clients = []
390 self.the_introducer = introducer
392 expected_announcements = [0 for c in range(NUM_CLIENTS)]
394 for i in range(NUM_CLIENTS):
396 #tub.setOption("logLocalFailures", True)
397 #tub.setOption("logRemoteFailures", True)
398 tub.setOption("expose-remote-exception-types", False)
399 tub.setServiceParent(self.parent)
400 l = tub.listenOn("tcp:0")
401 portnum = l.getPortnum()
402 tub.setLocation("localhost:%d" % portnum)
404 log.msg("creating client %d: %s" % (i, tub.getShortTubID()))
406 c = old.IntroducerClient_v1(tub, self.introducer_furl,
410 c = IntroducerClient(tub, self.introducer_furl,
413 {"component": "component-v1"})
414 received_announcements[c] = {}
415 def got(key_s_or_tubid, ann, announcements, i):
417 index = get_tubid_string_from_ann(ann)
419 index = key_s_or_tubid or get_tubid_string_from_ann(ann)
420 announcements[index] = ann
421 c.subscribe_to("storage", got, received_announcements[c], i)
422 subscribing_clients.append(c)
423 expected_announcements[i] += 1 # all expect a 'storage' announcement
425 node_furl = tub.registerReference(Referenceable())
428 c.publish(node_furl, "storage", "ri_name")
430 # sign the announcement
431 privkey_s, pubkey_s = keyutil.make_keypair()
432 privkey, _ignored = keyutil.parse_privkey(privkey_s)
433 privkeys[c] = privkey
434 c.publish("storage", make_ann(node_furl), privkey)
436 c.publish("storage", make_ann(node_furl))
437 publishing_clients.append(c)
439 # the last one does not publish anything
443 # users of the V1 client were required to publish a
444 # 'stub_client' record (somewhat after they published the
445 # 'storage' record), so the introducer could see their
446 # version. Match that behavior.
447 c.publish(node_furl, "stub_client", "stub_ri_name")
450 # also publish something that nobody cares about
451 boring_furl = tub.registerReference(Referenceable())
452 c.publish("boring", make_ann(boring_furl))
454 c.setServiceParent(self.parent)
459 def _wait_for_connected(ign):
462 if not c.connected_to_introducer():
465 return self.poll(_connected)
467 # we watch the clients to determine when the system has settled down.
468 # Then we can look inside the server to assert things about its
471 def _wait_for_expected_announcements(ign):
472 def _got_expected_announcements():
473 for i,c in enumerate(subscribing_clients):
474 if len(received_announcements[c]) < expected_announcements[i]:
477 return self.poll(_got_expected_announcements)
479 # before shutting down any Tub, we'd like to know that there are no
480 # messages outstanding
482 def _wait_until_idle(ign):
484 for c in subscribing_clients + publishing_clients:
485 if c._debug_outstanding:
487 if self.the_introducer._debug_outstanding:
490 return self.poll(_idle)
492 d = defer.succeed(None)
493 d.addCallback(_wait_for_connected)
494 d.addCallback(_wait_for_expected_announcements)
495 d.addCallback(_wait_until_idle)
498 log.msg("doing _check1")
499 dc = self.the_introducer._debug_counts
500 if server_version == V1:
501 # each storage server publishes a record, and (after its
502 # 'subscribe' has been ACKed) also publishes a "stub_client".
503 # The non-storage client (which subscribes) also publishes a
504 # stub_client. There is also one "boring" service. The number
505 # of messages is higher, because the stub_clients aren't
506 # published until after we get the 'subscribe' ack (since we
507 # don't realize that we're dealing with a v1 server [which
508 # needs stub_clients] until then), and the act of publishing
509 # the stub_client causes us to re-send all previous
511 self.failUnlessEqual(dc["inbound_message"] - dc["inbound_duplicate"],
512 NUM_STORAGE + NUM_CLIENTS + 1)
514 # each storage server publishes a record. There is also one
515 # "stub_client" and one "boring"
516 self.failUnlessEqual(dc["inbound_message"], NUM_STORAGE+2)
517 self.failUnlessEqual(dc["inbound_duplicate"], 0)
518 self.failUnlessEqual(dc["inbound_update"], 0)
519 self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS)
520 # the number of outbound messages is tricky.. I think it depends
521 # upon a race between the publish and the subscribe messages.
522 self.failUnless(dc["outbound_message"] > 0)
523 # each client subscribes to "storage", and each server publishes
524 self.failUnlessEqual(dc["outbound_announcements"],
525 NUM_STORAGE*NUM_CLIENTS)
527 for c in subscribing_clients:
528 cdc = c._debug_counts
529 self.failUnless(cdc["inbound_message"])
530 self.failUnlessEqual(cdc["inbound_announcement"],
532 self.failUnlessEqual(cdc["wrong_service"], 0)
533 self.failUnlessEqual(cdc["duplicate_announcement"], 0)
534 self.failUnlessEqual(cdc["update"], 0)
535 self.failUnlessEqual(cdc["new_announcement"],
537 anns = received_announcements[c]
538 self.failUnlessEqual(len(anns), NUM_STORAGE)
540 nodeid0 = tubs[clients[0]].tubID
542 nick = ann["nickname"]
543 self.failUnlessEqual(type(nick), unicode)
544 self.failUnlessEqual(nick, u"nickname-0")
545 if server_version == V1:
546 for c in publishing_clients:
547 cdc = c._debug_counts
548 expected = 1 # storage
550 expected += 1 # boring
551 if c is not clients[0]:
552 # the v2 client tries to call publish_v2, which fails
553 # because the server is v1. It then re-sends
554 # everything it has so far, plus a stub_client record
555 expected = 2*expected + 1
557 # we always tell v1 client to send stub_client
559 self.failUnlessEqual(cdc["outbound_message"], expected)
561 for c in publishing_clients:
562 cdc = c._debug_counts
564 if c in [clients[0], # stub_client
568 self.failUnlessEqual(cdc["outbound_message"], expected)
569 log.msg("_check1 done")
570 d.addCallback(_check1)
572 # force an introducer reconnect, by shutting down the Tub it's using
573 # and starting a new Tub (with the old introducer). Everybody should
574 # reconnect and republish, but the introducer should ignore the
575 # republishes as duplicates. However, because the server doesn't know
576 # what each client does and does not know, it will send them a copy
577 # of the current announcement table anyway.
579 d.addCallback(lambda _ign: log.msg("shutting down introducer's Tub"))
580 d.addCallback(lambda _ign: self.central_tub.disownServiceParent())
582 def _wait_for_introducer_loss(ign):
583 def _introducer_lost():
585 if c.connected_to_introducer():
588 return self.poll(_introducer_lost)
589 d.addCallback(_wait_for_introducer_loss)
591 def _restart_introducer_tub(_ign):
592 log.msg("restarting introducer's Tub")
594 for i in range(NUM_CLIENTS):
595 c = subscribing_clients[i]
596 for k in c._debug_counts:
597 c._debug_counts[k] = 0
598 for k in self.the_introducer._debug_counts:
599 self.the_introducer._debug_counts[k] = 0
600 expected_announcements[i] += 1 # new 'storage' for everyone
601 self.create_tub(self.central_portnum)
602 newfurl = self.central_tub.registerReference(self.the_introducer,
604 assert newfurl == self.introducer_furl
605 d.addCallback(_restart_introducer_tub)
607 d.addCallback(_wait_for_connected)
608 d.addCallback(_wait_for_expected_announcements)
609 d.addCallback(_wait_until_idle)
610 d.addCallback(lambda _ign: log.msg(" reconnected"))
612 # TODO: publish something while the introducer is offline, then
613 # confirm it gets delivered when the connection is reestablished
615 log.msg("doing _check2")
616 # assert that the introducer sent out new messages, one per
618 dc = self.the_introducer._debug_counts
619 self.failUnlessEqual(dc["outbound_announcements"],
620 NUM_STORAGE*NUM_CLIENTS)
621 self.failUnless(dc["outbound_message"] > 0)
622 self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS)
623 for c in subscribing_clients:
624 cdc = c._debug_counts
625 self.failUnlessEqual(cdc["inbound_message"], 1)
626 self.failUnlessEqual(cdc["inbound_announcement"], NUM_STORAGE)
627 self.failUnlessEqual(cdc["new_announcement"], 0)
628 self.failUnlessEqual(cdc["wrong_service"], 0)
629 self.failUnlessEqual(cdc["duplicate_announcement"], NUM_STORAGE)
630 d.addCallback(_check2)
632 # Then force an introducer restart, by shutting down the Tub,
633 # destroying the old introducer, and starting a new Tub+Introducer.
634 # Everybody should reconnect and republish, and the (new) introducer
635 # will distribute the new announcements, but the clients should
636 # ignore the republishes as duplicates.
638 d.addCallback(lambda _ign: log.msg("shutting down introducer"))
639 d.addCallback(lambda _ign: self.central_tub.disownServiceParent())
640 d.addCallback(_wait_for_introducer_loss)
641 d.addCallback(lambda _ign: log.msg("introducer lost"))
643 def _restart_introducer(_ign):
644 log.msg("restarting introducer")
645 self.create_tub(self.central_portnum)
647 for i in range(NUM_CLIENTS):
648 c = subscribing_clients[i]
649 for k in c._debug_counts:
650 c._debug_counts[k] = 0
651 expected_announcements[i] += 1 # new 'storage' for everyone
652 if server_version == V1:
653 introducer = old.IntroducerService_v1()
655 introducer = IntroducerService()
656 self.the_introducer = introducer
657 newfurl = self.central_tub.registerReference(self.the_introducer,
659 assert newfurl == self.introducer_furl
660 d.addCallback(_restart_introducer)
662 d.addCallback(_wait_for_connected)
663 d.addCallback(_wait_for_expected_announcements)
664 d.addCallback(_wait_until_idle)
667 log.msg("doing _check3")
668 dc = self.the_introducer._debug_counts
669 self.failUnlessEqual(dc["outbound_announcements"],
670 NUM_STORAGE*NUM_CLIENTS)
671 self.failUnless(dc["outbound_message"] > 0)
672 self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS)
673 for c in subscribing_clients:
674 cdc = c._debug_counts
675 self.failUnless(cdc["inbound_message"] > 0)
676 self.failUnlessEqual(cdc["inbound_announcement"], NUM_STORAGE)
677 self.failUnlessEqual(cdc["new_announcement"], 0)
678 self.failUnlessEqual(cdc["wrong_service"], 0)
679 self.failUnlessEqual(cdc["duplicate_announcement"], NUM_STORAGE)
681 d.addCallback(_check3)
685 def test_system_v2_server(self):
686 self.basedir = "introducer/SystemTest/system_v2_server"
687 os.makedirs(self.basedir)
688 return self.do_system_test(V2)
689 test_system_v2_server.timeout = 480
690 # occasionally takes longer than 350s on "draco"
692 def test_system_v1_server(self):
693 self.basedir = "introducer/SystemTest/system_v1_server"
694 os.makedirs(self.basedir)
695 return self.do_system_test(V1)
696 test_system_v1_server.timeout = 480
697 # occasionally takes longer than 350s on "draco"
699 class FakeRemoteReference:
700 def notifyOnDisconnect(self, *args, **kwargs): pass
701 def getRemoteTubID(self): return "62ubehyunnyhzs7r6vdonnm2hpi52w6y"
703 class ClientInfo(unittest.TestCase):
704 def test_client_v2(self):
705 introducer = IntroducerService()
706 tub = introducer_furl = None
707 app_versions = {"whizzy": "fizzy"}
708 client_v2 = IntroducerClient(tub, introducer_furl, u"nick-v2",
709 "my_version", "oldest", app_versions)
710 #furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
711 #ann_s = make_ann_t(client_v2, furl1, None)
712 #introducer.remote_publish_v2(ann_s, Referenceable())
713 subscriber = FakeRemoteReference()
714 introducer.remote_subscribe_v2(subscriber, "storage",
715 client_v2._my_subscriber_info)
716 s = introducer.get_subscribers()
717 self.failUnlessEqual(len(s), 1)
718 sn, when, si, rref = s[0]
719 self.failUnlessIdentical(rref, subscriber)
720 self.failUnlessEqual(sn, "storage")
721 self.failUnlessEqual(si["version"], 0)
722 self.failUnlessEqual(si["oldest-supported"], "oldest")
723 self.failUnlessEqual(si["app-versions"], app_versions)
724 self.failUnlessEqual(si["nickname"], u"nick-v2")
725 self.failUnlessEqual(si["my-version"], "my_version")
727 def test_client_v1(self):
728 introducer = IntroducerService()
729 subscriber = FakeRemoteReference()
730 introducer.remote_subscribe(subscriber, "storage")
731 # the v1 subscribe interface had no subscriber_info: that was usually
732 # sent in a separate stub_client pseudo-announcement
733 s = introducer.get_subscribers()
734 self.failUnlessEqual(len(s), 1)
735 sn, when, si, rref = s[0]
736 # rref will be a WrapV1SubscriberInV2Interface around the real
738 self.failUnlessIdentical(rref.original, subscriber)
739 self.failUnlessEqual(si, None) # not known yet
740 self.failUnlessEqual(sn, "storage")
742 # now submit the stub_client announcement
743 furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
744 ann = (furl1, "stub_client", "RIStubClient",
745 u"nick-v1".encode("utf-8"), "my_version", "oldest")
746 introducer.remote_publish(ann)
747 # the server should correlate the two
748 s = introducer.get_subscribers()
749 self.failUnlessEqual(len(s), 1)
750 sn, when, si, rref = s[0]
751 self.failUnlessIdentical(rref.original, subscriber)
752 self.failUnlessEqual(sn, "storage")
754 self.failUnlessEqual(si["version"], 0)
755 self.failUnlessEqual(si["oldest-supported"], "oldest")
756 # v1 announcements do not contain app-versions
757 self.failUnlessEqual(si["app-versions"], {})
758 self.failUnlessEqual(si["nickname"], u"nick-v1")
759 self.failUnlessEqual(si["my-version"], "my_version")
761 # a subscription that arrives after the stub_client announcement
762 # should be correlated too
763 subscriber2 = FakeRemoteReference()
764 introducer.remote_subscribe(subscriber2, "thing2")
766 s = introducer.get_subscribers()
767 subs = dict([(sn, (si,rref)) for sn, when, si, rref in s])
768 self.failUnlessEqual(len(subs), 2)
769 (si,rref) = subs["thing2"]
770 self.failUnlessIdentical(rref.original, subscriber2)
771 self.failUnlessEqual(si["version"], 0)
772 self.failUnlessEqual(si["oldest-supported"], "oldest")
773 # v1 announcements do not contain app-versions
774 self.failUnlessEqual(si["app-versions"], {})
775 self.failUnlessEqual(si["nickname"], u"nick-v1")
776 self.failUnlessEqual(si["my-version"], "my_version")
778 class Announcements(unittest.TestCase):
779 def test_client_v2_unsigned(self):
780 introducer = IntroducerService()
781 tub = introducer_furl = None
782 app_versions = {"whizzy": "fizzy"}
783 client_v2 = IntroducerClient(tub, introducer_furl, u"nick-v2",
784 "my_version", "oldest", app_versions)
785 furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
786 tubid = "62ubehyunnyhzs7r6vdonnm2hpi52w6y"
787 ann_s0 = make_ann_t(client_v2, furl1, None)
788 canary0 = Referenceable()
789 introducer.remote_publish_v2(ann_s0, canary0)
790 a = introducer.get_announcements()
791 self.failUnlessEqual(len(a), 1)
792 (index, (ann_s, canary, ann, when)) = a.items()[0]
793 self.failUnlessIdentical(canary, canary0)
794 self.failUnlessEqual(index, ("storage", None, tubid))
795 self.failUnlessEqual(ann["app-versions"], app_versions)
796 self.failUnlessEqual(ann["nickname"], u"nick-v2")
797 self.failUnlessEqual(ann["service-name"], "storage")
798 self.failUnlessEqual(ann["my-version"], "my_version")
799 self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1)
801 def test_client_v2_signed(self):
802 introducer = IntroducerService()
803 tub = introducer_furl = None
804 app_versions = {"whizzy": "fizzy"}
805 client_v2 = IntroducerClient(tub, introducer_furl, u"nick-v2",
806 "my_version", "oldest", app_versions)
807 furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
808 sk_s, vk_s = keyutil.make_keypair()
809 sk, _ignored = keyutil.parse_privkey(sk_s)
810 pks = keyutil.remove_prefix(vk_s, "pub-")
811 ann_t0 = make_ann_t(client_v2, furl1, sk)
812 canary0 = Referenceable()
813 introducer.remote_publish_v2(ann_t0, canary0)
814 a = introducer.get_announcements()
815 self.failUnlessEqual(len(a), 1)
816 (index, (ann_s, canary, ann, when)) = a.items()[0]
817 self.failUnlessIdentical(canary, canary0)
818 self.failUnlessEqual(index, ("storage", pks, None))
819 self.failUnlessEqual(ann["app-versions"], app_versions)
820 self.failUnlessEqual(ann["nickname"], u"nick-v2")
821 self.failUnlessEqual(ann["service-name"], "storage")
822 self.failUnlessEqual(ann["my-version"], "my_version")
823 self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1)
825 def test_client_v1(self):
826 introducer = IntroducerService()
828 furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
829 tubid = "62ubehyunnyhzs7r6vdonnm2hpi52w6y"
830 ann = (furl1, "storage", "RIStorage",
831 u"nick-v1".encode("utf-8"), "my_version", "oldest")
832 introducer.remote_publish(ann)
834 a = introducer.get_announcements()
835 self.failUnlessEqual(len(a), 1)
836 (index, (ann_s, canary, ann, when)) = a.items()[0]
837 self.failUnlessEqual(canary, None)
838 self.failUnlessEqual(index, ("storage", None, tubid))
839 self.failUnlessEqual(ann["app-versions"], {})
840 self.failUnlessEqual(ann["nickname"], u"nick-v1".encode("utf-8"))
841 self.failUnlessEqual(ann["service-name"], "storage")
842 self.failUnlessEqual(ann["my-version"], "my_version")
843 self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1)
846 class TooNewServer(IntroducerService):
847 VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v999":
849 "application-version": "greetings from the crazy future",
852 class NonV1Server(SystemTestMixin, unittest.TestCase):
853 # if the 1.3.0 client connects to a server that doesn't provide the 'v1'
854 # protocol, it is supposed to provide a useful error instead of a weird
857 def test_failure(self):
858 self.basedir = "introducer/NonV1Server/failure"
859 os.makedirs(self.basedir)
862 i.setServiceParent(self.parent)
863 self.introducer_furl = self.central_tub.registerReference(i)
866 tub.setOption("expose-remote-exception-types", False)
867 tub.setServiceParent(self.parent)
868 l = tub.listenOn("tcp:0")
869 portnum = l.getPortnum()
870 tub.setLocation("localhost:%d" % portnum)
872 c = IntroducerClient(tub, self.introducer_furl,
873 u"nickname-client", "version", "oldest", {})
876 announcements[key_s] = ann
877 c.subscribe_to("storage", got)
879 c.setServiceParent(self.parent)
881 # now we wait for it to connect and notice the bad version
884 return bool(c._introducer_error) or bool(c._publisher)
885 d = self.poll(_got_bad)
887 self.failUnless(c._introducer_error)
888 self.failUnless(c._introducer_error.check(InsufficientVersionError),
893 class DecodeFurl(unittest.TestCase):
894 def test_decode(self):
895 # make sure we have a working base64.b32decode. The one in
896 # python2.4.[01] was broken.
897 furl = 'pb://t5g7egomnnktbpydbuijt6zgtmw4oqi5@127.0.0.1:51857/hfzv36i'
898 m = re.match(r'pb://(\w+)@', furl)
900 nodeid = b32decode(m.group(1).upper())
901 self.failUnlessEqual(nodeid, "\x9fM\xf2\x19\xcckU0\xbf\x03\r\x10\x99\xfb&\x9b-\xc7A\x1d")
903 class Signatures(unittest.TestCase):
905 ann = {"key1": "value1"}
906 sk_s,vk_s = keyutil.make_keypair()
907 sk,ignored = keyutil.parse_privkey(sk_s)
908 ann_t = sign_to_foolscap(ann, sk)
909 (msg, sig, key) = ann_t
910 self.failUnlessEqual(type(msg), type("".encode("utf-8"))) # bytes
911 self.failUnlessEqual(simplejson.loads(msg.decode("utf-8")), ann)
912 self.failUnless(sig.startswith("v0-"))
913 self.failUnless(key.startswith("v0-"))
914 (ann2,key2) = unsign_from_foolscap(ann_t)
915 self.failUnlessEqual(ann2, ann)
916 self.failUnlessEqual("pub-"+key2, vk_s)
919 bad_ann = {"key1": "value2"}
920 bad_msg = simplejson.dumps(bad_ann).encode("utf-8")
921 self.failUnlessRaises(keyutil.BadSignatureError,
922 unsign_from_foolscap, (bad_msg,sig,key))
923 # sneaky bad signature should be ignored
924 (ann2,key2) = unsign_from_foolscap( (bad_msg,None,key) )
925 self.failUnlessEqual(key2, None)
926 self.failUnlessEqual(ann2, bad_ann)
928 # unrecognized signatures
929 self.failUnlessRaises(UnknownKeyError,
930 unsign_from_foolscap, (bad_msg,"v999-sig",key))
931 self.failUnlessRaises(UnknownKeyError,
932 unsign_from_foolscap, (bad_msg,sig,"v999-key"))
935 # add tests of StorageFarmBroker: if it receives duplicate announcements, it
936 # should leave the Reconnector in place, also if it receives
937 # same-FURL-different-misc, but if it receives same-nodeid-different-FURL, it
938 # should tear down the Reconnector and make a new one. This behavior used to
939 # live in the IntroducerClient, and thus used to be tested by test_introducer
941 # copying more tests from old branch:
943 # then also add Upgrade test