3 from base64 import b32decode
6 from twisted.trial import unittest
7 from twisted.internet import defer, address
8 from twisted.python import log
10 from foolscap.api import Tub, Referenceable, fireEventually, flushEventualQueue
11 from twisted.application import service
12 from allmydata.interfaces import InsufficientVersionError
13 from allmydata.introducer.client import IntroducerClient, \
14 WrapV2ClientInV1Interface
15 from allmydata.introducer.server import IntroducerService
16 from allmydata.introducer.common import get_tubid_string_from_ann, \
17 get_tubid_string, sign_to_foolscap, unsign_from_foolscap, \
19 from allmydata.introducer import old
20 # test compatibility with old introducer .tac files
21 from allmydata.introducer import IntroducerNode
22 from allmydata.web import introweb
23 from allmydata.util import pollmixin, keyutil, idlib
24 import allmydata.test.common_util as testutil
26 class LoggingMultiService(service.MultiService):
27 def log(self, msg, **kw):
30 class Node(testutil.SignalMixin, unittest.TestCase):
31 def test_loadable(self):
32 basedir = "introducer.IntroducerNode.test_loadable"
34 q = IntroducerNode(basedir)
35 d = fireEventually(None)
36 d.addCallback(lambda res: q.startService())
37 d.addCallback(lambda res: q.when_tub_ready())
38 d.addCallback(lambda res: q.stopService())
39 d.addCallback(flushEventualQueue)
44 self.parent = LoggingMultiService()
45 self.parent.startService()
47 log.msg("TestIntroducer.tearDown")
48 d = defer.succeed(None)
49 d.addCallback(lambda res: self.parent.stopService())
50 d.addCallback(flushEventualQueue)
class Introducer(ServiceMixin, unittest.TestCase, pollmixin.PollMixin):
    """Direct tests of IntroducerService and IntroducerClient.

    Announcements are handed straight to the service's remote_publish /
    remote_publish_v2 methods instead of being sent over a connection.
    """

    def test_create(self):
        """An IntroducerClient can be built with None in place of a Tub."""
        client = IntroducerClient(None, "introducer.furl", u"my_nickname",
                                  "my_version", "oldest_version", {})
        self.failUnless(isinstance(client, IntroducerClient))

    def test_listen(self):
        """An IntroducerService can be attached to self.parent."""
        introducer = IntroducerService()
        introducer.setServiceParent(self.parent)

    def test_duplicate_publish(self):
        """A v1 announcement reusing a known FURL does not add a record.

        Two distinct FURLs produce two stored announcements. A third
        announcement that reuses the first FURL with a different version
        string leaves the total at two. Nothing subscribes, so the
        subscriber list is expected to stay empty at every step.
        """
        introducer = IntroducerService()

        def _expect_totals(num_announcements):
            # the subscriber count is always zero in this test
            self.failUnlessEqual(len(introducer.get_announcements()),
                                 num_announcements)
            self.failUnlessEqual(len(introducer.get_subscribers()), 0)

        _expect_totals(0)

        furl_a = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36106,127.0.0.1:36106/gydnpigj2ja2qr2srq4ikjwnl7xfgbra"
        furl_b = "pb://ttwwooyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36111,127.0.0.1:36106/ttwwoogj2ja2qr2srq4ikjwnl7xfgbra"
        first = (furl_a, "storage", "RIStorage", "nick1", "ver23", "ver0")
        first_again = (furl_a, "storage", "RIStorage", "nick1", "ver24", "ver0")
        second = (furl_b, "storage", "RIStorage", "nick2", "ver30", "ver0")

        # (announcement to publish, stored total expected afterwards)
        steps = [
            (first, 1),
            (second, 2),
            (first_again, 2),  # same FURL as 'first': total must not grow
        ]
        for ann, expected_total in steps:
            introducer.remote_publish(ann)
            _expect_totals(expected_total)

    def test_id_collision(self):
        """A tubid equal to a keyid must not replace the key-indexed entry.

        A signed v2 announcement is published first and is expected to be
        indexed by its public key. A v1 announcement is then published
        whose FURL uses that same keyid text as its tubid. Both records
        must be present afterwards, under different index tuples.
        """
        introducer = IntroducerService()
        client = IntroducerClient(None,
                                  "introducer.furl", u"my_nickname",
                                  "my_version", "oldest_version", {})
        signing_key_s, verifying_key_s = keyutil.make_keypair()
        signing_key, _ignored = keyutil.parse_privkey(signing_key_s)
        keyid = keyutil.remove_prefix(verifying_key_s, "pub-v0-")

        # step 1: signed announcement, indexed by key
        signed_furl = "pb://onug64tu@127.0.0.1:123/short" # base32("short")
        signed_ann = client.create_announcement("storage",
                                                make_ann(signed_furl),
                                                signing_key)
        introducer.remote_publish_v2(signed_ann, Referenceable())
        stored = introducer.get_announcements()
        self.failUnlessEqual(len(stored), 1)
        key_index = ("storage", "v0-"+keyid, None)
        self.failUnlessEqual(stored[0].index, key_index)
        signed_out = stored[0].announcement
        self.failUnlessEqual(signed_out["anonymous-storage-FURL"],
                             signed_furl)

        # step 2: v1 announcement whose tubid is the keyid string
        tubid_furl = "pb://%s@127.0.0.1:36106/swissnum" % keyid
        v1_ann = (tubid_furl, "storage", "RIStorage", "nick1", "ver23", "ver0")
        introducer.remote_publish(v1_ann)
        stored = introducer.get_announcements()
        self.failUnlessEqual(len(stored), 2)
        tubid_index = ("storage", None, keyid)
        matches = [entry for entry in stored if entry.index == tubid_index]
        self.failUnlessEqual(len(matches), 1)
        v1_out = matches[0].announcement
        self.failUnlessEqual(v1_out["anonymous-storage-FURL"], tubid_furl)
116 ann = { "anonymous-storage-FURL": furl,
117 "permutation-seed-base32": get_tubid_string(furl) }
120 def make_ann_t(ic, furl, privkey, seqnum):
122 ann["seqnum"] = seqnum
126 return ic.create_announcement("storage", make_ann(furl), privkey, mod)
128 class Client(unittest.TestCase):
129 def test_duplicate_receive_v1(self):
130 ic = IntroducerClient(None,
131 "introducer.furl", u"my_nickname",
132 "my_version", "oldest_version", {})
134 ic.subscribe_to("storage",
135 lambda key_s,ann: announcements.append(ann))
136 furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/gydnpigj2ja2qr2srq4ikjwnl7xfgbra"
137 ann1 = (furl1, "storage", "RIStorage", "nick1", "ver23", "ver0")
138 ann1b = (furl1, "storage", "RIStorage", "nick1", "ver24", "ver0")
139 ca = WrapV2ClientInV1Interface(ic)
141 ca.remote_announce([ann1])
144 self.failUnlessEqual(len(announcements), 1)
145 self.failUnlessEqual(announcements[0]["nickname"], u"nick1")
146 self.failUnlessEqual(announcements[0]["my-version"], "ver23")
147 self.failUnlessEqual(ic._debug_counts["inbound_announcement"], 1)
148 self.failUnlessEqual(ic._debug_counts["new_announcement"], 1)
149 self.failUnlessEqual(ic._debug_counts["update"], 0)
150 self.failUnlessEqual(ic._debug_counts["duplicate_announcement"], 0)
151 # now send a duplicate announcement: this should not notify clients
152 ca.remote_announce([ann1])
153 return fireEventually()
156 self.failUnlessEqual(len(announcements), 1)
157 self.failUnlessEqual(ic._debug_counts["inbound_announcement"], 2)
158 self.failUnlessEqual(ic._debug_counts["new_announcement"], 1)
159 self.failUnlessEqual(ic._debug_counts["update"], 0)
160 self.failUnlessEqual(ic._debug_counts["duplicate_announcement"], 1)
161 # and a replacement announcement: same FURL, new other stuff.
162 # Clients should be notified.
163 ca.remote_announce([ann1b])
164 return fireEventually()
165 d.addCallback(_then2)
167 self.failUnlessEqual(len(announcements), 2)
168 self.failUnlessEqual(ic._debug_counts["inbound_announcement"], 3)
169 self.failUnlessEqual(ic._debug_counts["new_announcement"], 1)
170 self.failUnlessEqual(ic._debug_counts["update"], 1)
171 self.failUnlessEqual(ic._debug_counts["duplicate_announcement"], 1)
172 # test that the other stuff changed
173 self.failUnlessEqual(announcements[-1]["nickname"], u"nick1")
174 self.failUnlessEqual(announcements[-1]["my-version"], "ver24")
175 d.addCallback(_then3)
178 def test_duplicate_receive_v2(self):
179 ic1 = IntroducerClient(None,
180 "introducer.furl", u"my_nickname",
181 "ver23", "oldest_version", {})
182 # we use a second client just to create a different-looking
184 ic2 = IntroducerClient(None,
185 "introducer.furl", u"my_nickname",
186 "ver24","oldest_version",{})
188 def _received(key_s, ann):
189 announcements.append( (key_s, ann) )
190 ic1.subscribe_to("storage", _received)
191 furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/gydnp"
192 furl1a = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:7777/gydnp"
193 furl2 = "pb://ttwwooyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/ttwwoo"
195 privkey_s, pubkey_vs = keyutil.make_keypair()
196 privkey, _ignored = keyutil.parse_privkey(privkey_s)
197 pubkey_s = keyutil.remove_prefix(pubkey_vs, "pub-")
200 # ann1a: ic1, furl1a (same SturdyRef, different connection hints)
204 self.ann1 = make_ann_t(ic1, furl1, privkey, seqnum=10)
205 self.ann1old = make_ann_t(ic1, furl1, privkey, seqnum=9)
206 self.ann1noseqnum = make_ann_t(ic1, furl1, privkey, seqnum=None)
207 self.ann1b = make_ann_t(ic2, furl1, privkey, seqnum=11)
208 self.ann1a = make_ann_t(ic1, furl1a, privkey, seqnum=12)
209 self.ann2 = make_ann_t(ic2, furl2, privkey, seqnum=13)
211 ic1.remote_announce_v2([self.ann1]) # queues eventual-send
214 self.failUnlessEqual(len(announcements), 1)
215 key_s,ann = announcements[0]
216 self.failUnlessEqual(key_s, pubkey_s)
217 self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1)
218 self.failUnlessEqual(ann["my-version"], "ver23")
219 d.addCallback(_then1)
221 # now send a duplicate announcement. This should not fire the
223 d.addCallback(lambda ign: ic1.remote_announce_v2([self.ann1]))
224 d.addCallback(fireEventually)
226 self.failUnlessEqual(len(announcements), 1)
227 d.addCallback(_then2)
229 # an older announcement shouldn't fire the subscriber either
230 d.addCallback(lambda ign: ic1.remote_announce_v2([self.ann1old]))
231 d.addCallback(fireEventually)
233 self.failUnlessEqual(len(announcements), 1)
234 d.addCallback(_then2a)
236 # announcement with no seqnum cannot replace one with-seqnum
237 d.addCallback(lambda ign: ic1.remote_announce_v2([self.ann1noseqnum]))
238 d.addCallback(fireEventually)
240 self.failUnlessEqual(len(announcements), 1)
241 d.addCallback(_then2b)
243 # and a replacement announcement: same FURL, new other stuff. The
244 # subscriber *should* be fired.
245 d.addCallback(lambda ign: ic1.remote_announce_v2([self.ann1b]))
246 d.addCallback(fireEventually)
248 self.failUnlessEqual(len(announcements), 2)
249 key_s,ann = announcements[-1]
250 self.failUnlessEqual(key_s, pubkey_s)
251 self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1)
252 self.failUnlessEqual(ann["my-version"], "ver24")
253 d.addCallback(_then3)
255 # and a replacement announcement with a different FURL (it uses
256 # different connection hints)
257 d.addCallback(lambda ign: ic1.remote_announce_v2([self.ann1a]))
258 d.addCallback(fireEventually)
260 self.failUnlessEqual(len(announcements), 3)
261 key_s,ann = announcements[-1]
262 self.failUnlessEqual(key_s, pubkey_s)
263 self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1a)
264 self.failUnlessEqual(ann["my-version"], "ver23")
265 d.addCallback(_then4)
267 # now add a new subscription, which should be called with the
268 # backlog. The introducer only records one announcement per index, so
269 # the backlog will only have the latest message.
271 def _received2(key_s, ann):
272 announcements2.append( (key_s, ann) )
273 d.addCallback(lambda ign: ic1.subscribe_to("storage", _received2))
274 d.addCallback(fireEventually)
276 self.failUnlessEqual(len(announcements2), 1)
277 key_s,ann = announcements2[-1]
278 self.failUnlessEqual(key_s, pubkey_s)
279 self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1a)
280 self.failUnlessEqual(ann["my-version"], "ver23")
281 d.addCallback(_then5)
284 def test_id_collision(self):
285 # test replacement case where tubid equals a keyid (one should
286 # not replace the other)
287 ic = IntroducerClient(None,
288 "introducer.furl", u"my_nickname",
289 "my_version", "oldest_version", {})
291 ic.subscribe_to("storage",
292 lambda key_s,ann: announcements.append(ann))
293 sk_s, vk_s = keyutil.make_keypair()
294 sk, _ignored = keyutil.parse_privkey(sk_s)
295 keyid = keyutil.remove_prefix(vk_s, "pub-v0-")
296 furl1 = "pb://onug64tu@127.0.0.1:123/short" # base32("short")
297 furl2 = "pb://%s@127.0.0.1:36106/swissnum" % keyid
298 ann_t = ic.create_announcement("storage", make_ann(furl1), sk)
299 ic.remote_announce_v2([ann_t])
302 # first announcement has been processed
303 self.failUnlessEqual(len(announcements), 1)
304 self.failUnlessEqual(announcements[0]["anonymous-storage-FURL"],
306 # now submit a second one, with a tubid that happens to look just
307 # like the pubkey-based serverid we just processed. They should
309 ann2 = (furl2, "storage", "RIStorage", "nick1", "ver23", "ver0")
310 ca = WrapV2ClientInV1Interface(ic)
311 ca.remote_announce([ann2])
312 return fireEventually()
315 # if they overlapped, the second announcement would be ignored
316 self.failUnlessEqual(len(announcements), 2)
317 self.failUnlessEqual(announcements[1]["anonymous-storage-FURL"],
319 d.addCallback(_then2)
class Server(unittest.TestCase):
    """Checks how IntroducerService handles repeated v2 announcements."""

    def test_duplicate(self):
        """Publish five announcements for one FURL and check the bookkeeping.

        All five are signed with the same key, so only one record is ever
        stored. After each publish the test checks the stored seqnum and
        the service's inbound_* debug counters:

        * seqnum 10        -> stored; only inbound_message increments
        * seqnum 10 again  -> inbound_duplicate increments
        * seqnum 9         -> inbound_old_replay increments, 10 is kept
        * seqnum 11        -> inbound_update increments, 11 is stored
        * seqnum None      -> inbound_no_seqnum increments, 11 is kept
        """
        introducer = IntroducerService()
        client = IntroducerClient(None,
                                  "introducer.furl", u"my_nickname",
                                  "ver23", "oldest_version", {})
        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/gydnp"

        privkey_s, _ = keyutil.make_keypair()
        privkey, _ = keyutil.parse_privkey(privkey_s)

        ann1 = make_ann_t(client, furl1, privkey, seqnum=10)
        ann1_old = make_ann_t(client, furl1, privkey, seqnum=9)
        ann1_new = make_ann_t(client, furl1, privkey, seqnum=11)
        ann1_noseqnum = make_ann_t(client, furl1, privkey, seqnum=None)

        # counters checked after every publish, in the order the expected
        # values are listed in the calls below
        counter_keys = ("inbound_message",
                        "inbound_duplicate",
                        "inbound_no_seqnum",
                        "inbound_old_replay",
                        "inbound_update")

        def _publish_and_check(ann_t, stored_seqnum, expected_counts):
            # Publish ann_t, then verify that exactly one record is stored
            # with the given seqnum and that each counter has the expected
            # value. The counter name is passed as the failure message so a
            # mismatch identifies which counter was wrong.
            introducer.remote_publish_v2(ann_t, None)
            stored = introducer.get_announcements()
            self.failUnlessEqual(len(stored), 1)
            self.failUnlessEqual(stored[0].announcement["seqnum"],
                                 stored_seqnum)
            for key, expected in zip(counter_keys, expected_counts):
                self.failUnlessEqual(introducer._debug_counts[key],
                                     expected, key)

        #                                     msg dup noseq old upd
        _publish_and_check(ann1,          10, (1,  0,  0,    0,  0))
        _publish_and_check(ann1,          10, (2,  1,  0,    0,  0))
        _publish_and_check(ann1_old,      10, (3,  1,  0,    1,  0))
        _publish_and_check(ann1_new,      11, (4,  1,  0,    1,  1))
        _publish_and_check(ann1_noseqnum, 11, (5,  1,  1,    1,  1))
389 NICKNAME = u"n\u00EDickname-%s" # LATIN SMALL LETTER I WITH ACUTE
391 class SystemTestMixin(ServiceMixin, pollmixin.PollMixin):
393 def create_tub(self, portnum=0):
394 tubfile = os.path.join(self.basedir, "tub.pem")
395 self.central_tub = tub = Tub(certFile=tubfile)
396 #tub.setOption("logLocalFailures", True)
397 #tub.setOption("logRemoteFailures", True)
398 tub.setOption("expose-remote-exception-types", False)
399 tub.setServiceParent(self.parent)
400 l = tub.listenOn("tcp:%d" % portnum)
401 self.central_portnum = l.getPortnum()
403 assert self.central_portnum == portnum
404 tub.setLocation("localhost:%d" % self.central_portnum)
406 class Queue(SystemTestMixin, unittest.TestCase):
407 def test_queue_until_connected(self):
408 self.basedir = "introducer/QueueUntilConnected/queued"
409 os.makedirs(self.basedir)
411 introducer = IntroducerService()
412 introducer.setServiceParent(self.parent)
413 iff = os.path.join(self.basedir, "introducer.furl")
414 ifurl = self.central_tub.registerReference(introducer, furlFile=iff)
416 tub2.setServiceParent(self.parent)
417 c = IntroducerClient(tub2, ifurl,
418 u"nickname", "version", "oldest", {})
419 furl1 = "pb://onug64tu@127.0.0.1:123/short" # base32("short")
420 sk_s, vk_s = keyutil.make_keypair()
421 sk, _ignored = keyutil.parse_privkey(sk_s)
423 d = introducer.disownServiceParent()
425 # now that the introducer server is offline, create a client and
426 # publish some messages
427 c.setServiceParent(self.parent) # this starts the reconnector
428 c.publish("storage", make_ann(furl1), sk)
430 introducer.setServiceParent(self.parent) # restart the server
431 # now wait for the messages to be delivered
432 def _got_announcement():
433 return bool(introducer.get_announcements())
434 return self.poll(_got_announcement)
435 d.addCallback(_offline)
437 v = introducer.get_announcements()[0]
438 furl = v.announcement["anonymous-storage-FURL"]
439 self.failUnlessEqual(furl, furl1)
442 # now let the ack get back
443 def _wait_until_idle(ign):
445 if c._debug_outstanding:
447 if introducer._debug_outstanding:
450 return self.poll(_idle)
451 d.addCallback(_wait_until_idle)
456 class SystemTest(SystemTestMixin, unittest.TestCase):
458 def do_system_test(self, server_version):
460 if server_version == V1:
461 introducer = old.IntroducerService_v1()
463 introducer = IntroducerService()
464 introducer.setServiceParent(self.parent)
465 iff = os.path.join(self.basedir, "introducer.furl")
466 tub = self.central_tub
467 ifurl = self.central_tub.registerReference(introducer, furlFile=iff)
468 self.introducer_furl = ifurl
470 # we have 5 clients who publish themselves as storage servers, and a
471 # sixth which does not. All 6 clients subscribe to hear about
472 # storage. When the connections are fully established, all six nodes
473 # should have 5 connections each.
479 received_announcements = {}
480 subscribing_clients = []
481 publishing_clients = []
482 printable_serverids = {}
483 self.the_introducer = introducer
485 expected_announcements = [0 for c in range(NUM_CLIENTS)]
487 for i in range(NUM_CLIENTS):
489 #tub.setOption("logLocalFailures", True)
490 #tub.setOption("logRemoteFailures", True)
491 tub.setOption("expose-remote-exception-types", False)
492 tub.setServiceParent(self.parent)
493 l = tub.listenOn("tcp:0")
494 portnum = l.getPortnum()
495 tub.setLocation("localhost:%d" % portnum)
497 log.msg("creating client %d: %s" % (i, tub.getShortTubID()))
499 c = old.IntroducerClient_v1(tub, self.introducer_furl,
503 c = IntroducerClient(tub, self.introducer_furl,
506 {"component": "component-v1"})
507 received_announcements[c] = {}
508 def got(key_s_or_tubid, ann, announcements, i):
510 index = get_tubid_string_from_ann(ann)
512 index = key_s_or_tubid or get_tubid_string_from_ann(ann)
513 announcements[index] = ann
514 c.subscribe_to("storage", got, received_announcements[c], i)
515 subscribing_clients.append(c)
516 expected_announcements[i] += 1 # all expect a 'storage' announcement
518 node_furl = tub.registerReference(Referenceable())
521 c.publish(node_furl, "storage", "ri_name")
522 printable_serverids[i] = get_tubid_string(node_furl)
524 # sign the announcement
525 privkey_s, pubkey_s = keyutil.make_keypair()
526 privkey, _ignored = keyutil.parse_privkey(privkey_s)
527 privkeys[c] = privkey
528 c.publish("storage", make_ann(node_furl), privkey)
529 if server_version == V1:
530 printable_serverids[i] = get_tubid_string(node_furl)
532 assert pubkey_s.startswith("pub-")
533 printable_serverids[i] = pubkey_s[len("pub-"):]
535 c.publish("storage", make_ann(node_furl))
536 printable_serverids[i] = get_tubid_string(node_furl)
537 publishing_clients.append(c)
539 # the last one does not publish anything
543 # users of the V1 client were required to publish a
544 # 'stub_client' record (somewhat after they published the
545 # 'storage' record), so the introducer could see their
546 # version. Match that behavior.
547 c.publish(node_furl, "stub_client", "stub_ri_name")
550 # also publish something that nobody cares about
551 boring_furl = tub.registerReference(Referenceable())
552 c.publish("boring", make_ann(boring_furl))
554 c.setServiceParent(self.parent)
559 def _wait_for_connected(ign):
562 if not c.connected_to_introducer():
565 return self.poll(_connected)
567 # we watch the clients to determine when the system has settled down.
568 # Then we can look inside the server to assert things about its
571 def _wait_for_expected_announcements(ign):
572 def _got_expected_announcements():
573 for i,c in enumerate(subscribing_clients):
574 if len(received_announcements[c]) < expected_announcements[i]:
577 return self.poll(_got_expected_announcements)
579 # before shutting down any Tub, we'd like to know that there are no
580 # messages outstanding
582 def _wait_until_idle(ign):
584 for c in subscribing_clients + publishing_clients:
585 if c._debug_outstanding:
587 if self.the_introducer._debug_outstanding:
590 return self.poll(_idle)
592 d = defer.succeed(None)
593 d.addCallback(_wait_for_connected)
594 d.addCallback(_wait_for_expected_announcements)
595 d.addCallback(_wait_until_idle)
598 log.msg("doing _check1")
599 dc = self.the_introducer._debug_counts
600 if server_version == V1:
601 # each storage server publishes a record, and (after its
602 # 'subscribe' has been ACKed) also publishes a "stub_client".
603 # The non-storage client (which subscribes) also publishes a
604 # stub_client. There is also one "boring" service. The number
605 # of messages is higher, because the stub_clients aren't
606 # published until after we get the 'subscribe' ack (since we
607 # don't realize that we're dealing with a v1 server [which
608 # needs stub_clients] until then), and the act of publishing
609 # the stub_client causes us to re-send all previous
611 self.failUnlessEqual(dc["inbound_message"] - dc["inbound_duplicate"],
612 NUM_STORAGE + NUM_CLIENTS + 1)
614 # each storage server publishes a record. There is also one
615 # "stub_client" and one "boring"
616 self.failUnlessEqual(dc["inbound_message"], NUM_STORAGE+2)
617 self.failUnlessEqual(dc["inbound_duplicate"], 0)
618 self.failUnlessEqual(dc["inbound_update"], 0)
619 self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS)
620 # the number of outbound messages is tricky.. I think it depends
621 # upon a race between the publish and the subscribe messages.
622 self.failUnless(dc["outbound_message"] > 0)
623 # each client subscribes to "storage", and each server publishes
624 self.failUnlessEqual(dc["outbound_announcements"],
625 NUM_STORAGE*NUM_CLIENTS)
627 for c in subscribing_clients:
628 cdc = c._debug_counts
629 self.failUnless(cdc["inbound_message"])
630 self.failUnlessEqual(cdc["inbound_announcement"],
632 self.failUnlessEqual(cdc["wrong_service"], 0)
633 self.failUnlessEqual(cdc["duplicate_announcement"], 0)
634 self.failUnlessEqual(cdc["update"], 0)
635 self.failUnlessEqual(cdc["new_announcement"],
637 anns = received_announcements[c]
638 self.failUnlessEqual(len(anns), NUM_STORAGE)
640 nodeid0 = tubs[clients[0]].tubID
642 nick = ann["nickname"]
643 self.failUnlessEqual(type(nick), unicode)
644 self.failUnlessEqual(nick, NICKNAME % "0")
645 if server_version == V1:
646 for c in publishing_clients:
647 cdc = c._debug_counts
648 expected = 1 # storage
650 expected += 1 # boring
651 if c is not clients[0]:
652 # the v2 client tries to call publish_v2, which fails
653 # because the server is v1. It then re-sends
654 # everything it has so far, plus a stub_client record
655 expected = 2*expected + 1
657 # we always tell v1 client to send stub_client
659 self.failUnlessEqual(cdc["outbound_message"], expected)
661 for c in publishing_clients:
662 cdc = c._debug_counts
664 if c in [clients[0], # stub_client
668 self.failUnlessEqual(cdc["outbound_message"], expected)
669 # now check the web status, make sure it renders without error
670 ir = introweb.IntroducerRoot(self.parent)
671 self.parent.nodeid = "NODEID"
672 text = ir.renderSynchronously().decode("utf-8")
673 self.failUnlessIn(NICKNAME % "0", text) # the v1 client
674 self.failUnlessIn(NICKNAME % "1", text) # a v2 client
675 for i in range(NUM_STORAGE):
676 self.failUnlessIn(printable_serverids[i], text,
677 (i,printable_serverids[i],text))
678 # make sure there isn't a double-base32ed string too
679 self.failIfIn(idlib.nodeid_b2a(printable_serverids[i]), text,
680 (i,printable_serverids[i],text))
681 log.msg("_check1 done")
682 d.addCallback(_check1)
684 # force an introducer reconnect, by shutting down the Tub it's using
685 # and starting a new Tub (with the old introducer). Everybody should
686 # reconnect and republish, but the introducer should ignore the
687 # republishes as duplicates. However, because the server doesn't know
688 # what each client does and does not know, it will send them a copy
689 # of the current announcement table anyway.
691 d.addCallback(lambda _ign: log.msg("shutting down introducer's Tub"))
692 d.addCallback(lambda _ign: self.central_tub.disownServiceParent())
694 def _wait_for_introducer_loss(ign):
695 def _introducer_lost():
697 if c.connected_to_introducer():
700 return self.poll(_introducer_lost)
701 d.addCallback(_wait_for_introducer_loss)
703 def _restart_introducer_tub(_ign):
704 log.msg("restarting introducer's Tub")
706 for i in range(NUM_CLIENTS):
707 c = subscribing_clients[i]
708 for k in c._debug_counts:
709 c._debug_counts[k] = 0
710 for k in self.the_introducer._debug_counts:
711 self.the_introducer._debug_counts[k] = 0
712 expected_announcements[i] += 1 # new 'storage' for everyone
713 self.create_tub(self.central_portnum)
714 newfurl = self.central_tub.registerReference(self.the_introducer,
716 assert newfurl == self.introducer_furl
717 d.addCallback(_restart_introducer_tub)
719 d.addCallback(_wait_for_connected)
720 d.addCallback(_wait_for_expected_announcements)
721 d.addCallback(_wait_until_idle)
722 d.addCallback(lambda _ign: log.msg(" reconnected"))
724 # TODO: publish something while the introducer is offline, then
725 # confirm it gets delivered when the connection is reestablished
727 log.msg("doing _check2")
728 # assert that the introducer sent out new messages, one per
730 dc = self.the_introducer._debug_counts
731 self.failUnlessEqual(dc["outbound_announcements"],
732 NUM_STORAGE*NUM_CLIENTS)
733 self.failUnless(dc["outbound_message"] > 0)
734 self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS)
735 for c in subscribing_clients:
736 cdc = c._debug_counts
737 self.failUnlessEqual(cdc["inbound_message"], 1)
738 self.failUnlessEqual(cdc["inbound_announcement"], NUM_STORAGE)
739 self.failUnlessEqual(cdc["new_announcement"], 0)
740 self.failUnlessEqual(cdc["wrong_service"], 0)
741 self.failUnlessEqual(cdc["duplicate_announcement"], NUM_STORAGE)
742 d.addCallback(_check2)
744 # Then force an introducer restart, by shutting down the Tub,
745 # destroying the old introducer, and starting a new Tub+Introducer.
746 # Everybody should reconnect and republish, and the (new) introducer
747 # will distribute the new announcements, but the clients should
748 # ignore the republishes as duplicates.
750 d.addCallback(lambda _ign: log.msg("shutting down introducer"))
751 d.addCallback(lambda _ign: self.central_tub.disownServiceParent())
752 d.addCallback(_wait_for_introducer_loss)
753 d.addCallback(lambda _ign: log.msg("introducer lost"))
755 def _restart_introducer(_ign):
756 log.msg("restarting introducer")
757 self.create_tub(self.central_portnum)
759 for i in range(NUM_CLIENTS):
760 c = subscribing_clients[i]
761 for k in c._debug_counts:
762 c._debug_counts[k] = 0
763 expected_announcements[i] += 1 # new 'storage' for everyone
764 if server_version == V1:
765 introducer = old.IntroducerService_v1()
767 introducer = IntroducerService()
768 self.the_introducer = introducer
769 newfurl = self.central_tub.registerReference(self.the_introducer,
771 assert newfurl == self.introducer_furl
772 d.addCallback(_restart_introducer)
774 d.addCallback(_wait_for_connected)
775 d.addCallback(_wait_for_expected_announcements)
776 d.addCallback(_wait_until_idle)
779 log.msg("doing _check3")
780 dc = self.the_introducer._debug_counts
781 self.failUnlessEqual(dc["outbound_announcements"],
782 NUM_STORAGE*NUM_CLIENTS)
783 self.failUnless(dc["outbound_message"] > 0)
784 self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS)
785 for c in subscribing_clients:
786 cdc = c._debug_counts
787 self.failUnless(cdc["inbound_message"] > 0)
788 self.failUnlessEqual(cdc["inbound_announcement"], NUM_STORAGE)
789 self.failUnlessEqual(cdc["new_announcement"], 0)
790 self.failUnlessEqual(cdc["wrong_service"], 0)
791 self.failUnlessEqual(cdc["duplicate_announcement"], NUM_STORAGE)
793 d.addCallback(_check3)
797 def test_system_v2_server(self):
798 self.basedir = "introducer/SystemTest/system_v2_server"
799 os.makedirs(self.basedir)
800 return self.do_system_test(V2)
801 test_system_v2_server.timeout = 480
802 # occasionally takes longer than 350s on "draco"
804 def test_system_v1_server(self):
805 self.basedir = "introducer/SystemTest/system_v1_server"
806 os.makedirs(self.basedir)
807 return self.do_system_test(V1)
808 test_system_v1_server.timeout = 480
809 # occasionally takes longer than 350s on "draco"
811 class FakeRemoteReference:
812 def notifyOnDisconnect(self, *args, **kwargs): pass
813 def getRemoteTubID(self): return "62ubehyunnyhzs7r6vdonnm2hpi52w6y"
814 def getLocationHints(self): return [("ipv4", "here.example.com", "1234"),
815 ("ipv4", "there.example.com", "2345")]
816 def getPeer(self): return address.IPv4Address("TCP", "remote.example.com",
819 class ClientInfo(unittest.TestCase):
820 def test_client_v2(self):
821 introducer = IntroducerService()
822 tub = introducer_furl = None
823 app_versions = {"whizzy": "fizzy"}
824 client_v2 = IntroducerClient(tub, introducer_furl, NICKNAME % u"v2",
825 "my_version", "oldest", app_versions)
826 #furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
827 #ann_s = make_ann_t(client_v2, furl1, None, 10)
828 #introducer.remote_publish_v2(ann_s, Referenceable())
829 subscriber = FakeRemoteReference()
830 introducer.remote_subscribe_v2(subscriber, "storage",
831 client_v2._my_subscriber_info)
832 subs = introducer.get_subscribers()
833 self.failUnlessEqual(len(subs), 1)
835 self.failUnlessEqual(s0.service_name, "storage")
836 self.failUnlessEqual(s0.app_versions, app_versions)
837 self.failUnlessEqual(s0.nickname, NICKNAME % u"v2")
838 self.failUnlessEqual(s0.version, "my_version")
def test_client_v1(self):
    # A v1 client subscribes without any subscriber_info. This test checks
    # that the introducer reports a placeholder nickname at first, fills in
    # nickname and version once the stub_client announcement is published,
    # and reports the same details for a subscription made afterwards.
    introducer = IntroducerService()
    subscriber = FakeRemoteReference()
    introducer.remote_subscribe(subscriber, "storage")
    # the v1 subscribe interface had no subscriber_info: that was usually
    # sent in a separate stub_client pseudo-announcement
    subs = introducer.get_subscribers()
    self.failUnlessEqual(len(subs), 1)
    # exactly one subscriber was asserted above, so inspect that one
    s0 = subs[0]
    self.failUnlessEqual(s0.nickname, u"?") # not known yet
    self.failUnlessEqual(s0.service_name, "storage")

    # now submit the stub_client announcement
    furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
    ann = (furl1, "stub_client", "RIStubClient",
           (NICKNAME % u"v1").encode("utf-8"), "my_version", "oldest")
    introducer.remote_publish(ann)
    # the server should correlate the two
    subs = introducer.get_subscribers()
    self.failUnlessEqual(len(subs), 1)
    # re-read the record: get_subscribers() was called again after the
    # announcement, so check the freshly returned entry
    s0 = subs[0]
    self.failUnlessEqual(s0.service_name, "storage")
    # v1 announcements do not contain app-versions
    self.failUnlessEqual(s0.app_versions, {})
    self.failUnlessEqual(s0.nickname, NICKNAME % u"v1")
    self.failUnlessEqual(s0.version, "my_version")

    # a subscription that arrives after the stub_client announcement
    # should be correlated too
    subscriber2 = FakeRemoteReference()
    introducer.remote_subscribe(subscriber2, "thing2")

    subs = introducer.get_subscribers()
    self.failUnlessEqual(len(subs), 2)
    # two subscribers now exist; pick the new one by its service name
    s0 = [s for s in subs if s.service_name == "thing2"][0]
    # v1 announcements do not contain app-versions
    self.failUnlessEqual(s0.app_versions, {})
    self.failUnlessEqual(s0.nickname, NICKNAME % u"v1")
    self.failUnlessEqual(s0.version, "my_version")
class Announcements(unittest.TestCase):
    # Each test publishes a single announcement into a fresh
    # IntroducerService and inspects the one record that
    # get_announcements() then returns.

    def test_client_v2_unsigned(self):
        # v2 publish where make_ann_t() is given None in the position the
        # signed variant uses for its signing key: the record is expected
        # to be indexed by tubid, with no key component.
        server = IntroducerService()
        versions = {"whizzy": "fizzy"}
        client = IntroducerClient(None, None, u"nick-v2",
                                  "my_version", "oldest", versions)
        storage_furl = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
        expected_tubid = "62ubehyunnyhzs7r6vdonnm2hpi52w6y"
        unsigned_ann = make_ann_t(client, storage_furl, None, 10.0)
        canary = Referenceable()
        server.remote_publish_v2(unsigned_ann, canary)

        published = server.get_announcements()
        self.failUnlessEqual(len(published), 1)
        entry = published[0]
        self.failUnlessIdentical(entry.canary, canary)
        self.failUnlessEqual(entry.index, ("storage", None, expected_tubid))
        self.failUnlessEqual(entry.announcement["app-versions"], versions)
        self.failUnlessEqual(entry.nickname, u"nick-v2")
        self.failUnlessEqual(entry.service_name, "storage")
        self.failUnlessEqual(entry.version, "my_version")
        self.failUnlessEqual(entry.announcement["anonymous-storage-FURL"],
                             storage_furl)

    def test_client_v2_signed(self):
        # v2 publish signed with a freshly generated key: the record is
        # expected to be indexed by the public key string (with its "pub-"
        # prefix removed) and to carry no tubid component.
        server = IntroducerService()
        versions = {"whizzy": "fizzy"}
        client = IntroducerClient(None, None, u"nick-v2",
                                  "my_version", "oldest", versions)
        storage_furl = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
        privkey_s, pubkey_s = keyutil.make_keypair()
        signing_key, _unused = keyutil.parse_privkey(privkey_s)
        expected_key = keyutil.remove_prefix(pubkey_s, "pub-")
        signed_ann = make_ann_t(client, storage_furl, signing_key, 10.0)
        canary = Referenceable()
        server.remote_publish_v2(signed_ann, canary)

        published = server.get_announcements()
        self.failUnlessEqual(len(published), 1)
        entry = published[0]
        self.failUnlessIdentical(entry.canary, canary)
        self.failUnlessEqual(entry.index, ("storage", expected_key, None))
        self.failUnlessEqual(entry.announcement["app-versions"], versions)
        self.failUnlessEqual(entry.nickname, u"nick-v2")
        self.failUnlessEqual(entry.service_name, "storage")
        self.failUnlessEqual(entry.version, "my_version")
        self.failUnlessEqual(entry.announcement["anonymous-storage-FURL"],
                             storage_furl)

    def test_client_v1(self):
        # v1 publish uses a plain tuple and no canary: the record is
        # expected to be indexed by tubid, to have empty app-versions, and
        # to keep the nickname in its UTF-8 encoded form.
        server = IntroducerService()

        storage_furl = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
        expected_tubid = "62ubehyunnyhzs7r6vdonnm2hpi52w6y"
        encoded_nick = u"nick-v1".encode("utf-8")
        v1_ann = (storage_furl, "storage", "RIStorage",
                  encoded_nick, "my_version", "oldest")
        server.remote_publish(v1_ann)

        published = server.get_announcements()
        self.failUnlessEqual(len(published), 1)
        entry = published[0]
        self.failUnlessEqual(entry.index, ("storage", None, expected_tubid))
        self.failUnlessEqual(entry.canary, None)
        self.failUnlessEqual(entry.announcement["app-versions"], {})
        self.failUnlessEqual(entry.nickname, u"nick-v1".encode("utf-8"))
        self.failUnlessEqual(entry.service_name, "storage")
        self.failUnlessEqual(entry.version, "my_version")
        self.failUnlessEqual(entry.announcement["anonymous-storage-FURL"],
                             storage_furl)
class TooNewServer(IntroducerService):
    # An IntroducerService whose VERSION dict advertises only a made-up
    # "v999" introducer protocol; NonV1Server uses it to stand in for a
    # server that does not offer the protocol version a client expects.
    VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v999":
                 # NOTE(review): the per-protocol value was missing from the
                 # literal; an empty dict is assumed -- confirm.
                 { },
                "application-version": "greetings from the crazy future",
                }
class NonV1Server(SystemTestMixin, unittest.TestCase):
    # if the 1.3.0 client connects to a server that doesn't provide the 'v1'
    # protocol, it is supposed to provide a useful error instead of a weird
    # exception.

    def test_failure(self):
        # Point an IntroducerClient at a TooNewServer, poll until the client
        # has either recorded an introducer error or obtained a publisher,
        # then require that the recorded error is an
        # InsufficientVersionError. Returns the Deferred so trial waits for
        # the check to run.
        self.basedir = "introducer/NonV1Server/failure"
        os.makedirs(self.basedir)
        # NOTE(review): self.central_tub is read below but never assigned in
        # this test; presumably SystemTestMixin.create_tub() sets it up --
        # confirm against the mixin.
        self.create_tub()
        # NOTE(review): the server instance was not bound anywhere; the
        # too-new server class is assumed to be the one under test.
        i = TooNewServer()
        i.setServiceParent(self.parent)
        self.introducer_furl = self.central_tub.registerReference(i)

        # the client gets its own Tub, separate from the server's
        tub = Tub()
        tub.setOption("expose-remote-exception-types", False)
        tub.setServiceParent(self.parent)
        l = tub.listenOn("tcp:0")
        portnum = l.getPortnum()
        tub.setLocation("localhost:%d" % portnum)

        c = IntroducerClient(tub, self.introducer_furl,
                             u"nickname-client", "version", "oldest", {})
        announcements = {}
        # NOTE(review): callback signature inferred from the names used in
        # its body -- confirm against IntroducerClient.subscribe_to().
        def got(key_s, ann):
            announcements[key_s] = ann
        c.subscribe_to("storage", got)

        c.setServiceParent(self.parent)

        # now we wait for it to connect and notice the bad version
        def _got_bad():
            return bool(c._introducer_error) or bool(c._publisher)
        d = self.poll(_got_bad)
        def _done(res):
            self.failUnless(c._introducer_error)
            # pass the failure itself as the message so a wrong error type
            # is visible in the test output
            self.failUnless(c._introducer_error.check(InsufficientVersionError),
                            c._introducer_error)
        d.addCallback(_done)
        return d
class DecodeFurl(unittest.TestCase):
    def test_decode(self):
        # make sure we have a working base64.b32decode. The one in
        # python2.4.[01] was broken.
        furl = 'pb://t5g7egomnnktbpydbuijt6zgtmw4oqi5@127.0.0.1:51857/hfzv36i'
        m = re.match(r'pb://(\w+)@', furl)
        # report a failed match as a test failure naming the furl, rather
        # than as an AttributeError from calling .group() on None
        self.failUnless(m, furl)
        nodeid = b32decode(m.group(1).upper())
        # bytes literal: the same object type as before on Python 2, and
        # comparable with b32decode()'s result where that returns bytes
        self.failUnlessEqual(nodeid, b"\x9fM\xf2\x19\xcckU0\xbf\x03\r\x10\x99\xfb&\x9b-\xc7A\x1d")
class Signatures(unittest.TestCase):
    def test_sign(self):
        # Round-trip an announcement through sign_to_foolscap() and
        # unsign_from_foolscap(), then check how tampered, unsigned and
        # unrecognized-version inputs are handled.
        payload = {"key1": "value1"}
        privkey_s, pubkey_s = keyutil.make_keypair()
        signing_key, _unused = keyutil.parse_privkey(privkey_s)
        signed = sign_to_foolscap(payload, signing_key)
        msg, sig, key = signed
        bytes_type = type("".encode("utf-8"))
        self.failUnlessEqual(type(msg), bytes_type) # bytes
        self.failUnlessEqual(simplejson.loads(msg.decode("utf-8")), payload)
        for field in (sig, key):
            self.failUnless(field.startswith("v0-"))
        recovered, recovered_key = unsign_from_foolscap(signed)
        self.failUnlessEqual(recovered, payload)
        self.failUnlessEqual("pub-"+recovered_key, pubkey_s)

        # bad signature
        tampered = {"key1": "value2"}
        tampered_msg = simplejson.dumps(tampered).encode("utf-8")
        self.failUnlessRaises(keyutil.BadSignatureError,
                              unsign_from_foolscap,
                              (tampered_msg, sig, key))
        # sneaky bad signature should be ignored
        recovered, recovered_key = unsign_from_foolscap(
            (tampered_msg, None, key))
        self.failUnlessEqual(recovered_key, None)
        self.failUnlessEqual(recovered, tampered)

        # unrecognized signatures
        unknown_versions = [
            (tampered_msg, "v999-sig", key),
            (tampered_msg, sig, "v999-key"),
        ]
        for forged in unknown_versions:
            self.failUnlessRaises(UnknownKeyError,
                                  unsign_from_foolscap, forged)
1034 # add tests of StorageFarmBroker: if it receives duplicate announcements, it
1035 # should leave the Reconnector in place, also if it receives
1036 # same-FURL-different-misc, but if it receives same-nodeid-different-FURL, it
1037 # should tear down the Reconnector and make a new one. This behavior used to
1038 # live in the IntroducerClient, and thus used to be tested by test_introducer
1040 # copying more tests from old branch:
1042 # then also add Upgrade test