2 import os, re, itertools
3 from base64 import b32decode
6 from twisted.trial import unittest
7 from twisted.internet import defer, address
8 from twisted.python import log
10 from foolscap.api import Tub, Referenceable, fireEventually, flushEventualQueue
11 from twisted.application import service
12 from allmydata.interfaces import InsufficientVersionError
13 from allmydata.introducer.client import IntroducerClient, \
14 WrapV2ClientInV1Interface
15 from allmydata.introducer.server import IntroducerService, FurlFileConflictError
16 from allmydata.introducer.common import get_tubid_string_from_ann, \
17 get_tubid_string, sign_to_foolscap, unsign_from_foolscap, \
19 from allmydata.introducer import old
20 # test compatibility with old introducer .tac files
21 from allmydata.introducer import IntroducerNode
22 from allmydata.web import introweb
23 from allmydata.client import Client as TahoeClient
24 from allmydata.util import pollmixin, keyutil, idlib, fileutil
25 import allmydata.test.common_util as testutil
27 class LoggingMultiService(service.MultiService):
28 def log(self, msg, **kw):
31 class Node(testutil.SignalMixin, unittest.TestCase):
33 basedir = "introducer.IntroducerNode.test_furl"
35 public_fn = os.path.join(basedir, "introducer.furl")
36 private_fn = os.path.join(basedir, "private", "introducer.furl")
37 q1 = IntroducerNode(basedir)
38 d = fireEventually(None)
39 d.addCallback(lambda res: q1.startService())
40 d.addCallback(lambda res: q1.when_tub_ready())
41 d.addCallback(lambda res: q1.stopService())
42 d.addCallback(flushEventualQueue)
44 # new nodes create unguessable furls in private/introducer.furl
45 ifurl = fileutil.read(private_fn)
46 self.failUnless(ifurl)
48 self.failIf(ifurl.endswith("/introducer"), ifurl)
50 # old nodes created guessable furls in BASEDIR/introducer.furl
51 guessable = ifurl[:ifurl.rfind("/")] + "/introducer"
52 fileutil.write(public_fn, guessable+"\n", mode="w") # text
54 # if we see both files, throw an error
55 self.failUnlessRaises(FurlFileConflictError,
56 IntroducerNode, basedir)
58 # when we see only the public one, move it to private/ and use
59 # the existing furl instead of creating a new one
61 q2 = IntroducerNode(basedir)
62 d2 = fireEventually(None)
63 d2.addCallback(lambda res: q2.startService())
64 d2.addCallback(lambda res: q2.when_tub_ready())
65 d2.addCallback(lambda res: q2.stopService())
66 d2.addCallback(flushEventualQueue)
67 def _check_furl2(res):
68 self.failIf(os.path.exists(public_fn))
69 ifurl2 = fileutil.read(private_fn)
70 self.failUnless(ifurl2)
71 self.failUnlessEqual(ifurl2.strip(), guessable)
72 d2.addCallback(_check_furl2)
74 d.addCallback(_check_furl)
79 self.parent = LoggingMultiService()
80 self.parent.startService()
82 log.msg("TestIntroducer.tearDown")
83 d = defer.succeed(None)
84 d.addCallback(lambda res: self.parent.stopService())
85 d.addCallback(flushEventualQueue)
88 class Introducer(ServiceMixin, unittest.TestCase, pollmixin.PollMixin):
90 def test_create(self):
91 ic = IntroducerClient(None, "introducer.furl", u"my_nickname",
92 "my_version", "oldest_version", {}, fakeseq)
93 self.failUnless(isinstance(ic, IntroducerClient))
95 def test_listen(self):
96 i = IntroducerService()
97 i.setServiceParent(self.parent)
99 def test_duplicate_publish(self):
100 i = IntroducerService()
101 self.failUnlessEqual(len(i.get_announcements()), 0)
102 self.failUnlessEqual(len(i.get_subscribers()), 0)
103 furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36106,127.0.0.1:36106/gydnpigj2ja2qr2srq4ikjwnl7xfgbra"
104 furl2 = "pb://ttwwooyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36111,127.0.0.1:36106/ttwwoogj2ja2qr2srq4ikjwnl7xfgbra"
105 ann1 = (furl1, "storage", "RIStorage", "nick1", "ver23", "ver0")
106 ann1b = (furl1, "storage", "RIStorage", "nick1", "ver24", "ver0")
107 ann2 = (furl2, "storage", "RIStorage", "nick2", "ver30", "ver0")
108 i.remote_publish(ann1)
109 self.failUnlessEqual(len(i.get_announcements()), 1)
110 self.failUnlessEqual(len(i.get_subscribers()), 0)
111 i.remote_publish(ann2)
112 self.failUnlessEqual(len(i.get_announcements()), 2)
113 self.failUnlessEqual(len(i.get_subscribers()), 0)
114 i.remote_publish(ann1b)
115 self.failUnlessEqual(len(i.get_announcements()), 2)
116 self.failUnlessEqual(len(i.get_subscribers()), 0)
118 def test_id_collision(self):
119 # test replacement case where tubid equals a keyid (one should
120 # not replace the other)
121 i = IntroducerService()
122 ic = IntroducerClient(None,
123 "introducer.furl", u"my_nickname",
124 "my_version", "oldest_version", {}, fakeseq)
125 sk_s, vk_s = keyutil.make_keypair()
126 sk, _ignored = keyutil.parse_privkey(sk_s)
127 keyid = keyutil.remove_prefix(vk_s, "pub-v0-")
128 furl1 = "pb://onug64tu@127.0.0.1:123/short" # base32("short")
129 ann_t = make_ann_t(ic, furl1, sk, 1)
130 i.remote_publish_v2(ann_t, Referenceable())
131 announcements = i.get_announcements()
132 self.failUnlessEqual(len(announcements), 1)
133 key1 = ("storage", "v0-"+keyid, None)
134 self.failUnlessEqual(announcements[0].index, key1)
135 ann1_out = announcements[0].announcement
136 self.failUnlessEqual(ann1_out["anonymous-storage-FURL"], furl1)
138 furl2 = "pb://%s@127.0.0.1:36106/swissnum" % keyid
139 ann2 = (furl2, "storage", "RIStorage", "nick1", "ver23", "ver0")
140 i.remote_publish(ann2)
141 announcements = i.get_announcements()
142 self.failUnlessEqual(len(announcements), 2)
143 key2 = ("storage", None, keyid)
144 wanted = [ad for ad in announcements if ad.index == key2]
145 self.failUnlessEqual(len(wanted), 1)
146 ann2_out = wanted[0].announcement
147 self.failUnlessEqual(ann2_out["anonymous-storage-FURL"], furl2)
153 seqnum_counter = itertools.count(1)
155 return seqnum_counter.next(), str(os.randint(1,100000))
158 ann = { "anonymous-storage-FURL": furl,
159 "permutation-seed-base32": get_tubid_string(furl) }
162 def make_ann_t(ic, furl, privkey, seqnum):
163 ann_d = ic.create_announcement_dict("storage", make_ann(furl))
164 ann_d["seqnum"] = seqnum
165 ann_d["nonce"] = "nonce"
166 ann_t = sign_to_foolscap(ann_d, privkey)
169 class Client(unittest.TestCase):
170 def test_duplicate_receive_v1(self):
171 ic = IntroducerClient(None,
172 "introducer.furl", u"my_nickname",
173 "my_version", "oldest_version", {}, fakeseq)
175 ic.subscribe_to("storage",
176 lambda key_s,ann: announcements.append(ann))
177 furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/gydnpigj2ja2qr2srq4ikjwnl7xfgbra"
178 ann1 = (furl1, "storage", "RIStorage", "nick1", "ver23", "ver0")
179 ann1b = (furl1, "storage", "RIStorage", "nick1", "ver24", "ver0")
180 ca = WrapV2ClientInV1Interface(ic)
182 ca.remote_announce([ann1])
185 self.failUnlessEqual(len(announcements), 1)
186 self.failUnlessEqual(announcements[0]["nickname"], u"nick1")
187 self.failUnlessEqual(announcements[0]["my-version"], "ver23")
188 self.failUnlessEqual(ic._debug_counts["inbound_announcement"], 1)
189 self.failUnlessEqual(ic._debug_counts["new_announcement"], 1)
190 self.failUnlessEqual(ic._debug_counts["update"], 0)
191 self.failUnlessEqual(ic._debug_counts["duplicate_announcement"], 0)
192 # now send a duplicate announcement: this should not notify clients
193 ca.remote_announce([ann1])
194 return fireEventually()
197 self.failUnlessEqual(len(announcements), 1)
198 self.failUnlessEqual(ic._debug_counts["inbound_announcement"], 2)
199 self.failUnlessEqual(ic._debug_counts["new_announcement"], 1)
200 self.failUnlessEqual(ic._debug_counts["update"], 0)
201 self.failUnlessEqual(ic._debug_counts["duplicate_announcement"], 1)
202 # and a replacement announcement: same FURL, new other stuff.
203 # Clients should be notified.
204 ca.remote_announce([ann1b])
205 return fireEventually()
206 d.addCallback(_then2)
208 self.failUnlessEqual(len(announcements), 2)
209 self.failUnlessEqual(ic._debug_counts["inbound_announcement"], 3)
210 self.failUnlessEqual(ic._debug_counts["new_announcement"], 1)
211 self.failUnlessEqual(ic._debug_counts["update"], 1)
212 self.failUnlessEqual(ic._debug_counts["duplicate_announcement"], 1)
213 # test that the other stuff changed
214 self.failUnlessEqual(announcements[-1]["nickname"], u"nick1")
215 self.failUnlessEqual(announcements[-1]["my-version"], "ver24")
216 d.addCallback(_then3)
219 def test_duplicate_receive_v2(self):
220 ic1 = IntroducerClient(None,
221 "introducer.furl", u"my_nickname",
222 "ver23", "oldest_version", {}, fakeseq)
223 # we use a second client just to create a different-looking
225 ic2 = IntroducerClient(None,
226 "introducer.furl", u"my_nickname",
227 "ver24","oldest_version",{}, fakeseq)
229 def _received(key_s, ann):
230 announcements.append( (key_s, ann) )
231 ic1.subscribe_to("storage", _received)
232 furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/gydnp"
233 furl1a = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:7777/gydnp"
234 furl2 = "pb://ttwwooyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/ttwwoo"
236 privkey_s, pubkey_vs = keyutil.make_keypair()
237 privkey, _ignored = keyutil.parse_privkey(privkey_s)
238 pubkey_s = keyutil.remove_prefix(pubkey_vs, "pub-")
241 # ann1a: ic1, furl1a (same SturdyRef, different connection hints)
245 self.ann1 = make_ann_t(ic1, furl1, privkey, seqnum=10)
246 self.ann1old = make_ann_t(ic1, furl1, privkey, seqnum=9)
247 self.ann1noseqnum = make_ann_t(ic1, furl1, privkey, seqnum=None)
248 self.ann1b = make_ann_t(ic2, furl1, privkey, seqnum=11)
249 self.ann1a = make_ann_t(ic1, furl1a, privkey, seqnum=12)
250 self.ann2 = make_ann_t(ic2, furl2, privkey, seqnum=13)
252 ic1.remote_announce_v2([self.ann1]) # queues eventual-send
255 self.failUnlessEqual(len(announcements), 1)
256 key_s,ann = announcements[0]
257 self.failUnlessEqual(key_s, pubkey_s)
258 self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1)
259 self.failUnlessEqual(ann["my-version"], "ver23")
260 d.addCallback(_then1)
262 # now send a duplicate announcement. This should not fire the
264 d.addCallback(lambda ign: ic1.remote_announce_v2([self.ann1]))
265 d.addCallback(fireEventually)
267 self.failUnlessEqual(len(announcements), 1)
268 d.addCallback(_then2)
270 # an older announcement shouldn't fire the subscriber either
271 d.addCallback(lambda ign: ic1.remote_announce_v2([self.ann1old]))
272 d.addCallback(fireEventually)
274 self.failUnlessEqual(len(announcements), 1)
275 d.addCallback(_then2a)
277 # announcement with no seqnum cannot replace one with-seqnum
278 d.addCallback(lambda ign: ic1.remote_announce_v2([self.ann1noseqnum]))
279 d.addCallback(fireEventually)
281 self.failUnlessEqual(len(announcements), 1)
282 d.addCallback(_then2b)
284 # and a replacement announcement: same FURL, new other stuff. The
285 # subscriber *should* be fired.
286 d.addCallback(lambda ign: ic1.remote_announce_v2([self.ann1b]))
287 d.addCallback(fireEventually)
289 self.failUnlessEqual(len(announcements), 2)
290 key_s,ann = announcements[-1]
291 self.failUnlessEqual(key_s, pubkey_s)
292 self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1)
293 self.failUnlessEqual(ann["my-version"], "ver24")
294 d.addCallback(_then3)
296 # and a replacement announcement with a different FURL (it uses
297 # different connection hints)
298 d.addCallback(lambda ign: ic1.remote_announce_v2([self.ann1a]))
299 d.addCallback(fireEventually)
301 self.failUnlessEqual(len(announcements), 3)
302 key_s,ann = announcements[-1]
303 self.failUnlessEqual(key_s, pubkey_s)
304 self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1a)
305 self.failUnlessEqual(ann["my-version"], "ver23")
306 d.addCallback(_then4)
308 # now add a new subscription, which should be called with the
309 # backlog. The introducer only records one announcement per index, so
310 # the backlog will only have the latest message.
312 def _received2(key_s, ann):
313 announcements2.append( (key_s, ann) )
314 d.addCallback(lambda ign: ic1.subscribe_to("storage", _received2))
315 d.addCallback(fireEventually)
317 self.failUnlessEqual(len(announcements2), 1)
318 key_s,ann = announcements2[-1]
319 self.failUnlessEqual(key_s, pubkey_s)
320 self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1a)
321 self.failUnlessEqual(ann["my-version"], "ver23")
322 d.addCallback(_then5)
325 def test_id_collision(self):
326 # test replacement case where tubid equals a keyid (one should
327 # not replace the other)
328 ic = IntroducerClient(None,
329 "introducer.furl", u"my_nickname",
330 "my_version", "oldest_version", {}, fakeseq)
332 ic.subscribe_to("storage",
333 lambda key_s,ann: announcements.append(ann))
334 sk_s, vk_s = keyutil.make_keypair()
335 sk, _ignored = keyutil.parse_privkey(sk_s)
336 keyid = keyutil.remove_prefix(vk_s, "pub-v0-")
337 furl1 = "pb://onug64tu@127.0.0.1:123/short" # base32("short")
338 furl2 = "pb://%s@127.0.0.1:36106/swissnum" % keyid
339 ann_t = make_ann_t(ic, furl1, sk, 1)
340 ic.remote_announce_v2([ann_t])
343 # first announcement has been processed
344 self.failUnlessEqual(len(announcements), 1)
345 self.failUnlessEqual(announcements[0]["anonymous-storage-FURL"],
347 # now submit a second one, with a tubid that happens to look just
348 # like the pubkey-based serverid we just processed. They should
350 ann2 = (furl2, "storage", "RIStorage", "nick1", "ver23", "ver0")
351 ca = WrapV2ClientInV1Interface(ic)
352 ca.remote_announce([ann2])
353 return fireEventually()
356 # if they overlapped, the second announcement would be ignored
357 self.failUnlessEqual(len(announcements), 2)
358 self.failUnlessEqual(announcements[1]["anonymous-storage-FURL"],
360 d.addCallback(_then2)
363 class Server(unittest.TestCase):
364 def test_duplicate(self):
365 i = IntroducerService()
366 ic1 = IntroducerClient(None,
367 "introducer.furl", u"my_nickname",
368 "ver23", "oldest_version", {}, realseq)
369 furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/gydnp"
371 privkey_s, _ = keyutil.make_keypair()
372 privkey, _ = keyutil.parse_privkey(privkey_s)
374 ann1 = make_ann_t(ic1, furl1, privkey, seqnum=10)
375 ann1_old = make_ann_t(ic1, furl1, privkey, seqnum=9)
376 ann1_new = make_ann_t(ic1, furl1, privkey, seqnum=11)
377 ann1_noseqnum = make_ann_t(ic1, furl1, privkey, seqnum=None)
378 ann1_badseqnum = make_ann_t(ic1, furl1, privkey, seqnum="not an int")
380 i.remote_publish_v2(ann1, None)
381 all = i.get_announcements()
382 self.failUnlessEqual(len(all), 1)
383 self.failUnlessEqual(all[0].announcement["seqnum"], 10)
384 self.failUnlessEqual(i._debug_counts["inbound_message"], 1)
385 self.failUnlessEqual(i._debug_counts["inbound_duplicate"], 0)
386 self.failUnlessEqual(i._debug_counts["inbound_no_seqnum"], 0)
387 self.failUnlessEqual(i._debug_counts["inbound_old_replay"], 0)
388 self.failUnlessEqual(i._debug_counts["inbound_update"], 0)
390 i.remote_publish_v2(ann1, None)
391 all = i.get_announcements()
392 self.failUnlessEqual(len(all), 1)
393 self.failUnlessEqual(all[0].announcement["seqnum"], 10)
394 self.failUnlessEqual(i._debug_counts["inbound_message"], 2)
395 self.failUnlessEqual(i._debug_counts["inbound_duplicate"], 1)
396 self.failUnlessEqual(i._debug_counts["inbound_no_seqnum"], 0)
397 self.failUnlessEqual(i._debug_counts["inbound_old_replay"], 0)
398 self.failUnlessEqual(i._debug_counts["inbound_update"], 0)
400 i.remote_publish_v2(ann1_old, None)
401 all = i.get_announcements()
402 self.failUnlessEqual(len(all), 1)
403 self.failUnlessEqual(all[0].announcement["seqnum"], 10)
404 self.failUnlessEqual(i._debug_counts["inbound_message"], 3)
405 self.failUnlessEqual(i._debug_counts["inbound_duplicate"], 1)
406 self.failUnlessEqual(i._debug_counts["inbound_no_seqnum"], 0)
407 self.failUnlessEqual(i._debug_counts["inbound_old_replay"], 1)
408 self.failUnlessEqual(i._debug_counts["inbound_update"], 0)
410 i.remote_publish_v2(ann1_new, None)
411 all = i.get_announcements()
412 self.failUnlessEqual(len(all), 1)
413 self.failUnlessEqual(all[0].announcement["seqnum"], 11)
414 self.failUnlessEqual(i._debug_counts["inbound_message"], 4)
415 self.failUnlessEqual(i._debug_counts["inbound_duplicate"], 1)
416 self.failUnlessEqual(i._debug_counts["inbound_no_seqnum"], 0)
417 self.failUnlessEqual(i._debug_counts["inbound_old_replay"], 1)
418 self.failUnlessEqual(i._debug_counts["inbound_update"], 1)
420 i.remote_publish_v2(ann1_noseqnum, None)
421 all = i.get_announcements()
422 self.failUnlessEqual(len(all), 1)
423 self.failUnlessEqual(all[0].announcement["seqnum"], 11)
424 self.failUnlessEqual(i._debug_counts["inbound_message"], 5)
425 self.failUnlessEqual(i._debug_counts["inbound_duplicate"], 1)
426 self.failUnlessEqual(i._debug_counts["inbound_no_seqnum"], 1)
427 self.failUnlessEqual(i._debug_counts["inbound_old_replay"], 1)
428 self.failUnlessEqual(i._debug_counts["inbound_update"], 1)
430 i.remote_publish_v2(ann1_badseqnum, None)
431 all = i.get_announcements()
432 self.failUnlessEqual(len(all), 1)
433 self.failUnlessEqual(all[0].announcement["seqnum"], 11)
434 self.failUnlessEqual(i._debug_counts["inbound_message"], 6)
435 self.failUnlessEqual(i._debug_counts["inbound_duplicate"], 1)
436 self.failUnlessEqual(i._debug_counts["inbound_no_seqnum"], 2)
437 self.failUnlessEqual(i._debug_counts["inbound_old_replay"], 1)
438 self.failUnlessEqual(i._debug_counts["inbound_update"], 1)
441 NICKNAME = u"n\u00EDickname-%s" # LATIN SMALL LETTER I WITH ACUTE
443 class SystemTestMixin(ServiceMixin, pollmixin.PollMixin):
445 def create_tub(self, portnum=0):
446 tubfile = os.path.join(self.basedir, "tub.pem")
447 self.central_tub = tub = Tub(certFile=tubfile)
448 #tub.setOption("logLocalFailures", True)
449 #tub.setOption("logRemoteFailures", True)
450 tub.setOption("expose-remote-exception-types", False)
451 tub.setServiceParent(self.parent)
452 l = tub.listenOn("tcp:%d" % portnum)
453 self.central_portnum = l.getPortnum()
455 assert self.central_portnum == portnum
456 tub.setLocation("localhost:%d" % self.central_portnum)
458 class Queue(SystemTestMixin, unittest.TestCase):
459 def test_queue_until_connected(self):
460 self.basedir = "introducer/QueueUntilConnected/queued"
461 os.makedirs(self.basedir)
463 introducer = IntroducerService()
464 introducer.setServiceParent(self.parent)
465 iff = os.path.join(self.basedir, "introducer.furl")
466 ifurl = self.central_tub.registerReference(introducer, furlFile=iff)
468 tub2.setServiceParent(self.parent)
469 c = IntroducerClient(tub2, ifurl,
470 u"nickname", "version", "oldest", {}, fakeseq)
471 furl1 = "pb://onug64tu@127.0.0.1:123/short" # base32("short")
472 sk_s, vk_s = keyutil.make_keypair()
473 sk, _ignored = keyutil.parse_privkey(sk_s)
475 d = introducer.disownServiceParent()
477 # now that the introducer server is offline, create a client and
478 # publish some messages
479 c.setServiceParent(self.parent) # this starts the reconnector
480 c.publish("storage", make_ann(furl1), sk)
482 introducer.setServiceParent(self.parent) # restart the server
483 # now wait for the messages to be delivered
484 def _got_announcement():
485 return bool(introducer.get_announcements())
486 return self.poll(_got_announcement)
487 d.addCallback(_offline)
489 v = introducer.get_announcements()[0]
490 furl = v.announcement["anonymous-storage-FURL"]
491 self.failUnlessEqual(furl, furl1)
494 # now let the ack get back
495 def _wait_until_idle(ign):
497 if c._debug_outstanding:
499 if introducer._debug_outstanding:
502 return self.poll(_idle)
503 d.addCallback(_wait_until_idle)
508 class SystemTest(SystemTestMixin, unittest.TestCase):
510 def do_system_test(self, server_version):
512 if server_version == V1:
513 introducer = old.IntroducerService_v1()
515 introducer = IntroducerService()
516 introducer.setServiceParent(self.parent)
517 iff = os.path.join(self.basedir, "introducer.furl")
518 tub = self.central_tub
519 ifurl = self.central_tub.registerReference(introducer, furlFile=iff)
520 self.introducer_furl = ifurl
522 # we have 5 clients who publish themselves as storage servers, and a
523 # sixth which does which not. All 6 clients subscriber to hear about
524 # storage. When the connections are fully established, all six nodes
525 # should have 5 connections each.
531 received_announcements = {}
532 subscribing_clients = []
533 publishing_clients = []
534 printable_serverids = {}
535 self.the_introducer = introducer
537 expected_announcements = [0 for c in range(NUM_CLIENTS)]
539 for i in range(NUM_CLIENTS):
541 #tub.setOption("logLocalFailures", True)
542 #tub.setOption("logRemoteFailures", True)
543 tub.setOption("expose-remote-exception-types", False)
544 tub.setServiceParent(self.parent)
545 l = tub.listenOn("tcp:0")
546 portnum = l.getPortnum()
547 tub.setLocation("localhost:%d" % portnum)
549 log.msg("creating client %d: %s" % (i, tub.getShortTubID()))
551 c = old.IntroducerClient_v1(tub, self.introducer_furl,
555 c = IntroducerClient(tub, self.introducer_furl,
558 {"component": "component-v1"}, fakeseq)
559 received_announcements[c] = {}
560 def got(key_s_or_tubid, ann, announcements, i):
562 index = get_tubid_string_from_ann(ann)
564 index = key_s_or_tubid or get_tubid_string_from_ann(ann)
565 announcements[index] = ann
566 c.subscribe_to("storage", got, received_announcements[c], i)
567 subscribing_clients.append(c)
568 expected_announcements[i] += 1 # all expect a 'storage' announcement
570 node_furl = tub.registerReference(Referenceable())
573 c.publish(node_furl, "storage", "ri_name")
574 printable_serverids[i] = get_tubid_string(node_furl)
576 # sign the announcement
577 privkey_s, pubkey_s = keyutil.make_keypair()
578 privkey, _ignored = keyutil.parse_privkey(privkey_s)
579 privkeys[c] = privkey
580 c.publish("storage", make_ann(node_furl), privkey)
581 if server_version == V1:
582 printable_serverids[i] = get_tubid_string(node_furl)
584 assert pubkey_s.startswith("pub-")
585 printable_serverids[i] = pubkey_s[len("pub-"):]
587 c.publish("storage", make_ann(node_furl))
588 printable_serverids[i] = get_tubid_string(node_furl)
589 publishing_clients.append(c)
591 # the last one does not publish anything
595 # users of the V1 client were required to publish a
596 # 'stub_client' record (somewhat after they published the
597 # 'storage' record), so the introducer could see their
598 # version. Match that behavior.
599 c.publish(node_furl, "stub_client", "stub_ri_name")
602 # also publish something that nobody cares about
603 boring_furl = tub.registerReference(Referenceable())
604 c.publish("boring", make_ann(boring_furl))
606 c.setServiceParent(self.parent)
611 def _wait_for_connected(ign):
614 if not c.connected_to_introducer():
617 return self.poll(_connected)
619 # we watch the clients to determine when the system has settled down.
620 # Then we can look inside the server to assert things about its
623 def _wait_for_expected_announcements(ign):
624 def _got_expected_announcements():
625 for i,c in enumerate(subscribing_clients):
626 if len(received_announcements[c]) < expected_announcements[i]:
629 return self.poll(_got_expected_announcements)
631 # before shutting down any Tub, we'd like to know that there are no
632 # messages outstanding
634 def _wait_until_idle(ign):
636 for c in subscribing_clients + publishing_clients:
637 if c._debug_outstanding:
639 if self.the_introducer._debug_outstanding:
642 return self.poll(_idle)
644 d = defer.succeed(None)
645 d.addCallback(_wait_for_connected)
646 d.addCallback(_wait_for_expected_announcements)
647 d.addCallback(_wait_until_idle)
650 log.msg("doing _check1")
651 dc = self.the_introducer._debug_counts
652 if server_version == V1:
653 # each storage server publishes a record, and (after its
654 # 'subscribe' has been ACKed) also publishes a "stub_client".
655 # The non-storage client (which subscribes) also publishes a
656 # stub_client. There is also one "boring" service. The number
657 # of messages is higher, because the stub_clients aren't
658 # published until after we get the 'subscribe' ack (since we
659 # don't realize that we're dealing with a v1 server [which
660 # needs stub_clients] until then), and the act of publishing
661 # the stub_client causes us to re-send all previous
663 self.failUnlessEqual(dc["inbound_message"] - dc["inbound_duplicate"],
664 NUM_STORAGE + NUM_CLIENTS + 1)
666 # each storage server publishes a record. There is also one
667 # "stub_client" and one "boring"
668 self.failUnlessEqual(dc["inbound_message"], NUM_STORAGE+2)
669 self.failUnlessEqual(dc["inbound_duplicate"], 0)
670 self.failUnlessEqual(dc["inbound_update"], 0)
671 self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS)
672 # the number of outbound messages is tricky.. I think it depends
673 # upon a race between the publish and the subscribe messages.
674 self.failUnless(dc["outbound_message"] > 0)
675 # each client subscribes to "storage", and each server publishes
676 self.failUnlessEqual(dc["outbound_announcements"],
677 NUM_STORAGE*NUM_CLIENTS)
679 for c in subscribing_clients:
680 cdc = c._debug_counts
681 self.failUnless(cdc["inbound_message"])
682 self.failUnlessEqual(cdc["inbound_announcement"],
684 self.failUnlessEqual(cdc["wrong_service"], 0)
685 self.failUnlessEqual(cdc["duplicate_announcement"], 0)
686 self.failUnlessEqual(cdc["update"], 0)
687 self.failUnlessEqual(cdc["new_announcement"],
689 anns = received_announcements[c]
690 self.failUnlessEqual(len(anns), NUM_STORAGE)
692 nodeid0 = tubs[clients[0]].tubID
694 nick = ann["nickname"]
695 self.failUnlessEqual(type(nick), unicode)
696 self.failUnlessEqual(nick, NICKNAME % "0")
697 if server_version == V1:
698 for c in publishing_clients:
699 cdc = c._debug_counts
700 expected = 1 # storage
702 expected += 1 # boring
703 if c is not clients[0]:
704 # the v2 client tries to call publish_v2, which fails
705 # because the server is v1. It then re-sends
706 # everything it has so far, plus a stub_client record
707 expected = 2*expected + 1
709 # we always tell v1 client to send stub_client
711 self.failUnlessEqual(cdc["outbound_message"], expected)
713 for c in publishing_clients:
714 cdc = c._debug_counts
716 if c in [clients[0], # stub_client
720 self.failUnlessEqual(cdc["outbound_message"], expected)
721 # now check the web status, make sure it renders without error
722 ir = introweb.IntroducerRoot(self.parent)
723 self.parent.nodeid = "NODEID"
724 text = ir.renderSynchronously().decode("utf-8")
725 self.failUnlessIn(NICKNAME % "0", text) # the v1 client
726 self.failUnlessIn(NICKNAME % "1", text) # a v2 client
727 for i in range(NUM_STORAGE):
728 self.failUnlessIn(printable_serverids[i], text,
729 (i,printable_serverids[i],text))
730 # make sure there isn't a double-base32ed string too
731 self.failIfIn(idlib.nodeid_b2a(printable_serverids[i]), text,
732 (i,printable_serverids[i],text))
733 log.msg("_check1 done")
734 d.addCallback(_check1)
736 # force an introducer reconnect, by shutting down the Tub it's using
737 # and starting a new Tub (with the old introducer). Everybody should
738 # reconnect and republish, but the introducer should ignore the
739 # republishes as duplicates. However, because the server doesn't know
740 # what each client does and does not know, it will send them a copy
741 # of the current announcement table anyway.
743 d.addCallback(lambda _ign: log.msg("shutting down introducer's Tub"))
744 d.addCallback(lambda _ign: self.central_tub.disownServiceParent())
746 def _wait_for_introducer_loss(ign):
747 def _introducer_lost():
749 if c.connected_to_introducer():
752 return self.poll(_introducer_lost)
753 d.addCallback(_wait_for_introducer_loss)
755 def _restart_introducer_tub(_ign):
756 log.msg("restarting introducer's Tub")
758 for i in range(NUM_CLIENTS):
759 c = subscribing_clients[i]
760 for k in c._debug_counts:
761 c._debug_counts[k] = 0
762 for k in self.the_introducer._debug_counts:
763 self.the_introducer._debug_counts[k] = 0
764 expected_announcements[i] += 1 # new 'storage' for everyone
765 self.create_tub(self.central_portnum)
766 newfurl = self.central_tub.registerReference(self.the_introducer,
768 assert newfurl == self.introducer_furl
769 d.addCallback(_restart_introducer_tub)
771 d.addCallback(_wait_for_connected)
772 d.addCallback(_wait_for_expected_announcements)
773 d.addCallback(_wait_until_idle)
774 d.addCallback(lambda _ign: log.msg(" reconnected"))
776 # TODO: publish something while the introducer is offline, then
777 # confirm it gets delivered when the connection is reestablished
779 log.msg("doing _check2")
780 # assert that the introducer sent out new messages, one per
782 dc = self.the_introducer._debug_counts
783 self.failUnlessEqual(dc["outbound_announcements"],
784 NUM_STORAGE*NUM_CLIENTS)
785 self.failUnless(dc["outbound_message"] > 0)
786 self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS)
787 for c in subscribing_clients:
788 cdc = c._debug_counts
789 self.failUnlessEqual(cdc["inbound_message"], 1)
790 self.failUnlessEqual(cdc["inbound_announcement"], NUM_STORAGE)
791 self.failUnlessEqual(cdc["new_announcement"], 0)
792 self.failUnlessEqual(cdc["wrong_service"], 0)
793 self.failUnlessEqual(cdc["duplicate_announcement"], NUM_STORAGE)
794 d.addCallback(_check2)
796 # Then force an introducer restart, by shutting down the Tub,
797 # destroying the old introducer, and starting a new Tub+Introducer.
798 # Everybody should reconnect and republish, and the (new) introducer
799 # will distribute the new announcements, but the clients should
800 # ignore the republishes as duplicates.
802 d.addCallback(lambda _ign: log.msg("shutting down introducer"))
803 d.addCallback(lambda _ign: self.central_tub.disownServiceParent())
804 d.addCallback(_wait_for_introducer_loss)
805 d.addCallback(lambda _ign: log.msg("introducer lost"))
807 def _restart_introducer(_ign):
808 log.msg("restarting introducer")
809 self.create_tub(self.central_portnum)
811 for i in range(NUM_CLIENTS):
812 c = subscribing_clients[i]
813 for k in c._debug_counts:
814 c._debug_counts[k] = 0
815 expected_announcements[i] += 1 # new 'storage' for everyone
816 if server_version == V1:
817 introducer = old.IntroducerService_v1()
819 introducer = IntroducerService()
820 self.the_introducer = introducer
821 newfurl = self.central_tub.registerReference(self.the_introducer,
823 assert newfurl == self.introducer_furl
824 d.addCallback(_restart_introducer)
826 d.addCallback(_wait_for_connected)
827 d.addCallback(_wait_for_expected_announcements)
828 d.addCallback(_wait_until_idle)
831 log.msg("doing _check3")
832 dc = self.the_introducer._debug_counts
833 self.failUnlessEqual(dc["outbound_announcements"],
834 NUM_STORAGE*NUM_CLIENTS)
835 self.failUnless(dc["outbound_message"] > 0)
836 self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS)
837 for c in subscribing_clients:
838 cdc = c._debug_counts
839 self.failUnless(cdc["inbound_message"] > 0)
840 self.failUnlessEqual(cdc["inbound_announcement"], NUM_STORAGE)
841 self.failUnlessEqual(cdc["new_announcement"], 0)
842 self.failUnlessEqual(cdc["wrong_service"], 0)
843 self.failUnlessEqual(cdc["duplicate_announcement"], NUM_STORAGE)
845 d.addCallback(_check3)
849 def test_system_v2_server(self):
850 self.basedir = "introducer/SystemTest/system_v2_server"
851 os.makedirs(self.basedir)
852 return self.do_system_test(V2)
853 test_system_v2_server.timeout = 480
854 # occasionally takes longer than 350s on "draco"
856 def test_system_v1_server(self):
857 self.basedir = "introducer/SystemTest/system_v1_server"
858 os.makedirs(self.basedir)
859 return self.do_system_test(V1)
860 test_system_v1_server.timeout = 480
861 # occasionally takes longer than 350s on "draco"
863 class FakeRemoteReference:
864 def notifyOnDisconnect(self, *args, **kwargs): pass
865 def getRemoteTubID(self): return "62ubehyunnyhzs7r6vdonnm2hpi52w6y"
866 def getLocationHints(self): return [("ipv4", "here.example.com", "1234"),
867 ("ipv4", "there.example.com", "2345")]
868 def getPeer(self): return address.IPv4Address("TCP", "remote.example.com",
871 class ClientInfo(unittest.TestCase):
872 def test_client_v2(self):
873 introducer = IntroducerService()
874 tub = introducer_furl = None
875 app_versions = {"whizzy": "fizzy"}
876 client_v2 = IntroducerClient(tub, introducer_furl, NICKNAME % u"v2",
877 "my_version", "oldest", app_versions,
879 #furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
880 #ann_s = make_ann_t(client_v2, furl1, None, 10)
881 #introducer.remote_publish_v2(ann_s, Referenceable())
882 subscriber = FakeRemoteReference()
883 introducer.remote_subscribe_v2(subscriber, "storage",
884 client_v2._my_subscriber_info)
885 subs = introducer.get_subscribers()
886 self.failUnlessEqual(len(subs), 1)
888 self.failUnlessEqual(s0.service_name, "storage")
889 self.failUnlessEqual(s0.app_versions, app_versions)
890 self.failUnlessEqual(s0.nickname, NICKNAME % u"v2")
891 self.failUnlessEqual(s0.version, "my_version")
893 def test_client_v1(self):
894 introducer = IntroducerService()
895 subscriber = FakeRemoteReference()
896 introducer.remote_subscribe(subscriber, "storage")
897 # the v1 subscribe interface had no subscriber_info: that was usually
898 # sent in a separate stub_client pseudo-announcement
899 subs = introducer.get_subscribers()
900 self.failUnlessEqual(len(subs), 1)
902 self.failUnlessEqual(s0.nickname, u"?") # not known yet
903 self.failUnlessEqual(s0.service_name, "storage")
905 # now submit the stub_client announcement
906 furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
907 ann = (furl1, "stub_client", "RIStubClient",
908 (NICKNAME % u"v1").encode("utf-8"), "my_version", "oldest")
909 introducer.remote_publish(ann)
910 # the server should correlate the two
911 subs = introducer.get_subscribers()
912 self.failUnlessEqual(len(subs), 1)
914 self.failUnlessEqual(s0.service_name, "storage")
915 # v1 announcements do not contain app-versions
916 self.failUnlessEqual(s0.app_versions, {})
917 self.failUnlessEqual(s0.nickname, NICKNAME % u"v1")
918 self.failUnlessEqual(s0.version, "my_version")
920 # a subscription that arrives after the stub_client announcement
921 # should be correlated too
922 subscriber2 = FakeRemoteReference()
923 introducer.remote_subscribe(subscriber2, "thing2")
925 subs = introducer.get_subscribers()
926 self.failUnlessEqual(len(subs), 2)
927 s0 = [s for s in subs if s.service_name == "thing2"][0]
928 # v1 announcements do not contain app-versions
929 self.failUnlessEqual(s0.app_versions, {})
930 self.failUnlessEqual(s0.nickname, NICKNAME % u"v1")
931 self.failUnlessEqual(s0.version, "my_version")
933 class Announcements(unittest.TestCase):
934 def test_client_v2_unsigned(self):
935 introducer = IntroducerService()
936 tub = introducer_furl = None
937 app_versions = {"whizzy": "fizzy"}
938 client_v2 = IntroducerClient(tub, introducer_furl, u"nick-v2",
939 "my_version", "oldest", app_versions,
941 furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
942 tubid = "62ubehyunnyhzs7r6vdonnm2hpi52w6y"
943 ann_s0 = make_ann_t(client_v2, furl1, None, 10)
944 canary0 = Referenceable()
945 introducer.remote_publish_v2(ann_s0, canary0)
946 a = introducer.get_announcements()
947 self.failUnlessEqual(len(a), 1)
948 self.failUnlessIdentical(a[0].canary, canary0)
949 self.failUnlessEqual(a[0].index, ("storage", None, tubid))
950 self.failUnlessEqual(a[0].announcement["app-versions"], app_versions)
951 self.failUnlessEqual(a[0].nickname, u"nick-v2")
952 self.failUnlessEqual(a[0].service_name, "storage")
953 self.failUnlessEqual(a[0].version, "my_version")
954 self.failUnlessEqual(a[0].announcement["anonymous-storage-FURL"], furl1)
956 def test_client_v2_signed(self):
957 introducer = IntroducerService()
958 tub = introducer_furl = None
959 app_versions = {"whizzy": "fizzy"}
960 client_v2 = IntroducerClient(tub, introducer_furl, u"nick-v2",
961 "my_version", "oldest", app_versions,
963 furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
964 sk_s, vk_s = keyutil.make_keypair()
965 sk, _ignored = keyutil.parse_privkey(sk_s)
966 pks = keyutil.remove_prefix(vk_s, "pub-")
967 ann_t0 = make_ann_t(client_v2, furl1, sk, 10)
968 canary0 = Referenceable()
969 introducer.remote_publish_v2(ann_t0, canary0)
970 a = introducer.get_announcements()
971 self.failUnlessEqual(len(a), 1)
972 self.failUnlessIdentical(a[0].canary, canary0)
973 self.failUnlessEqual(a[0].index, ("storage", pks, None))
974 self.failUnlessEqual(a[0].announcement["app-versions"], app_versions)
975 self.failUnlessEqual(a[0].nickname, u"nick-v2")
976 self.failUnlessEqual(a[0].service_name, "storage")
977 self.failUnlessEqual(a[0].version, "my_version")
978 self.failUnlessEqual(a[0].announcement["anonymous-storage-FURL"], furl1)
980 def test_client_v1(self):
981 introducer = IntroducerService()
983 furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
984 tubid = "62ubehyunnyhzs7r6vdonnm2hpi52w6y"
985 ann = (furl1, "storage", "RIStorage",
986 u"nick-v1".encode("utf-8"), "my_version", "oldest")
987 introducer.remote_publish(ann)
989 a = introducer.get_announcements()
990 self.failUnlessEqual(len(a), 1)
991 self.failUnlessEqual(a[0].index, ("storage", None, tubid))
992 self.failUnlessEqual(a[0].canary, None)
993 self.failUnlessEqual(a[0].announcement["app-versions"], {})
994 self.failUnlessEqual(a[0].nickname, u"nick-v1".encode("utf-8"))
995 self.failUnlessEqual(a[0].service_name, "storage")
996 self.failUnlessEqual(a[0].version, "my_version")
997 self.failUnlessEqual(a[0].announcement["anonymous-storage-FURL"], furl1)
999 class ClientSeqnums(unittest.TestCase):
1000 def test_client(self):
1001 basedir = "introducer/ClientSeqnums/test_client"
1002 fileutil.make_dirs(basedir)
1003 f = open(os.path.join(basedir, "tahoe.cfg"), "w")
1004 f.write("[client]\n")
1005 f.write("introducer.furl = nope\n")
1007 c = TahoeClient(basedir)
1008 ic = c.introducer_client
1009 outbound = ic._outbound_announcements
1010 published = ic._published_announcements
1012 f = open(os.path.join(basedir, "announcement-seqnum"))
1013 seqnum = f.read().strip()
1017 ic.publish("sA", {"key": "value1"}, c._server_key)
1018 self.failUnlessEqual(read_seqnum(), 1)
1019 self.failUnless("sA" in outbound)
1020 self.failUnlessEqual(outbound["sA"]["seqnum"], 1)
1021 nonce1 = outbound["sA"]["nonce"]
1022 self.failUnless(isinstance(nonce1, str))
1023 self.failUnlessEqual(simplejson.loads(published["sA"][0]),
1025 # [1] is the signature, [2] is the pubkey
1027 # publishing a second service causes both services to be
1028 # re-published, with the next higher sequence number
1029 ic.publish("sB", {"key": "value2"}, c._server_key)
1030 self.failUnlessEqual(read_seqnum(), 2)
1031 self.failUnless("sB" in outbound)
1032 self.failUnlessEqual(outbound["sB"]["seqnum"], 2)
1033 self.failUnless("sA" in outbound)
1034 self.failUnlessEqual(outbound["sA"]["seqnum"], 2)
1035 nonce2 = outbound["sA"]["nonce"]
1036 self.failUnless(isinstance(nonce2, str))
1037 self.failIfEqual(nonce1, nonce2)
1038 self.failUnlessEqual(simplejson.loads(published["sA"][0]),
1040 self.failUnlessEqual(simplejson.loads(published["sB"][0]),
1045 class TooNewServer(IntroducerService):
1046 VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v999":
1048 "application-version": "greetings from the crazy future",
1051 class NonV1Server(SystemTestMixin, unittest.TestCase):
1052 # if the 1.3.0 client connects to a server that doesn't provide the 'v1'
1053 # protocol, it is supposed to provide a useful error instead of a weird
1056 def test_failure(self):
1057 self.basedir = "introducer/NonV1Server/failure"
1058 os.makedirs(self.basedir)
1061 i.setServiceParent(self.parent)
1062 self.introducer_furl = self.central_tub.registerReference(i)
1065 tub.setOption("expose-remote-exception-types", False)
1066 tub.setServiceParent(self.parent)
1067 l = tub.listenOn("tcp:0")
1068 portnum = l.getPortnum()
1069 tub.setLocation("localhost:%d" % portnum)
1071 c = IntroducerClient(tub, self.introducer_furl,
1072 u"nickname-client", "version", "oldest", {},
1075 def got(key_s, ann):
1076 announcements[key_s] = ann
1077 c.subscribe_to("storage", got)
1079 c.setServiceParent(self.parent)
1081 # now we wait for it to connect and notice the bad version
1084 return bool(c._introducer_error) or bool(c._publisher)
1085 d = self.poll(_got_bad)
1087 self.failUnless(c._introducer_error)
1088 self.failUnless(c._introducer_error.check(InsufficientVersionError),
1089 c._introducer_error)
1090 d.addCallback(_done)
1093 class DecodeFurl(unittest.TestCase):
1094 def test_decode(self):
1095 # make sure we have a working base64.b32decode. The one in
1096 # python2.4.[01] was broken.
1097 furl = 'pb://t5g7egomnnktbpydbuijt6zgtmw4oqi5@127.0.0.1:51857/hfzv36i'
1098 m = re.match(r'pb://(\w+)@', furl)
1100 nodeid = b32decode(m.group(1).upper())
1101 self.failUnlessEqual(nodeid, "\x9fM\xf2\x19\xcckU0\xbf\x03\r\x10\x99\xfb&\x9b-\xc7A\x1d")
1103 class Signatures(unittest.TestCase):
1104 def test_sign(self):
1105 ann = {"key1": "value1"}
1106 sk_s,vk_s = keyutil.make_keypair()
1107 sk,ignored = keyutil.parse_privkey(sk_s)
1108 ann_t = sign_to_foolscap(ann, sk)
1109 (msg, sig, key) = ann_t
1110 self.failUnlessEqual(type(msg), type("".encode("utf-8"))) # bytes
1111 self.failUnlessEqual(simplejson.loads(msg.decode("utf-8")), ann)
1112 self.failUnless(sig.startswith("v0-"))
1113 self.failUnless(key.startswith("v0-"))
1114 (ann2,key2) = unsign_from_foolscap(ann_t)
1115 self.failUnlessEqual(ann2, ann)
1116 self.failUnlessEqual("pub-"+key2, vk_s)
1119 bad_ann = {"key1": "value2"}
1120 bad_msg = simplejson.dumps(bad_ann).encode("utf-8")
1121 self.failUnlessRaises(keyutil.BadSignatureError,
1122 unsign_from_foolscap, (bad_msg,sig,key))
1123 # sneaky bad signature should be ignored
1124 (ann2,key2) = unsign_from_foolscap( (bad_msg,None,key) )
1125 self.failUnlessEqual(key2, None)
1126 self.failUnlessEqual(ann2, bad_ann)
1128 # unrecognized signatures
1129 self.failUnlessRaises(UnknownKeyError,
1130 unsign_from_foolscap, (bad_msg,"v999-sig",key))
1131 self.failUnlessRaises(UnknownKeyError,
1132 unsign_from_foolscap, (bad_msg,sig,"v999-key"))
1135 # add tests of StorageFarmBroker: if it receives duplicate announcements, it
1136 # should leave the Reconnector in place, also if it receives
1137 # same-FURL-different-misc, but if it receives same-nodeid-different-FURL, it
1138 # should tear down the Reconnector and make a new one. This behavior used to
1139 # live in the IntroducerClient, and thus used to be tested by test_introducer
1141 # copying more tests from old branch:
1143 # then also add Upgrade test