2 import os, re, itertools
3 from base64 import b32decode
6 from twisted.trial import unittest
7 from twisted.internet import defer, address
8 from twisted.python import log
10 from foolscap.api import Tub, Referenceable, fireEventually, flushEventualQueue
11 from twisted.application import service
12 from allmydata.interfaces import InsufficientVersionError
13 from allmydata.introducer.client import IntroducerClient, \
14 WrapV2ClientInV1Interface
15 from allmydata.introducer.server import IntroducerService
16 from allmydata.introducer.common import get_tubid_string_from_ann, \
17 get_tubid_string, sign_to_foolscap, unsign_from_foolscap, \
19 from allmydata.introducer import old
20 # test compatibility with old introducer .tac files
21 from allmydata.introducer import IntroducerNode
22 from allmydata.web import introweb
23 from allmydata.client import Client as TahoeClient
24 from allmydata.util import pollmixin, keyutil, idlib, fileutil
25 import allmydata.test.common_util as testutil
# MultiService subclass that adds a log(msg, **kw) method. An instance is
# used as the parent service by the fixture code in this file.
27 class LoggingMultiService(service.MultiService):
# NOTE(review): the body of log() is not visible in this view -- presumably
# it forwards msg/kw to twisted's log.msg; confirm against the full file.
28 def log(self, msg, **kw):
# Smoke test for IntroducerNode: build one on a scratch base directory and
# walk it through start -> tub ready -> stop.
31 class Node(testutil.SignalMixin, unittest.TestCase):
32 def test_loadable(self):
33 basedir = "introducer.IntroducerNode.test_loadable"
# NOTE(review): nothing visible here creates basedir before it is handed to
# IntroducerNode -- presumably a mkdir sits on the dropped line; confirm.
35 q = IntroducerNode(basedir)
# The lifecycle steps are chained on one Deferred so they run in order; the
# final callback drains foolscap's eventual-send queue.
36 d = fireEventually(None)
37 d.addCallback(lambda res: q.startService())
38 d.addCallback(lambda res: q.when_tub_ready())
39 d.addCallback(lambda res: q.stopService())
40 d.addCallback(flushEventualQueue)
# NOTE(review): `d` is presumably returned so trial waits for the chain; the
# return statement is not visible in this view.
# Fixture fragment. NOTE(review): the enclosing class and method headers are
# not visible in this view -- presumably ServiceMixin.setUp / tearDown (the
# test classes in this file list ServiceMixin as a base); confirm.
#
# Set-up side: create and start the parent service that tests attach to.
45 self.parent = LoggingMultiService()
46 self.parent.startService()
# Tear-down side: stop the parent service, then drain foolscap's eventual
# queue before the test is considered finished.
48 log.msg("TestIntroducer.tearDown")
49 d = defer.succeed(None)
50 d.addCallback(lambda res: self.parent.stopService())
51 d.addCallback(flushEventualQueue)
# Tests for constructing an IntroducerClient and for how IntroducerService
# records announcements published to it directly (no network involved).
54 class Introducer(ServiceMixin, unittest.TestCase, pollmixin.PollMixin):
# A client can be constructed with None in place of a Tub.
56 def test_create(self):
57 ic = IntroducerClient(None, "introducer.furl", u"my_nickname",
58 "my_version", "oldest_version", {}, fakeseq)
59 self.failUnless(isinstance(ic, IntroducerClient))
# The service can be attached to the (already started) parent service.
61 def test_listen(self):
62 i = IntroducerService()
63 i.setServiceParent(self.parent)
# Publishing v1-style tuples: a second publish for an already-known FURL
# must replace the stored record rather than add a new one.
65 def test_duplicate_publish(self):
66 i = IntroducerService()
67 self.failUnlessEqual(len(i.get_announcements()), 0)
68 self.failUnlessEqual(len(i.get_subscribers()), 0)
69 furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36106,127.0.0.1:36106/gydnpigj2ja2qr2srq4ikjwnl7xfgbra"
70 furl2 = "pb://ttwwooyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36111,127.0.0.1:36106/ttwwoogj2ja2qr2srq4ikjwnl7xfgbra"
# ann1 and ann1b share furl1 and differ only in the version field;
# ann2 uses a different FURL.
71 ann1 = (furl1, "storage", "RIStorage", "nick1", "ver23", "ver0")
72 ann1b = (furl1, "storage", "RIStorage", "nick1", "ver24", "ver0")
73 ann2 = (furl2, "storage", "RIStorage", "nick2", "ver30", "ver0")
74 i.remote_publish(ann1)
75 self.failUnlessEqual(len(i.get_announcements()), 1)
76 self.failUnlessEqual(len(i.get_subscribers()), 0)
77 i.remote_publish(ann2)
78 self.failUnlessEqual(len(i.get_announcements()), 2)
79 self.failUnlessEqual(len(i.get_subscribers()), 0)
# Same FURL as ann1: the announcement count must stay at 2.
80 i.remote_publish(ann1b)
81 self.failUnlessEqual(len(i.get_announcements()), 2)
82 self.failUnlessEqual(len(i.get_subscribers()), 0)
84 def test_id_collision(self):
85 # test replacement case where tubid equals a keyid (one should
86 # not replace the other)
87 i = IntroducerService()
88 ic = IntroducerClient(None,
89 "introducer.furl", u"my_nickname",
90 "my_version", "oldest_version", {}, fakeseq)
91 sk_s, vk_s = keyutil.make_keypair()
92 sk, _ignored = keyutil.parse_privkey(sk_s)
93 keyid = keyutil.remove_prefix(vk_s, "pub-v0-")
94 furl1 = "pb://onug64tu@127.0.0.1:123/short" # base32("short")
# First announcement: signed (v2), so its index is expected to carry the
# key id in the second slot and None in the third.
95 ann_t = make_ann_t(ic, furl1, sk, 1)
96 i.remote_publish_v2(ann_t, Referenceable())
97 announcements = i.get_announcements()
98 self.failUnlessEqual(len(announcements), 1)
99 key1 = ("storage", "v0-"+keyid, None)
100 self.failUnlessEqual(announcements[0].index, key1)
101 ann1_out = announcements[0].announcement
102 self.failUnlessEqual(ann1_out["anonymous-storage-FURL"], furl1)
# Second announcement: an unsigned v1 tuple whose FURL uses the key id as
# its tubid. Its index is expected to carry that id in the third slot, so
# both records must coexist.
104 furl2 = "pb://%s@127.0.0.1:36106/swissnum" % keyid
105 ann2 = (furl2, "storage", "RIStorage", "nick1", "ver23", "ver0")
106 i.remote_publish(ann2)
107 announcements = i.get_announcements()
108 self.failUnlessEqual(len(announcements), 2)
109 key2 = ("storage", None, keyid)
110 wanted = [ad for ad in announcements if ad.index == key2]
111 self.failUnlessEqual(len(wanted), 1)
112 ann2_out = wanted[0].announcement
113 self.failUnlessEqual(ann2_out["anonymous-storage-FURL"], furl2)
# Module-wide counter backing realseq(): sequence numbers start at 1 and
# increase by one on every call.
seqnum_counter = itertools.count(1)


def realseq():
    """Return a fresh ``(seqnum, nonce)`` pair.

    ``seqnum`` is the next value of the module-level ``seqnum_counter``;
    ``nonce`` is a random integer in ``[1, 100000]`` rendered as a string.

    The function name is taken from the call site in ``Server.test_duplicate``,
    which passes ``realseq`` to ``IntroducerClient`` in the position where
    other tests pass ``fakeseq``.
    """
    # Imported locally so this fix does not depend on the file's import block.
    import random

    # The previous code called os.randint(), which does not exist (the os
    # module has no randint), and used the Python-2-only iterator method
    # .next(); the builtin next() works on Python 2.6+ and 3.
    return next(seqnum_counter), str(random.randint(1, 100000))
# Announcement payload dict: the FURL under "anonymous-storage-FURL" and
# get_tubid_string(furl) under "permutation-seed-base32".
# NOTE(review): the enclosing def header and its return are not visible in
# this view. Other code in this file calls make_ann(furl) and passes the
# result on as an announcement, so this is presumably that function's body;
# confirm against the full file.
124 ann = { "anonymous-storage-FURL": furl,
125 "permutation-seed-base32": get_tubid_string(furl) }
# Build a "storage" announcement for `furl` and sign it for transport.
#   ic      -- client object whose create_announcement_dict() builds the dict
#   furl    -- FURL placed in the announcement (via make_ann)
#   privkey -- key handed to sign_to_foolscap (callers also pass None)
#   seqnum  -- stored under "seqnum"; callers pass ints, None and a string
128 def make_ann_t(ic, furl, privkey, seqnum):
129 ann_d = ic.create_announcement_dict("storage", make_ann(furl))
# Overwrite whatever seqnum/nonce the client put in the dict, so the caller
# controls the sequence number and every announcement carries the same nonce.
130 ann_d["seqnum"] = seqnum
131 ann_d["nonce"] = "nonce"
132 ann_t = sign_to_foolscap(ann_d, privkey)
# NOTE(review): callers use the value of make_ann_t(...), so a
# `return ann_t` presumably follows; it is not visible in this view.
# Client-side tests: how IntroducerClient reacts to duplicate, older,
# replacement and colliding announcements delivered straight to its
# remote_announce* entry points.
# NOTE(review): in this view the `announcements = []` style initialisations,
# the creation of `d`, the `def _thenN(ign):` headers and the final
# `return d` lines are not visible; the assertion groups below are
# presumably the bodies of those callbacks. Confirm against the full file.
135 class Client(unittest.TestCase):
# v1 announcements (plain tuples) delivered through the v1 wrapper.
136 def test_duplicate_receive_v1(self):
137 ic = IntroducerClient(None,
138 "introducer.furl", u"my_nickname",
139 "my_version", "oldest_version", {}, fakeseq)
# Subscriber just records every announcement it is handed.
141 ic.subscribe_to("storage",
142 lambda key_s,ann: announcements.append(ann))
143 furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/gydnpigj2ja2qr2srq4ikjwnl7xfgbra"
# ann1 and ann1b share the FURL and differ only in the version field.
144 ann1 = (furl1, "storage", "RIStorage", "nick1", "ver23", "ver0")
145 ann1b = (furl1, "storage", "RIStorage", "nick1", "ver24", "ver0")
146 ca = WrapV2ClientInV1Interface(ic)
148 ca.remote_announce([ann1])
# After the first delivery: one notification, counted as new.
151 self.failUnlessEqual(len(announcements), 1)
152 self.failUnlessEqual(announcements[0]["nickname"], u"nick1")
153 self.failUnlessEqual(announcements[0]["my-version"], "ver23")
154 self.failUnlessEqual(ic._debug_counts["inbound_announcement"], 1)
155 self.failUnlessEqual(ic._debug_counts["new_announcement"], 1)
156 self.failUnlessEqual(ic._debug_counts["update"], 0)
157 self.failUnlessEqual(ic._debug_counts["duplicate_announcement"], 0)
158 # now send a duplicate announcement: this should not notify clients
159 ca.remote_announce([ann1])
160 return fireEventually()
# After the duplicate: still one notification, duplicate counter bumped.
163 self.failUnlessEqual(len(announcements), 1)
164 self.failUnlessEqual(ic._debug_counts["inbound_announcement"], 2)
165 self.failUnlessEqual(ic._debug_counts["new_announcement"], 1)
166 self.failUnlessEqual(ic._debug_counts["update"], 0)
167 self.failUnlessEqual(ic._debug_counts["duplicate_announcement"], 1)
168 # and a replacement announcement: same FURL, new other stuff.
169 # Clients should be notified.
170 ca.remote_announce([ann1b])
171 return fireEventually()
172 d.addCallback(_then2)
# After the replacement: a second notification, counted as an update.
174 self.failUnlessEqual(len(announcements), 2)
175 self.failUnlessEqual(ic._debug_counts["inbound_announcement"], 3)
176 self.failUnlessEqual(ic._debug_counts["new_announcement"], 1)
177 self.failUnlessEqual(ic._debug_counts["update"], 1)
178 self.failUnlessEqual(ic._debug_counts["duplicate_announcement"], 1)
179 # test that the other stuff changed
180 self.failUnlessEqual(announcements[-1]["nickname"], u"nick1")
181 self.failUnlessEqual(announcements[-1]["my-version"], "ver24")
182 d.addCallback(_then3)
# v2 (signed) announcements delivered through remote_announce_v2.
185 def test_duplicate_receive_v2(self):
186 ic1 = IntroducerClient(None,
187 "introducer.furl", u"my_nickname",
188 "ver23", "oldest_version", {}, fakeseq)
189 # we use a second client just to create a different-looking
191 ic2 = IntroducerClient(None,
192 "introducer.furl", u"my_nickname",
193 "ver24","oldest_version",{}, fakeseq)
# Subscriber records (key, announcement) pairs.
195 def _received(key_s, ann):
196 announcements.append( (key_s, ann) )
197 ic1.subscribe_to("storage", _received)
198 furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/gydnp"
199 furl1a = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:7777/gydnp"
200 furl2 = "pb://ttwwooyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/ttwwoo"
# One keypair signs every announcement in this test; pubkey_s is the value
# the subscriber is expected to receive as key_s.
202 privkey_s, pubkey_vs = keyutil.make_keypair()
203 privkey, _ignored = keyutil.parse_privkey(privkey_s)
204 pubkey_s = keyutil.remove_prefix(pubkey_vs, "pub-")
207 # ann1a: ic1, furl1a (same SturdyRef, different connection hints)
# Fixtures, distinguished by creating client ("ver23" vs "ver24"), FURL and
# sequence number.
211 self.ann1 = make_ann_t(ic1, furl1, privkey, seqnum=10)
212 self.ann1old = make_ann_t(ic1, furl1, privkey, seqnum=9)
213 self.ann1noseqnum = make_ann_t(ic1, furl1, privkey, seqnum=None)
214 self.ann1b = make_ann_t(ic2, furl1, privkey, seqnum=11)
215 self.ann1a = make_ann_t(ic1, furl1a, privkey, seqnum=12)
216 self.ann2 = make_ann_t(ic2, furl2, privkey, seqnum=13)
218 ic1.remote_announce_v2([self.ann1]) # queues eventual-send
# First delivery: subscriber fired once with ann1's contents.
221 self.failUnlessEqual(len(announcements), 1)
222 key_s,ann = announcements[0]
223 self.failUnlessEqual(key_s, pubkey_s)
224 self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1)
225 self.failUnlessEqual(ann["my-version"], "ver23")
226 d.addCallback(_then1)
228 # now send a duplicate announcement. This should not fire the
230 d.addCallback(lambda ign: ic1.remote_announce_v2([self.ann1]))
231 d.addCallback(fireEventually)
233 self.failUnlessEqual(len(announcements), 1)
234 d.addCallback(_then2)
236 # an older announcement shouldn't fire the subscriber either
237 d.addCallback(lambda ign: ic1.remote_announce_v2([self.ann1old]))
238 d.addCallback(fireEventually)
240 self.failUnlessEqual(len(announcements), 1)
241 d.addCallback(_then2a)
243 # announcement with no seqnum cannot replace one with-seqnum
244 d.addCallback(lambda ign: ic1.remote_announce_v2([self.ann1noseqnum]))
245 d.addCallback(fireEventually)
247 self.failUnlessEqual(len(announcements), 1)
248 d.addCallback(_then2b)
250 # and a replacement announcement: same FURL, new other stuff. The
251 # subscriber *should* be fired.
252 d.addCallback(lambda ign: ic1.remote_announce_v2([self.ann1b]))
253 d.addCallback(fireEventually)
255 self.failUnlessEqual(len(announcements), 2)
256 key_s,ann = announcements[-1]
257 self.failUnlessEqual(key_s, pubkey_s)
258 self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1)
259 self.failUnlessEqual(ann["my-version"], "ver24")
260 d.addCallback(_then3)
262 # and a replacement announcement with a different FURL (it uses
263 # different connection hints)
264 d.addCallback(lambda ign: ic1.remote_announce_v2([self.ann1a]))
265 d.addCallback(fireEventually)
267 self.failUnlessEqual(len(announcements), 3)
268 key_s,ann = announcements[-1]
269 self.failUnlessEqual(key_s, pubkey_s)
270 self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1a)
271 self.failUnlessEqual(ann["my-version"], "ver23")
272 d.addCallback(_then4)
274 # now add a new subscription, which should be called with the
275 # backlog. The introducer only records one announcement per index, so
276 # the backlog will only have the latest message.
278 def _received2(key_s, ann):
279 announcements2.append( (key_s, ann) )
280 d.addCallback(lambda ign: ic1.subscribe_to("storage", _received2))
281 d.addCallback(fireEventually)
# The late subscriber sees exactly one entry: the most recent one (ann1a).
283 self.failUnlessEqual(len(announcements2), 1)
284 key_s,ann = announcements2[-1]
285 self.failUnlessEqual(key_s, pubkey_s)
286 self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1a)
287 self.failUnlessEqual(ann["my-version"], "ver23")
288 d.addCallback(_then5)
291 def test_id_collision(self):
292 # test replacement case where tubid equals a keyid (one should
293 # not replace the other)
294 ic = IntroducerClient(None,
295 "introducer.furl", u"my_nickname",
296 "my_version", "oldest_version", {}, fakeseq)
298 ic.subscribe_to("storage",
299 lambda key_s,ann: announcements.append(ann))
300 sk_s, vk_s = keyutil.make_keypair()
301 sk, _ignored = keyutil.parse_privkey(sk_s)
302 keyid = keyutil.remove_prefix(vk_s, "pub-v0-")
303 furl1 = "pb://onug64tu@127.0.0.1:123/short" # base32("short")
# furl2 deliberately uses the key id as its tubid.
304 furl2 = "pb://%s@127.0.0.1:36106/swissnum" % keyid
305 ann_t = make_ann_t(ic, furl1, sk, 1)
306 ic.remote_announce_v2([ann_t])
309 # first announcement has been processed
310 self.failUnlessEqual(len(announcements), 1)
311 self.failUnlessEqual(announcements[0]["anonymous-storage-FURL"],
313 # now submit a second one, with a tubid that happens to look just
314 # like the pubkey-based serverid we just processed. They should
316 ann2 = (furl2, "storage", "RIStorage", "nick1", "ver23", "ver0")
317 ca = WrapV2ClientInV1Interface(ic)
318 ca.remote_announce([ann2])
319 return fireEventually()
322 # if they overlapped, the second announcement would be ignored
323 self.failUnlessEqual(len(announcements), 2)
324 self.failUnlessEqual(announcements[1]["anonymous-storage-FURL"],
326 d.addCallback(_then2)
class Server(unittest.TestCase):
    """Server-side handling of repeated signed (v2) announcements.

    Every announcement in the test is built for the same FURL and signed
    with the same key, and the test asserts that exactly one announcement
    stays stored throughout; only its sequence number and the service's
    debug counters are expected to change.
    """

    # Debug counters checked after every publish, in assertion order.
    _COUNTERS = ("inbound_message", "inbound_duplicate", "inbound_no_seqnum",
                 "inbound_old_replay", "inbound_update")

    def _publish_and_check(self, introducer, ann_t, seqnum, counts):
        """Publish ``ann_t`` and verify the resulting server state.

        ``seqnum`` is the sequence number the single stored announcement
        must carry afterwards. ``counts`` gives the expected values of the
        counters named in ``_COUNTERS``, in the same order.
        """
        introducer.remote_publish_v2(ann_t, None)
        stored = introducer.get_announcements()
        self.failUnlessEqual(len(stored), 1)
        self.failUnlessEqual(stored[0].announcement["seqnum"], seqnum)
        for name, expected in zip(self._COUNTERS, counts):
            # The counter name is passed as the failure message so a
            # mismatch says which counter was wrong.
            self.failUnlessEqual(introducer._debug_counts[name], expected,
                                 name)

    def test_duplicate(self):
        """Duplicates, replays and seqnum-less updates must not replace."""
        introducer = IntroducerService()
        ic1 = IntroducerClient(None,
                               "introducer.furl", u"my_nickname",
                               "ver23", "oldest_version", {}, realseq)
        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/gydnp"

        privkey_s, _ = keyutil.make_keypair()
        privkey, _ = keyutil.parse_privkey(privkey_s)

        ann1 = make_ann_t(ic1, furl1, privkey, seqnum=10)
        ann1_old = make_ann_t(ic1, furl1, privkey, seqnum=9)
        ann1_new = make_ann_t(ic1, furl1, privkey, seqnum=11)
        ann1_noseqnum = make_ann_t(ic1, furl1, privkey, seqnum=None)
        ann1_badseqnum = make_ann_t(ic1, furl1, privkey, seqnum="not an int")

        # counts order: message, duplicate, no_seqnum, old_replay, update
        # first publish: accepted as the initial record
        self._publish_and_check(introducer, ann1, 10, (1, 0, 0, 0, 0))
        # identical re-publish: counted as a duplicate
        self._publish_and_check(introducer, ann1, 10, (2, 1, 0, 0, 0))
        # lower seqnum: counted as an old replay, record unchanged
        self._publish_and_check(introducer, ann1_old, 10, (3, 1, 0, 1, 0))
        # higher seqnum: counted as an update, record replaced
        self._publish_and_check(introducer, ann1_new, 11, (4, 1, 0, 1, 1))
        # seqnum of None: counted as no-seqnum, record unchanged
        self._publish_and_check(introducer, ann1_noseqnum, 11,
                                (5, 1, 1, 1, 1))
        # non-integer seqnum: also counted as no-seqnum
        self._publish_and_check(introducer, ann1_badseqnum, 11,
                                (6, 1, 2, 1, 1))
# Unicode nickname template with one non-ASCII character; tests fill the
# %s slot (e.g. NICKNAME % "0") to get per-client nicknames and to check
# that non-ASCII nicknames survive the round trip.
407 NICKNAME = u"n\u00EDickname-%s" # LATIN SMALL LETTER I WITH ACUTE
# Mixin for tests that need a real listening Tub for the introducer.
# Relies on self.basedir and self.parent being set before create_tub() runs.
409 class SystemTestMixin(ServiceMixin, pollmixin.PollMixin):
# Create the central Tub, start it listening on `portnum`, and record the
# Tub and its port on self.
411 def create_tub(self, portnum=0):
# The Tub is created with a certFile under basedir; later code in this file
# calls create_tub() again and asserts the re-registered FURL is unchanged.
412 tubfile = os.path.join(self.basedir, "tub.pem")
413 self.central_tub = tub = Tub(certFile=tubfile)
414 #tub.setOption("logLocalFailures", True)
415 #tub.setOption("logRemoteFailures", True)
416 tub.setOption("expose-remote-exception-types", False)
417 tub.setServiceParent(self.parent)
418 l = tub.listenOn("tcp:%d" % portnum)
419 self.central_portnum = l.getPortnum()
# NOTE(review): with the default portnum=0 this assert would only hold if
# the listener reported port 0 -- presumably an `if portnum:` guard sits on
# the dropped line before it; confirm against the full file.
421 assert self.central_portnum == portnum
422 tub.setLocation("localhost:%d" % self.central_portnum)
# Checks that a client which publishes while the introducer is unreachable
# queues the message and delivers it once the introducer comes back.
# NOTE(review): several lines are not visible in this view (the create_tub
# call, creation of tub2, the `def _offline` / `def _done` / `def _idle`
# headers, their return values and the final `return d`); comments below
# describe only what the visible statements do.
424 class Queue(SystemTestMixin, unittest.TestCase):
425 def test_queue_until_connected(self):
426 self.basedir = "introducer/QueueUntilConnected/queued"
427 os.makedirs(self.basedir)
# Register the introducer with the central Tub to obtain its FURL.
429 introducer = IntroducerService()
430 introducer.setServiceParent(self.parent)
431 iff = os.path.join(self.basedir, "introducer.furl")
432 ifurl = self.central_tub.registerReference(introducer, furlFile=iff)
434 tub2.setServiceParent(self.parent)
435 c = IntroducerClient(tub2, ifurl,
436 u"nickname", "version", "oldest", {}, fakeseq)
437 furl1 = "pb://onug64tu@127.0.0.1:123/short" # base32("short")
438 sk_s, vk_s = keyutil.make_keypair()
439 sk, _ignored = keyutil.parse_privkey(sk_s)
# Detach the introducer from the parent service before the client starts.
441 d = introducer.disownServiceParent()
443 # now that the introducer server is offline, create a client and
444 # publish some messages
445 c.setServiceParent(self.parent) # this starts the reconnector
446 c.publish("storage", make_ann(furl1), sk)
448 introducer.setServiceParent(self.parent) # restart the server
449 # now wait for the messages to be delivered
450 def _got_announcement():
451 return bool(introducer.get_announcements())
452 return self.poll(_got_announcement)
453 d.addCallback(_offline)
# The queued announcement must have arrived with the FURL that was published.
455 v = introducer.get_announcements()[0]
456 furl = v.announcement["anonymous-storage-FURL"]
457 self.failUnlessEqual(furl, furl1)
460 # now let the ack get back
# Poll until neither side reports outstanding messages.
461 def _wait_until_idle(ign):
463 if c._debug_outstanding:
465 if introducer._debug_outstanding:
468 return self.poll(_idle)
469 d.addCallback(_wait_until_idle)
# End-to-end scenario: several clients publish/subscribe through a real
# introducer over foolscap, then the introducer's Tub is bounced, then the
# introducer itself is replaced, with counter checks after each phase.
# NOTE(review): this view is missing many lines of the class (among them the
# definitions of V1/V2, NUM_CLIENTS, NUM_STORAGE, `clients`, `tubs`,
# `privkeys`, several `def`/`if`/`else` headers, `return True/False` lines
# and the final `return d`). Comments below describe only visible
# statements; anything about the hidden lines is marked as a presumption.
474 class SystemTest(SystemTestMixin, unittest.TestCase):
# Run the whole scenario against one introducer flavour
# (old.IntroducerService_v1 or IntroducerService, chosen by server_version).
476 def do_system_test(self, server_version):
478 if server_version == V1:
479 introducer = old.IntroducerService_v1()
481 introducer = IntroducerService()
482 introducer.setServiceParent(self.parent)
483 iff = os.path.join(self.basedir, "introducer.furl")
484 tub = self.central_tub
485 ifurl = self.central_tub.registerReference(introducer, furlFile=iff)
486 self.introducer_furl = ifurl
# NOTE(review): the next comment has typos; read it as "... and a sixth
# which does not. All 6 clients subscribe to hear about storage."
488 # we have 5 clients who publish themselves as storage servers, and a
489 # sixth which does which not. All 6 clients subscriber to hear about
490 # storage. When the connections are fully established, all six nodes
491 # should have 5 connections each.
# Bookkeeping: per-client received announcements, the client lists, and the
# printable server id expected on the web status page for each client index.
497 received_announcements = {}
498 subscribing_clients = []
499 publishing_clients = []
500 printable_serverids = {}
501 self.the_introducer = introducer
# Minimum number of distinct announcements each client must have received
# before the system counts as settled; bumped after each restart phase.
503 expected_announcements = [0 for c in range(NUM_CLIENTS)]
# Build one Tub + client per index. NOTE(review): the branches choosing
# between the v1 and v2 client, and between signed/unsigned publishing, have
# lost their `if`/`else` headers in this view.
505 for i in range(NUM_CLIENTS):
507 #tub.setOption("logLocalFailures", True)
508 #tub.setOption("logRemoteFailures", True)
509 tub.setOption("expose-remote-exception-types", False)
510 tub.setServiceParent(self.parent)
511 l = tub.listenOn("tcp:0")
512 portnum = l.getPortnum()
513 tub.setLocation("localhost:%d" % portnum)
515 log.msg("creating client %d: %s" % (i, tub.getShortTubID()))
517 c = old.IntroducerClient_v1(tub, self.introducer_furl,
521 c = IntroducerClient(tub, self.introducer_furl,
524 {"component": "component-v1"}, fakeseq)
525 received_announcements[c] = {}
# Subscription callback: file each announcement under an index (key string
# or tubid string) in that client's dict, so repeats overwrite each other.
526 def got(key_s_or_tubid, ann, announcements, i):
528 index = get_tubid_string_from_ann(ann)
530 index = key_s_or_tubid or get_tubid_string_from_ann(ann)
531 announcements[index] = ann
532 c.subscribe_to("storage", got, received_announcements[c], i)
533 subscribing_clients.append(c)
534 expected_announcements[i] += 1 # all expect a 'storage' announcement
536 node_furl = tub.registerReference(Referenceable())
539 c.publish(node_furl, "storage", "ri_name")
540 printable_serverids[i] = get_tubid_string(node_furl)
542 # sign the announcement
543 privkey_s, pubkey_s = keyutil.make_keypair()
544 privkey, _ignored = keyutil.parse_privkey(privkey_s)
545 privkeys[c] = privkey
546 c.publish("storage", make_ann(node_furl), privkey)
547 if server_version == V1:
548 printable_serverids[i] = get_tubid_string(node_furl)
550 assert pubkey_s.startswith("pub-")
551 printable_serverids[i] = pubkey_s[len("pub-"):]
553 c.publish("storage", make_ann(node_furl))
554 printable_serverids[i] = get_tubid_string(node_furl)
555 publishing_clients.append(c)
557 # the last one does not publish anything
561 # users of the V1 client were required to publish a
562 # 'stub_client' record (somewhat after they published the
563 # 'storage' record), so the introducer could see their
564 # version. Match that behavior.
565 c.publish(node_furl, "stub_client", "stub_ri_name")
568 # also publish something that nobody cares about
569 boring_furl = tub.registerReference(Referenceable())
570 c.publish("boring", make_ann(boring_furl))
572 c.setServiceParent(self.parent)
# Polling helper: wait until the clients report being connected to the
# introducer.
577 def _wait_for_connected(ign):
580 if not c.connected_to_introducer():
583 return self.poll(_connected)
585 # we watch the clients to determine when the system has settled down.
586 # Then we can look inside the server to assert things about its
# Polling helper: wait until each subscriber has at least its expected
# number of announcements.
589 def _wait_for_expected_announcements(ign):
590 def _got_expected_announcements():
591 for i,c in enumerate(subscribing_clients):
592 if len(received_announcements[c]) < expected_announcements[i]:
595 return self.poll(_got_expected_announcements)
597 # before shutting down any Tub, we'd like to know that there are no
598 # messages outstanding
600 def _wait_until_idle(ign):
602 for c in subscribing_clients + publishing_clients:
603 if c._debug_outstanding:
605 if self.the_introducer._debug_outstanding:
608 return self.poll(_idle)
# Phase 1: connect, receive everything, go idle, then check counters.
610 d = defer.succeed(None)
611 d.addCallback(_wait_for_connected)
612 d.addCallback(_wait_for_expected_announcements)
613 d.addCallback(_wait_until_idle)
616 log.msg("doing _check1")
617 dc = self.the_introducer._debug_counts
618 if server_version == V1:
619 # each storage server publishes a record, and (after its
620 # 'subscribe' has been ACKed) also publishes a "stub_client".
621 # The non-storage client (which subscribes) also publishes a
622 # stub_client. There is also one "boring" service. The number
623 # of messages is higher, because the stub_clients aren't
624 # published until after we get the 'subscribe' ack (since we
625 # don't realize that we're dealing with a v1 server [which
626 # needs stub_clients] until then), and the act of publishing
627 # the stub_client causes us to re-send all previous
629 self.failUnlessEqual(dc["inbound_message"] - dc["inbound_duplicate"],
630 NUM_STORAGE + NUM_CLIENTS + 1)
632 # each storage server publishes a record. There is also one
633 # "stub_client" and one "boring"
634 self.failUnlessEqual(dc["inbound_message"], NUM_STORAGE+2)
635 self.failUnlessEqual(dc["inbound_duplicate"], 0)
636 self.failUnlessEqual(dc["inbound_update"], 0)
637 self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS)
638 # the number of outbound messages is tricky.. I think it depends
639 # upon a race between the publish and the subscribe messages.
640 self.failUnless(dc["outbound_message"] > 0)
641 # each client subscribes to "storage", and each server publishes
642 self.failUnlessEqual(dc["outbound_announcements"],
643 NUM_STORAGE*NUM_CLIENTS)
# Client-side view of phase 1: every subscriber saw only new, relevant
# announcements.
645 for c in subscribing_clients:
646 cdc = c._debug_counts
647 self.failUnless(cdc["inbound_message"])
648 self.failUnlessEqual(cdc["inbound_announcement"],
650 self.failUnlessEqual(cdc["wrong_service"], 0)
651 self.failUnlessEqual(cdc["duplicate_announcement"], 0)
652 self.failUnlessEqual(cdc["update"], 0)
653 self.failUnlessEqual(cdc["new_announcement"],
655 anns = received_announcements[c]
656 self.failUnlessEqual(len(anns), NUM_STORAGE)
658 nodeid0 = tubs[clients[0]].tubID
# The non-ASCII nickname must arrive as a unicode object with its value intact.
660 nick = ann["nickname"]
661 self.failUnlessEqual(type(nick), unicode)
662 self.failUnlessEqual(nick, NICKNAME % "0")
# Expected per-publisher outbound message counts differ by server version.
663 if server_version == V1:
664 for c in publishing_clients:
665 cdc = c._debug_counts
666 expected = 1 # storage
668 expected += 1 # boring
669 if c is not clients[0]:
670 # the v2 client tries to call publish_v2, which fails
671 # because the server is v1. It then re-sends
672 # everything it has so far, plus a stub_client record
673 expected = 2*expected + 1
675 # we always tell v1 client to send stub_client
677 self.failUnlessEqual(cdc["outbound_message"], expected)
679 for c in publishing_clients:
680 cdc = c._debug_counts
682 if c in [clients[0], # stub_client
686 self.failUnlessEqual(cdc["outbound_message"], expected)
687 # now check the web status, make sure it renders without error
688 ir = introweb.IntroducerRoot(self.parent)
689 self.parent.nodeid = "NODEID"
690 text = ir.renderSynchronously().decode("utf-8")
691 self.failUnlessIn(NICKNAME % "0", text) # the v1 client
692 self.failUnlessIn(NICKNAME % "1", text) # a v2 client
693 for i in range(NUM_STORAGE):
694 self.failUnlessIn(printable_serverids[i], text,
695 (i,printable_serverids[i],text))
696 # make sure there isn't a double-base32ed string too
697 self.failIfIn(idlib.nodeid_b2a(printable_serverids[i]), text,
698 (i,printable_serverids[i],text))
699 log.msg("_check1 done")
700 d.addCallback(_check1)
702 # force an introducer reconnect, by shutting down the Tub it's using
703 # and starting a new Tub (with the old introducer). Everybody should
704 # reconnect and republish, but the introducer should ignore the
705 # republishes as duplicates. However, because the server doesn't know
706 # what each client does and does not know, it will send them a copy
707 # of the current announcement table anyway.
709 d.addCallback(lambda _ign: log.msg("shutting down introducer's Tub"))
710 d.addCallback(lambda _ign: self.central_tub.disownServiceParent())
# Polling helper: wait until the clients notice the introducer is gone.
712 def _wait_for_introducer_loss(ign):
713 def _introducer_lost():
715 if c.connected_to_introducer():
718 return self.poll(_introducer_lost)
719 d.addCallback(_wait_for_introducer_loss)
# Phase 2 set-up: zero the client and introducer counters, raise the
# expected-announcement thresholds, and bring a new Tub up on the same port
# with the same introducer object.
721 def _restart_introducer_tub(_ign):
722 log.msg("restarting introducer's Tub")
724 for i in range(NUM_CLIENTS):
725 c = subscribing_clients[i]
726 for k in c._debug_counts:
727 c._debug_counts[k] = 0
728 for k in self.the_introducer._debug_counts:
729 self.the_introducer._debug_counts[k] = 0
730 expected_announcements[i] += 1 # new 'storage' for everyone
731 self.create_tub(self.central_portnum)
732 newfurl = self.central_tub.registerReference(self.the_introducer,
# The re-registered introducer must keep its FURL so clients can reconnect.
734 assert newfurl == self.introducer_furl
735 d.addCallback(_restart_introducer_tub)
737 d.addCallback(_wait_for_connected)
738 d.addCallback(_wait_for_expected_announcements)
739 d.addCallback(_wait_until_idle)
740 d.addCallback(lambda _ign: log.msg(" reconnected"))
742 # TODO: publish something while the introducer is offline, then
743 # confirm it gets delivered when the connection is reestablished
745 log.msg("doing _check2")
746 # assert that the introducer sent out new messages, one per
748 dc = self.the_introducer._debug_counts
749 self.failUnlessEqual(dc["outbound_announcements"],
750 NUM_STORAGE*NUM_CLIENTS)
751 self.failUnless(dc["outbound_message"] > 0)
752 self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS)
# After the Tub bounce every client gets the full table again and counts
# all of it as duplicates.
753 for c in subscribing_clients:
754 cdc = c._debug_counts
755 self.failUnlessEqual(cdc["inbound_message"], 1)
756 self.failUnlessEqual(cdc["inbound_announcement"], NUM_STORAGE)
757 self.failUnlessEqual(cdc["new_announcement"], 0)
758 self.failUnlessEqual(cdc["wrong_service"], 0)
759 self.failUnlessEqual(cdc["duplicate_announcement"], NUM_STORAGE)
760 d.addCallback(_check2)
762 # Then force an introducer restart, by shutting down the Tub,
763 # destroying the old introducer, and starting a new Tub+Introducer.
764 # Everybody should reconnect and republish, and the (new) introducer
765 # will distribute the new announcements, but the clients should
766 # ignore the republishes as duplicates.
768 d.addCallback(lambda _ign: log.msg("shutting down introducer"))
769 d.addCallback(lambda _ign: self.central_tub.disownServiceParent())
770 d.addCallback(_wait_for_introducer_loss)
771 d.addCallback(lambda _ign: log.msg("introducer lost"))
# Phase 3 set-up: new Tub on the same port plus a brand-new introducer
# object of the same flavour; client counters are zeroed again.
773 def _restart_introducer(_ign):
774 log.msg("restarting introducer")
775 self.create_tub(self.central_portnum)
777 for i in range(NUM_CLIENTS):
778 c = subscribing_clients[i]
779 for k in c._debug_counts:
780 c._debug_counts[k] = 0
781 expected_announcements[i] += 1 # new 'storage' for everyone
782 if server_version == V1:
783 introducer = old.IntroducerService_v1()
785 introducer = IntroducerService()
786 self.the_introducer = introducer
787 newfurl = self.central_tub.registerReference(self.the_introducer,
789 assert newfurl == self.introducer_furl
790 d.addCallback(_restart_introducer)
792 d.addCallback(_wait_for_connected)
793 d.addCallback(_wait_for_expected_announcements)
794 d.addCallback(_wait_until_idle)
797 log.msg("doing _check3")
798 dc = self.the_introducer._debug_counts
799 self.failUnlessEqual(dc["outbound_announcements"],
800 NUM_STORAGE*NUM_CLIENTS)
801 self.failUnless(dc["outbound_message"] > 0)
802 self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS)
803 for c in subscribing_clients:
804 cdc = c._debug_counts
805 self.failUnless(cdc["inbound_message"] > 0)
806 self.failUnlessEqual(cdc["inbound_announcement"], NUM_STORAGE)
807 self.failUnlessEqual(cdc["new_announcement"], 0)
808 self.failUnlessEqual(cdc["wrong_service"], 0)
809 self.failUnlessEqual(cdc["duplicate_announcement"], NUM_STORAGE)
811 d.addCallback(_check3)
# Entry point: run the scenario against the current (v2) introducer.
815 def test_system_v2_server(self):
816 self.basedir = "introducer/SystemTest/system_v2_server"
817 os.makedirs(self.basedir)
818 return self.do_system_test(V2)
# Function attribute giving this test a 480-second timeout.
819 test_system_v2_server.timeout = 480
820 # occasionally takes longer than 350s on "draco"
# Entry point: run the scenario against the old (v1) introducer.
822 def test_system_v1_server(self):
823 self.basedir = "introducer/SystemTest/system_v1_server"
824 os.makedirs(self.basedir)
825 return self.do_system_test(V1)
826 test_system_v1_server.timeout = 480
827 # occasionally takes longer than 350s on "draco"
# Stand-in for a foolscap remote reference, passed as the subscriber in the
# ClientInfo tests. Every method returns canned data; notifyOnDisconnect
# accepts anything and does nothing.
# NOTE(review): the getPeer() return expression is cut off in this view (its
# closing argument line is not visible).
829 class FakeRemoteReference:
830 def notifyOnDisconnect(self, *args, **kwargs): pass
831 def getRemoteTubID(self): return "62ubehyunnyhzs7r6vdonnm2hpi52w6y"
832 def getLocationHints(self): return [("ipv4", "here.example.com", "1234"),
833 ("ipv4", "there.example.com", "2345")]
834 def getPeer(self): return address.IPv4Address("TCP", "remote.example.com",
# Checks what the introducer records about its subscribers (nickname,
# version, app-versions) for both the v2 and the v1 subscribe paths.
# NOTE(review): `s0` is read below without a visible assignment, and the
# IntroducerClient(...) call in test_client_v2 lacks its last argument line;
# presumably `s0 = subs[0]` and the closing argument sit on dropped lines.
837 class ClientInfo(unittest.TestCase):
# v2: subscriber info travels with the subscribe call itself.
838 def test_client_v2(self):
839 introducer = IntroducerService()
840 tub = introducer_furl = None
841 app_versions = {"whizzy": "fizzy"}
842 client_v2 = IntroducerClient(tub, introducer_furl, NICKNAME % u"v2",
843 "my_version", "oldest", app_versions,
845 #furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
846 #ann_s = make_ann_t(client_v2, furl1, None, 10)
847 #introducer.remote_publish_v2(ann_s, Referenceable())
848 subscriber = FakeRemoteReference()
849 introducer.remote_subscribe_v2(subscriber, "storage",
850 client_v2._my_subscriber_info)
851 subs = introducer.get_subscribers()
852 self.failUnlessEqual(len(subs), 1)
854 self.failUnlessEqual(s0.service_name, "storage")
855 self.failUnlessEqual(s0.app_versions, app_versions)
856 self.failUnlessEqual(s0.nickname, NICKNAME % u"v2")
857 self.failUnlessEqual(s0.version, "my_version")
# v1: subscribe carries no info; it must be filled in from a later
# "stub_client" announcement that shares the subscriber's tubid.
859 def test_client_v1(self):
860 introducer = IntroducerService()
861 subscriber = FakeRemoteReference()
862 introducer.remote_subscribe(subscriber, "storage")
863 # the v1 subscribe interface had no subscriber_info: that was usually
864 # sent in a separate stub_client pseudo-announcement
865 subs = introducer.get_subscribers()
866 self.failUnlessEqual(len(subs), 1)
868 self.failUnlessEqual(s0.nickname, u"?") # not known yet
869 self.failUnlessEqual(s0.service_name, "storage")
871 # now submit the stub_client announcement
# The FURL's tubid matches FakeRemoteReference.getRemoteTubID(); the v1
# nickname is sent UTF-8 encoded.
872 furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
873 ann = (furl1, "stub_client", "RIStubClient",
874 (NICKNAME % u"v1").encode("utf-8"), "my_version", "oldest")
875 introducer.remote_publish(ann)
876 # the server should correlate the two
877 subs = introducer.get_subscribers()
878 self.failUnlessEqual(len(subs), 1)
880 self.failUnlessEqual(s0.service_name, "storage")
881 # v1 announcements do not contain app-versions
882 self.failUnlessEqual(s0.app_versions, {})
883 self.failUnlessEqual(s0.nickname, NICKNAME % u"v1")
884 self.failUnlessEqual(s0.version, "my_version")
886 # a subscription that arrives after the stub_client announcement
887 # should be correlated too
888 subscriber2 = FakeRemoteReference()
889 introducer.remote_subscribe(subscriber2, "thing2")
891 subs = introducer.get_subscribers()
892 self.failUnlessEqual(len(subs), 2)
# Pick out the new subscription by its service name.
893 s0 = [s for s in subs if s.service_name == "thing2"][0]
894 # v1 announcements do not contain app-versions
895 self.failUnlessEqual(s0.app_versions, {})
896 self.failUnlessEqual(s0.nickname, NICKNAME % u"v1")
897 self.failUnlessEqual(s0.version, "my_version")
# Tests of what IntroducerService.get_announcements() reports after a v2
# unsigned, a v2 signed, and a v1 announcement has been published.
# NOTE(review): this extract appears to omit some original lines (e.g. the
# end of each IntroducerClient(...) call) -- confirm against the full file.
# make_ann_t is defined elsewhere in the file; presumably it builds the
# announcement tuple that remote_publish_v2 accepts -- verify.
899 class Announcements(unittest.TestCase):
# Unsigned v2 announcement: make_ann_t is given None where the signed test
# passes a signing key, and the expected index is keyed by tub ID.
900 def test_client_v2_unsigned(self):
901 introducer = IntroducerService()
902 tub = introducer_furl = None
903 app_versions = {"whizzy": "fizzy"}
904 client_v2 = IntroducerClient(tub, introducer_furl, u"nick-v2",
905 "my_version", "oldest", app_versions,
907 furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
# tubid is the part of furl1 between "pb://" and "@".
908 tubid = "62ubehyunnyhzs7r6vdonnm2hpi52w6y"
909 ann_s0 = make_ann_t(client_v2, furl1, None, 10)
910 canary0 = Referenceable()
911 introducer.remote_publish_v2(ann_s0, canary0)
912 a = introducer.get_announcements()
913 self.failUnlessEqual(len(a), 1)
# The stored canary must be the very object that was passed in.
914 self.failUnlessIdentical(a[0].canary, canary0)
915 self.failUnlessEqual(a[0].index, ("storage", None, tubid))
916 self.failUnlessEqual(a[0].announcement["app-versions"], app_versions)
917 self.failUnlessEqual(a[0].nickname, u"nick-v2")
918 self.failUnlessEqual(a[0].service_name, "storage")
919 self.failUnlessEqual(a[0].version, "my_version")
920 self.failUnlessEqual(a[0].announcement["anonymous-storage-FURL"], furl1)
# Signed v2 announcement: the expected index is keyed by the public key
# (with its "pub-" prefix removed) and the tub ID slot is None.
922 def test_client_v2_signed(self):
923 introducer = IntroducerService()
924 tub = introducer_furl = None
925 app_versions = {"whizzy": "fizzy"}
926 client_v2 = IntroducerClient(tub, introducer_furl, u"nick-v2",
927 "my_version", "oldest", app_versions,
929 furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
# Fresh keypair: sk is the parsed signing key, pks the bare public-key string.
930 sk_s, vk_s = keyutil.make_keypair()
931 sk, _ignored = keyutil.parse_privkey(sk_s)
932 pks = keyutil.remove_prefix(vk_s, "pub-")
933 ann_t0 = make_ann_t(client_v2, furl1, sk, 10)
934 canary0 = Referenceable()
935 introducer.remote_publish_v2(ann_t0, canary0)
936 a = introducer.get_announcements()
937 self.failUnlessEqual(len(a), 1)
938 self.failUnlessIdentical(a[0].canary, canary0)
939 self.failUnlessEqual(a[0].index, ("storage", pks, None))
940 self.failUnlessEqual(a[0].announcement["app-versions"], app_versions)
941 self.failUnlessEqual(a[0].nickname, u"nick-v2")
942 self.failUnlessEqual(a[0].service_name, "storage")
943 self.failUnlessEqual(a[0].version, "my_version")
944 self.failUnlessEqual(a[0].announcement["anonymous-storage-FURL"], furl1)
# v1 announcement: a plain 6-tuple published through remote_publish(), with
# no canary argument.
946 def test_client_v1(self):
947 introducer = IntroducerService()
949 furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
950 tubid = "62ubehyunnyhzs7r6vdonnm2hpi52w6y"
951 ann = (furl1, "storage", "RIStorage",
952 u"nick-v1".encode("utf-8"), "my_version", "oldest")
953 introducer.remote_publish(ann)
955 a = introducer.get_announcements()
956 self.failUnlessEqual(len(a), 1)
957 self.failUnlessEqual(a[0].index, ("storage", None, tubid))
# Expected for v1: no canary and empty app-versions.
958 self.failUnlessEqual(a[0].canary, None)
959 self.failUnlessEqual(a[0].announcement["app-versions"], {})
# Note the nickname is compared against the UTF-8 encoded form here.
960 self.failUnlessEqual(a[0].nickname, u"nick-v1".encode("utf-8"))
961 self.failUnlessEqual(a[0].service_name, "storage")
962 self.failUnlessEqual(a[0].version, "my_version")
963 self.failUnlessEqual(a[0].announcement["anonymous-storage-FURL"], furl1)
# Test that each IntroducerClient.publish() call advances the sequence number
# and changes the nonce of the outbound announcements.
# NOTE(review): this extract appears to omit some original lines (closing the
# config file, the definition of read_seqnum(), and the expected values of
# the simplejson.loads comparisons) -- confirm against the full file.
965 class ClientSeqnums(unittest.TestCase):
966 def test_client(self):
# Build a client base directory containing a minimal tahoe.cfg.
967 basedir = "introducer/ClientSeqnums/test_client"
968 fileutil.make_dirs(basedir)
969 f = open(os.path.join(basedir, "tahoe.cfg"), "w")
970 f.write("[client]\n")
971 f.write("introducer.furl = nope\n")
973 c = TahoeClient(basedir)
974 ic = c.introducer_client
# The test inspects the introducer client's private announcement tables.
975 outbound = ic._outbound_announcements
976 published = ic._published_announcements
# Reads the stripped contents of basedir/announcement-seqnum; presumably the
# body of the read_seqnum() helper called below, whose def line is not
# visible here -- verify.
978 f = open(os.path.join(basedir, "announcement-seqnum"))
979 seqnum = f.read().strip()
# First publish: persisted seqnum and the "sA" entry's seqnum are both 1.
983 ic.publish("sA", {"key": "value1"}, c._server_key)
984 self.failUnlessEqual(read_seqnum(), 1)
985 self.failUnless("sA" in outbound)
986 self.failUnlessEqual(outbound["sA"]["seqnum"], 1)
# Remember the nonce so it can be compared after the next publish.
987 nonce1 = outbound["sA"]["nonce"]
988 self.failUnless(isinstance(nonce1, str))
# published["sA"][0] is JSON text; the value it is compared against is on a
# line not visible here.
989 self.failUnlessEqual(simplejson.loads(published["sA"][0]),
991 # [1] is the signature, [2] is the pubkey
993 # publishing a second service causes both services to be
994 # re-published, with the next higher sequence number
995 ic.publish("sB", {"key": "value2"}, c._server_key)
996 self.failUnlessEqual(read_seqnum(), 2)
997 self.failUnless("sB" in outbound)
998 self.failUnlessEqual(outbound["sB"]["seqnum"], 2)
999 self.failUnless("sA" in outbound)
1000 self.failUnlessEqual(outbound["sA"]["seqnum"], 2)
# Re-publishing "sA" must have given it a different nonce.
1001 nonce2 = outbound["sA"]["nonce"]
1002 self.failUnless(isinstance(nonce2, str))
1003 self.failIfEqual(nonce1, nonce2)
1004 self.failUnlessEqual(simplejson.loads(published["sA"][0]),
1006 self.failUnlessEqual(simplejson.loads(published["sB"][0]),
# IntroducerService subclass whose VERSION dict advertises a "v999"
# introducer protocol key and a made-up "application-version" string.
# NOTE(review): the VERSION literal is truncated in this extract (some of its
# lines, including the closing braces, are not visible) -- confirm against
# the full file.
1011 class TooNewServer(IntroducerService):
1012 VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v999":
1014 "application-version": "greetings from the crazy future",
# Checks that an IntroducerClient connecting to an incompatible introducer
# ends up with an InsufficientVersionError in c._introducer_error.
# NOTE(review): this extract appears to omit some original lines (creation of
# the central tub, of `i`, `tub` and `announcements`, the def lines of the
# _got_bad/_done helpers, and the final return) -- confirm against the full
# file.
1017 class NonV1Server(SystemTestMixin, unittest.TestCase):
1018 # if the 1.3.0 client connects to a server that doesn't provide the 'v1'
1019 # protocol, it is supposed to provide a useful error instead of a weird
1022 def test_failure(self):
1023 self.basedir = "introducer/NonV1Server/failure"
1024 os.makedirs(self.basedir)
# `i` is created on a line not visible here (presumably a TooNewServer
# instance -- verify); it is registered with the central tub to obtain the
# introducer FURL.
1027 i.setServiceParent(self.parent)
1028 self.introducer_furl = self.central_tub.registerReference(i)
# Client-side tub (created on a line not visible here): listens on an
# OS-assigned TCP port and advertises itself as localhost:<port>.
1031 tub.setOption("expose-remote-exception-types", False)
1032 tub.setServiceParent(self.parent)
1033 l = tub.listenOn("tcp:0")
1034 portnum = l.getPortnum()
1035 tub.setLocation("localhost:%d" % portnum)
1037 c = IntroducerClient(tub, self.introducer_furl,
1038 u"nickname-client", "version", "oldest", {},
# Callback that stores each received announcement under its key.
1041 def got(key_s, ann):
1042 announcements[key_s] = ann
1043 c.subscribe_to("storage", got)
1045 c.setServiceParent(self.parent)
1047 # now we wait for it to connect and notice the bad version
# Poll predicate body: done once the client has recorded an error or has
# obtained a publisher.
1050 return bool(c._introducer_error) or bool(c._publisher)
1051 d = self.poll(_got_bad)
# After polling finishes, the recorded failure must be an
# InsufficientVersionError (the failure itself is passed as the assert
# message).
1053 self.failUnless(c._introducer_error)
1054 self.failUnless(c._introducer_error.check(InsufficientVersionError),
1055 c._introducer_error)
1056 d.addCallback(_done)
# Sanity check of base64.b32decode on the tub ID embedded in a FURL.
# NOTE(review): a line between the regex match and the decode is not visible
# in this extract -- confirm against the full file.
1059 class DecodeFurl(unittest.TestCase):
1060 def test_decode(self):
1061 # make sure we have a working base64.b32decode. The one in
1062 # python2.4.[01] was broken.
1063 furl = 'pb://t5g7egomnnktbpydbuijt6zgtmw4oqi5@127.0.0.1:51857/hfzv36i'
# Capture the tub ID: the word characters between "pb://" and "@".
1064 m = re.match(r'pb://(\w+)@', furl)
# The tub ID is lowercase in the FURL, so it is uppercased before being
# passed to b32decode.
1066 nodeid = b32decode(m.group(1).upper())
# Expected result is the 20-byte binary node ID.
1067 self.failUnlessEqual(nodeid, "\x9fM\xf2\x19\xcckU0\xbf\x03\r\x10\x99\xfb&\x9b-\xc7A\x1d")
# Round-trip and failure-mode tests for sign_to_foolscap() and
# unsign_from_foolscap().
# NOTE(review): this extract appears to omit some original lines between the
# positive and negative cases -- confirm against the full file.
1069 class Signatures(unittest.TestCase):
1070 def test_sign(self):
1071 ann = {"key1": "value1"}
# Fresh keypair; sk is the parsed signing key.
1072 sk_s,vk_s = keyutil.make_keypair()
1073 sk,ignored = keyutil.parse_privkey(sk_s)
# Signing yields a 3-tuple of (message, signature, key).
1074 ann_t = sign_to_foolscap(ann, sk)
1075 (msg, sig, key) = ann_t
1076 self.failUnlessEqual(type(msg), type("".encode("utf-8"))) # bytes
# The message is UTF-8 encoded JSON of the original announcement.
1077 self.failUnlessEqual(simplejson.loads(msg.decode("utf-8")), ann)
# Signature and key strings both carry a "v0-" version prefix.
1078 self.failUnless(sig.startswith("v0-"))
1079 self.failUnless(key.startswith("v0-"))
# Unsigning returns the announcement and the key string; that key with
# "pub-" prepended must equal the verifying-key string from make_keypair().
1080 (ann2,key2) = unsign_from_foolscap(ann_t)
1081 self.failUnlessEqual(ann2, ann)
1082 self.failUnlessEqual("pub-"+key2, vk_s)
# Negative case: a different message paired with the original signature and
# key must be rejected with BadSignatureError.
1085 bad_ann = {"key1": "value2"}
1086 bad_msg = simplejson.dumps(bad_ann).encode("utf-8")
1087 self.failUnlessRaises(keyutil.BadSignatureError,
1088 unsign_from_foolscap, (bad_msg,sig,key))
1089 # sneaky bad signature should be ignored
# With the signature set to None the message is returned with a key of None,
# even though a key was supplied in the tuple.
1090 (ann2,key2) = unsign_from_foolscap( (bad_msg,None,key) )
1091 self.failUnlessEqual(key2, None)
1092 self.failUnlessEqual(ann2, bad_ann)
1094 # unrecognized signatures
# An unknown version prefix on either the signature or the key must raise
# UnknownKeyError.
1095 self.failUnlessRaises(UnknownKeyError,
1096 unsign_from_foolscap, (bad_msg,"v999-sig",key))
1097 self.failUnlessRaises(UnknownKeyError,
1098 unsign_from_foolscap, (bad_msg,sig,"v999-key"))
1101 # add tests of StorageFarmBroker: if it receives duplicate announcements, it
1102 # should leave the Reconnector in place, also if it receives
1103 # same-FURL-different-misc, but if it receives same-nodeid-different-FURL, it
1104 # should tear down the Reconnector and make a new one. This behavior used to
1105 # live in the IntroducerClient, and thus used to be tested by test_introducer
1107 # copying more tests from old branch:
1109 # then also add Upgrade test