]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_introducer.py
introducer.client: use integer seqnums, not time-based. Closes #1767.
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_introducer.py
1
2 import os, re, itertools
3 from base64 import b32decode
4 import simplejson
5
6 from twisted.trial import unittest
7 from twisted.internet import defer, address
8 from twisted.python import log
9
10 from foolscap.api import Tub, Referenceable, fireEventually, flushEventualQueue
11 from twisted.application import service
12 from allmydata.interfaces import InsufficientVersionError
13 from allmydata.introducer.client import IntroducerClient, \
14      WrapV2ClientInV1Interface
15 from allmydata.introducer.server import IntroducerService
16 from allmydata.introducer.common import get_tubid_string_from_ann, \
17      get_tubid_string, sign_to_foolscap, unsign_from_foolscap, \
18      UnknownKeyError
19 from allmydata.introducer import old
20 # test compatibility with old introducer .tac files
21 from allmydata.introducer import IntroducerNode
22 from allmydata.web import introweb
23 from allmydata.client import Client as TahoeClient
24 from allmydata.util import pollmixin, keyutil, idlib, fileutil
25 import allmydata.test.common_util as testutil
26
class LoggingMultiService(service.MultiService):
    """A MultiService that additionally offers a ``log()`` method.

    Used below as the parent service for the objects under test.
    """

    def log(self, msg, **kwargs):
        """Forward ``msg`` and any keyword arguments to ``twisted.python.log.msg``."""
        # Inside the method body, 'log' resolves to the module-level
        # twisted.python.log import, not to this method.
        log.msg(msg, **kwargs)
30
class Node(testutil.SignalMixin, unittest.TestCase):
    """Smoke test for IntroducerNode."""

    def test_loadable(self):
        """Create an IntroducerNode, start it, wait for its Tub, then stop it."""
        basedir = "introducer.IntroducerNode.test_loadable"
        os.mkdir(basedir)
        node = IntroducerNode(basedir)

        def _start(res):
            return node.startService()

        def _wait_for_tub(res):
            return node.when_tub_ready()

        def _stop(res):
            return node.stopService()

        d = fireEventually(None)
        for step in (_start, _wait_for_tub, _stop, flushEventualQueue):
            d.addCallback(step)
        return d
42
class ServiceMixin:
    """Test mixin that provides a running parent service as ``self.parent``."""

    def setUp(self):
        """Create and start the LoggingMultiService used as ``self.parent``."""
        parent = LoggingMultiService()
        self.parent = parent
        parent.startService()

    def tearDown(self):
        """Stop ``self.parent`` and then flush foolscap's eventual-send queue."""
        log.msg("TestIntroducer.tearDown")

        def _stop_parent(res):
            return self.parent.stopService()

        d = defer.succeed(None)
        d.addCallback(_stop_parent)
        d.addCallback(flushEventualQueue)
        return d
53
class Introducer(ServiceMixin, unittest.TestCase, pollmixin.PollMixin):
    """Basic checks of IntroducerClient construction and IntroducerService
    publishing."""

    def test_create(self):
        """An IntroducerClient can be constructed with no Tub."""
        client = IntroducerClient(None, "introducer.furl", u"my_nickname",
                                  "my_version", "oldest_version", {}, fakeseq)
        self.failUnless(isinstance(client, IntroducerClient))

    def test_listen(self):
        """An IntroducerService can be attached to the parent service."""
        IntroducerService().setServiceParent(self.parent)

    def test_duplicate_publish(self):
        """Re-publishing under an already-seen FURL does not add a new
        announcement."""
        introducer = IntroducerService()

        def _check_counts(num_announcements):
            # nobody ever subscribes in this test, so the subscriber count
            # must stay at zero throughout
            self.failUnlessEqual(len(introducer.get_announcements()),
                                 num_announcements)
            self.failUnlessEqual(len(introducer.get_subscribers()), 0)

        _check_counts(0)
        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36106,127.0.0.1:36106/gydnpigj2ja2qr2srq4ikjwnl7xfgbra"
        furl2 = "pb://ttwwooyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36111,127.0.0.1:36106/ttwwoogj2ja2qr2srq4ikjwnl7xfgbra"
        first = (furl1, "storage", "RIStorage", "nick1", "ver23", "ver0")
        first_updated = (furl1, "storage", "RIStorage", "nick1", "ver24", "ver0")
        second = (furl2, "storage", "RIStorage", "nick2", "ver30", "ver0")

        introducer.remote_publish(first)
        _check_counts(1)
        introducer.remote_publish(second)
        _check_counts(2)
        # same FURL as 'first': the announcement count must not grow
        introducer.remote_publish(first_updated)
        _check_counts(2)

    def test_id_collision(self):
        # test replacement case where tubid equals a keyid (one should
        # not replace the other)
        introducer = IntroducerService()
        client = IntroducerClient(None,
                                  "introducer.furl", u"my_nickname",
                                  "my_version", "oldest_version", {}, fakeseq)
        sk_s, vk_s = keyutil.make_keypair()
        signing_key, _ignored = keyutil.parse_privkey(sk_s)
        keyid = keyutil.remove_prefix(vk_s, "pub-v0-")

        # first, a signed (v2) announcement
        signed_furl = "pb://onug64tu@127.0.0.1:123/short" # base32("short")
        signed_ann = make_ann_t(client, signed_furl, signing_key, 1)
        introducer.remote_publish_v2(signed_ann, Referenceable())
        published = introducer.get_announcements()
        self.failUnlessEqual(len(published), 1)
        only = published[0]
        self.failUnlessEqual(only.index, ("storage", "v0-"+keyid, None))
        self.failUnlessEqual(only.announcement["anonymous-storage-FURL"],
                             signed_furl)

        # then an unsigned (v1) announcement whose tubid is that same keyid
        tubid_furl = "pb://%s@127.0.0.1:36106/swissnum" % keyid
        unsigned_ann = (tubid_furl, "storage", "RIStorage",
                        "nick1", "ver23", "ver0")
        introducer.remote_publish(unsigned_ann)
        published = introducer.get_announcements()
        self.failUnlessEqual(len(published), 2)
        tubid_index = ("storage", None, keyid)
        matching = [ad for ad in published if ad.index == tubid_index]
        self.failUnlessEqual(len(matching), 1)
        self.failUnlessEqual(matching[0].announcement["anonymous-storage-FURL"],
                             tubid_furl)
114
115
def fakeseq():
    """Return a constant ``(seqnum, nonce)`` pair: ``(1, "nonce")``.

    Passed as the last IntroducerClient constructor argument by tests in
    this file.
    """
    seqnum = 1
    nonce = "nonce"
    return (seqnum, nonce)
118
# Module-wide source of increasing sequence numbers, starting at 1.
seqnum_counter = itertools.count(1)
def realseq():
    """Return a fresh ``(seqnum, nonce)`` pair.

    ``seqnum`` is the next integer from the module-level ``seqnum_counter``,
    so successive calls yield 1, 2, 3, ... ``nonce`` is the decimal string of
    a random integer between 1 and 100000 inclusive.
    """
    # The original called os.randint(), which does not exist and would raise
    # AttributeError on first use; randint lives in the random module.
    import random
    # The builtin next() works on both Python 2.6+ and Python 3, unlike the
    # Python-2-only iterator method .next().
    return next(seqnum_counter), str(random.randint(1, 100000))
122
def make_ann(furl):
    """Build a two-key announcement dict for ``furl``.

    The FURL is stored under "anonymous-storage-FURL" and the result of
    ``get_tubid_string(furl)`` under "permutation-seed-base32".
    """
    return {
        "anonymous-storage-FURL": furl,
        "permutation-seed-base32": get_tubid_string(furl),
    }
127
def make_ann_t(ic, furl, privkey, seqnum):
    """Create a signed "storage" announcement for ``furl``.

    ``ic`` supplies the base announcement dict via
    ``create_announcement_dict``. The given ``seqnum`` and the fixed nonce
    "nonce" are then stored in it, and the result is signed with ``privkey``
    through ``sign_to_foolscap``, whose return value is returned.
    """
    announcement = ic.create_announcement_dict("storage", make_ann(furl))
    # set seqnum/nonce explicitly so callers control announcement ordering
    announcement["seqnum"] = seqnum
    announcement["nonce"] = "nonce"
    return sign_to_foolscap(announcement, privkey)
134
class Client(unittest.TestCase):
    """Checks of how IntroducerClient delivers duplicate, stale, replacement
    and colliding announcements to its subscribers."""

    def test_duplicate_receive_v1(self):
        """v1 announcements: a duplicate is not delivered to the subscriber,
        a changed one is."""
        ic = IntroducerClient(None,
                              "introducer.furl", u"my_nickname",
                              "my_version", "oldest_version", {}, fakeseq)
        received = []

        def _on_announcement(key_s, ann):
            received.append(ann)
        ic.subscribe_to("storage", _on_announcement)

        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/gydnpigj2ja2qr2srq4ikjwnl7xfgbra"
        original = (furl1, "storage", "RIStorage", "nick1", "ver23", "ver0")
        replacement = (furl1, "storage", "RIStorage", "nick1", "ver24", "ver0")
        v1_wrapper = WrapV2ClientInV1Interface(ic)

        def _check_counts(inbound, new, update, duplicate):
            self.failUnlessEqual(ic._debug_counts["inbound_announcement"], inbound)
            self.failUnlessEqual(ic._debug_counts["new_announcement"], new)
            self.failUnlessEqual(ic._debug_counts["update"], update)
            self.failUnlessEqual(ic._debug_counts["duplicate_announcement"], duplicate)

        def _check_latest(version):
            self.failUnlessEqual(received[-1]["nickname"], u"nick1")
            self.failUnlessEqual(received[-1]["my-version"], version)

        def _after_first(ign):
            self.failUnlessEqual(len(received), 1)
            _check_latest("ver23")
            _check_counts(1, 1, 0, 0)
            # now send a duplicate announcement: this should not notify clients
            v1_wrapper.remote_announce([original])
            return fireEventually()

        def _after_duplicate(ign):
            self.failUnlessEqual(len(received), 1)
            _check_counts(2, 1, 0, 1)
            # and a replacement announcement: same FURL, new other stuff.
            # Clients should be notified.
            v1_wrapper.remote_announce([replacement])
            return fireEventually()

        def _after_replacement(ign):
            self.failUnlessEqual(len(received), 2)
            _check_counts(3, 1, 1, 1)
            # test that the other stuff changed
            _check_latest("ver24")

        v1_wrapper.remote_announce([original])
        d = fireEventually()
        d.addCallback(_after_first)
        d.addCallback(_after_duplicate)
        d.addCallback(_after_replacement)
        return d

    def test_duplicate_receive_v2(self):
        """v2 announcements: duplicates, older seqnums and missing seqnums
        are not delivered; higher seqnums are."""
        ic1 = IntroducerClient(None,
                               "introducer.furl", u"my_nickname",
                               "ver23", "oldest_version", {}, fakeseq)
        # we use a second client just to create a different-looking
        # announcement
        ic2 = IntroducerClient(None,
                               "introducer.furl", u"my_nickname",
                               "ver24","oldest_version",{}, fakeseq)
        announcements = []

        def _received(key_s, ann):
            announcements.append( (key_s, ann) )
        ic1.subscribe_to("storage", _received)
        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/gydnp"
        furl1a = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:7777/gydnp"
        furl2 = "pb://ttwwooyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/ttwwoo"

        privkey_s, pubkey_vs = keyutil.make_keypair()
        privkey, _ignored = keyutil.parse_privkey(privkey_s)
        pubkey_s = keyutil.remove_prefix(pubkey_vs, "pub-")

        # ann1: ic1, furl1
        # ann1a: ic1, furl1a (same SturdyRef, different connection hints)
        # ann1b: ic2, furl1
        # ann2: ic2, furl2

        self.ann1 = make_ann_t(ic1, furl1, privkey, seqnum=10)
        self.ann1old = make_ann_t(ic1, furl1, privkey, seqnum=9)
        self.ann1noseqnum = make_ann_t(ic1, furl1, privkey, seqnum=None)
        self.ann1b = make_ann_t(ic2, furl1, privkey, seqnum=11)
        self.ann1a = make_ann_t(ic1, furl1a, privkey, seqnum=12)
        self.ann2 = make_ann_t(ic2, furl2, privkey, seqnum=13)

        def _check_latest(log, count, furl, version):
            # 'log' holds (key_s, ann) pairs; inspect the most recent one
            self.failUnlessEqual(len(log), count)
            key_s, ann = log[-1]
            self.failUnlessEqual(key_s, pubkey_s)
            self.failUnlessEqual(ann["anonymous-storage-FURL"], furl)
            self.failUnlessEqual(ann["my-version"], version)

        def _check_still_one(ign):
            self.failUnlessEqual(len(announcements), 1)

        ic1.remote_announce_v2([self.ann1]) # queues eventual-send
        d = fireEventually()
        d.addCallback(lambda ign: _check_latest(announcements, 1,
                                                furl1, "ver23"))

        def _announce_later(ann_t):
            # deliver ann_t to ic1 once the chain gets here, then give the
            # eventual-send queue a turn
            d.addCallback(lambda ign: ic1.remote_announce_v2([ann_t]))
            d.addCallback(fireEventually)

        # now send a duplicate announcement. This should not fire the
        # subscriber
        _announce_later(self.ann1)
        d.addCallback(_check_still_one)

        # an older announcement shouldn't fire the subscriber either
        _announce_later(self.ann1old)
        d.addCallback(_check_still_one)

        # announcement with no seqnum cannot replace one with-seqnum
        _announce_later(self.ann1noseqnum)
        d.addCallback(_check_still_one)

        # and a replacement announcement: same FURL, new other stuff. The
        # subscriber *should* be fired.
        _announce_later(self.ann1b)
        d.addCallback(lambda ign: _check_latest(announcements, 2,
                                                furl1, "ver24"))

        # and a replacement announcement with a different FURL (it uses
        # different connection hints)
        _announce_later(self.ann1a)
        d.addCallback(lambda ign: _check_latest(announcements, 3,
                                                furl1a, "ver23"))

        # now add a new subscription, which should be called with the
        # backlog. The introducer only records one announcement per index, so
        # the backlog will only have the latest message.
        announcements2 = []

        def _received2(key_s, ann):
            announcements2.append( (key_s, ann) )
        d.addCallback(lambda ign: ic1.subscribe_to("storage", _received2))
        d.addCallback(fireEventually)
        d.addCallback(lambda ign: _check_latest(announcements2, 1,
                                                furl1a, "ver23"))
        return d

    def test_id_collision(self):
        # test replacement case where tubid equals a keyid (one should
        # not replace the other)
        ic = IntroducerClient(None,
                              "introducer.furl", u"my_nickname",
                              "my_version", "oldest_version", {}, fakeseq)
        received = []

        def _on_announcement(key_s, ann):
            received.append(ann)
        ic.subscribe_to("storage", _on_announcement)

        sk_s, vk_s = keyutil.make_keypair()
        signing_key, _ignored = keyutil.parse_privkey(sk_s)
        keyid = keyutil.remove_prefix(vk_s, "pub-v0-")
        furl1 = "pb://onug64tu@127.0.0.1:123/short" # base32("short")
        furl2 = "pb://%s@127.0.0.1:36106/swissnum" % keyid

        def _after_signed(ign):
            # first announcement has been processed
            self.failUnlessEqual(len(received), 1)
            self.failUnlessEqual(received[0]["anonymous-storage-FURL"],
                                 furl1)
            # now submit a second one, with a tubid that happens to look just
            # like the pubkey-based serverid we just processed. They should
            # not overlap.
            unsigned_ann = (furl2, "storage", "RIStorage",
                            "nick1", "ver23", "ver0")
            WrapV2ClientInV1Interface(ic).remote_announce([unsigned_ann])
            return fireEventually()

        def _after_unsigned(ign):
            # if they overlapped, the second announcement would be ignored
            self.failUnlessEqual(len(received), 2)
            self.failUnlessEqual(received[1]["anonymous-storage-FURL"],
                                 furl2)

        ic.remote_announce_v2([make_ann_t(ic, furl1, signing_key, 1)])
        d = fireEventually()
        d.addCallback(_after_signed)
        d.addCallback(_after_unsigned)
        return d
328
class Server(unittest.TestCase):
    """Checks of how IntroducerService classifies successive publishes of the
    same signed announcement."""

    def test_duplicate(self):
        """Publish one announcement with varying seqnums and check which one
        is stored and how the debug counters move after each publish."""
        introducer = IntroducerService()
        ic1 = IntroducerClient(None,
                               "introducer.furl", u"my_nickname",
                               "ver23", "oldest_version", {}, realseq)
        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/gydnp"

        privkey_s, _ = keyutil.make_keypair()
        privkey, _ = keyutil.parse_privkey(privkey_s)

        def _signed(seqnum):
            return make_ann_t(ic1, furl1, privkey, seqnum=seqnum)

        current = _signed(10)
        stale = _signed(9)
        newer = _signed(11)
        without_seqnum = _signed(None)
        bad_seqnum = _signed("not an int")

        counter_names = ("inbound_message",
                         "inbound_duplicate",
                         "inbound_no_seqnum",
                         "inbound_old_replay",
                         "inbound_update")

        # (announcement to publish, seqnum expected to be stored afterwards,
        #  expected counter values in counter_names order)
        steps = [
            (current,        10, (1, 0, 0, 0, 0)),
            (current,        10, (2, 1, 0, 0, 0)),
            (stale,          10, (3, 1, 0, 1, 0)),
            (newer,          11, (4, 1, 0, 1, 1)),
            (without_seqnum, 11, (5, 1, 1, 1, 1)),
            (bad_seqnum,     11, (6, 1, 2, 1, 1)),
        ]

        for ann_t, stored_seqnum, expected_counts in steps:
            introducer.remote_publish_v2(ann_t, None)
            stored = introducer.get_announcements()
            self.failUnlessEqual(len(stored), 1)
            self.failUnlessEqual(stored[0].announcement["seqnum"],
                                 stored_seqnum)
            for name, expected in zip(counter_names, expected_counts):
                self.failUnlessEqual(introducer._debug_counts[name], expected)
405
406
# Nickname template containing a non-ASCII character
# (U+00ED, LATIN SMALL LETTER I WITH ACUTE); "%s" is filled in per client.
NICKNAME = u"n\u00EDickname-%s"
408
class SystemTestMixin(ServiceMixin, pollmixin.PollMixin):
    """Mixin that can create a listening central Tub under ``self.basedir``."""

    def create_tub(self, portnum=0):
        """Create ``self.central_tub`` and start it listening.

        The Tub's certificate file is "tub.pem" inside ``self.basedir``. The
        port actually bound is stored in ``self.central_portnum``. With the
        default ``portnum=0`` the requested listener is "tcp:0"; a non-zero
        ``portnum`` is asserted to match the port reported by the listener.
        """
        cert_path = os.path.join(self.basedir, "tub.pem")
        tub = Tub(certFile=cert_path)
        self.central_tub = tub
        #tub.setOption("logLocalFailures", True)
        #tub.setOption("logRemoteFailures", True)
        tub.setOption("expose-remote-exception-types", False)
        tub.setServiceParent(self.parent)
        listener = tub.listenOn("tcp:%d" % portnum)
        actual_port = listener.getPortnum()
        self.central_portnum = actual_port
        if portnum != 0:
            assert actual_port == portnum
        tub.setLocation("localhost:%d" % actual_port)
423
class Queue(SystemTestMixin, unittest.TestCase):
    """Checks that a publish made while the introducer is detached is
    delivered after it is re-attached."""

    def test_queue_until_connected(self):
        """Publish with the introducer service detached, re-attach it, and
        wait for the announcement to arrive and for both sides to go idle."""
        self.basedir = "introducer/QueueUntilConnected/queued"
        os.makedirs(self.basedir)
        self.create_tub()
        introducer = IntroducerService()
        introducer.setServiceParent(self.parent)
        furl_file = os.path.join(self.basedir, "introducer.furl")
        ifurl = self.central_tub.registerReference(introducer,
                                                   furlFile=furl_file)
        client_tub = Tub()
        client_tub.setServiceParent(self.parent)
        client = IntroducerClient(client_tub, ifurl,
                                  u"nickname", "version", "oldest", {},
                                  fakeseq)
        furl1 = "pb://onug64tu@127.0.0.1:123/short" # base32("short")
        sk_s, _vk_s = keyutil.make_keypair()
        signing_key, _ignored = keyutil.parse_privkey(sk_s)

        def _publish_while_offline(ign):
            # now that the introducer server is offline, create a client and
            # publish some messages
            client.setServiceParent(self.parent) # this starts the reconnector
            client.publish("storage", make_ann(furl1), signing_key)

            introducer.setServiceParent(self.parent) # restart the server

            # now wait for the messages to be delivered
            def _has_announcement():
                return bool(introducer.get_announcements())
            return self.poll(_has_announcement)

        def _check_delivered(ign):
            first = introducer.get_announcements()[0]
            self.failUnlessEqual(first.announcement["anonymous-storage-FURL"],
                                 furl1)

        # now let the ack get back
        def _wait_until_idle(ign):
            def _idle():
                return not (client._debug_outstanding
                            or introducer._debug_outstanding)
            return self.poll(_idle)

        d = introducer.disownServiceParent()
        d.addCallback(_publish_while_offline)
        d.addCallback(_check_delivered)
        d.addCallback(_wait_until_idle)
        return d
471
472
# Introducer server versions selectable via do_system_test(server_version).
V1 = "v1"
V2 = "v2"
474 class SystemTest(SystemTestMixin, unittest.TestCase):
475
476     def do_system_test(self, server_version):
477         self.create_tub()
478         if server_version == V1:
479             introducer = old.IntroducerService_v1()
480         else:
481             introducer = IntroducerService()
482         introducer.setServiceParent(self.parent)
483         iff = os.path.join(self.basedir, "introducer.furl")
484         tub = self.central_tub
485         ifurl = self.central_tub.registerReference(introducer, furlFile=iff)
486         self.introducer_furl = ifurl
487
488         # we have 5 clients who publish themselves as storage servers, and a
489         # sixth which does which not. All 6 clients subscriber to hear about
490         # storage. When the connections are fully established, all six nodes
491         # should have 5 connections each.
492         NUM_STORAGE = 5
493         NUM_CLIENTS = 6
494
495         clients = []
496         tubs = {}
497         received_announcements = {}
498         subscribing_clients = []
499         publishing_clients = []
500         printable_serverids = {}
501         self.the_introducer = introducer
502         privkeys = {}
503         expected_announcements = [0 for c in range(NUM_CLIENTS)]
504
505         for i in range(NUM_CLIENTS):
506             tub = Tub()
507             #tub.setOption("logLocalFailures", True)
508             #tub.setOption("logRemoteFailures", True)
509             tub.setOption("expose-remote-exception-types", False)
510             tub.setServiceParent(self.parent)
511             l = tub.listenOn("tcp:0")
512             portnum = l.getPortnum()
513             tub.setLocation("localhost:%d" % portnum)
514
515             log.msg("creating client %d: %s" % (i, tub.getShortTubID()))
516             if i == 0:
517                 c = old.IntroducerClient_v1(tub, self.introducer_furl,
518                                             NICKNAME % str(i),
519                                             "version", "oldest")
520             else:
521                 c = IntroducerClient(tub, self.introducer_furl,
522                                      NICKNAME % str(i),
523                                      "version", "oldest",
524                                      {"component": "component-v1"}, fakeseq)
525             received_announcements[c] = {}
526             def got(key_s_or_tubid, ann, announcements, i):
527                 if i == 0:
528                     index = get_tubid_string_from_ann(ann)
529                 else:
530                     index = key_s_or_tubid or get_tubid_string_from_ann(ann)
531                 announcements[index] = ann
532             c.subscribe_to("storage", got, received_announcements[c], i)
533             subscribing_clients.append(c)
534             expected_announcements[i] += 1 # all expect a 'storage' announcement
535
536             node_furl = tub.registerReference(Referenceable())
537             if i < NUM_STORAGE:
538                 if i == 0:
539                     c.publish(node_furl, "storage", "ri_name")
540                     printable_serverids[i] = get_tubid_string(node_furl)
541                 elif i == 1:
542                     # sign the announcement
543                     privkey_s, pubkey_s = keyutil.make_keypair()
544                     privkey, _ignored = keyutil.parse_privkey(privkey_s)
545                     privkeys[c] = privkey
546                     c.publish("storage", make_ann(node_furl), privkey)
547                     if server_version == V1:
548                         printable_serverids[i] = get_tubid_string(node_furl)
549                     else:
550                         assert pubkey_s.startswith("pub-")
551                         printable_serverids[i] = pubkey_s[len("pub-"):]
552                 else:
553                     c.publish("storage", make_ann(node_furl))
554                     printable_serverids[i] = get_tubid_string(node_furl)
555                 publishing_clients.append(c)
556             else:
557                 # the last one does not publish anything
558                 pass
559
560             if i == 0:
561                 # users of the V1 client were required to publish a
562                 # 'stub_client' record (somewhat after they published the
563                 # 'storage' record), so the introducer could see their
564                 # version. Match that behavior.
565                 c.publish(node_furl, "stub_client", "stub_ri_name")
566
567             if i == 2:
568                 # also publish something that nobody cares about
569                 boring_furl = tub.registerReference(Referenceable())
570                 c.publish("boring", make_ann(boring_furl))
571
572             c.setServiceParent(self.parent)
573             clients.append(c)
574             tubs[c] = tub
575
576
577         def _wait_for_connected(ign):
578             def _connected():
579                 for c in clients:
580                     if not c.connected_to_introducer():
581                         return False
582                 return True
583             return self.poll(_connected)
584
585         # we watch the clients to determine when the system has settled down.
586         # Then we can look inside the server to assert things about its
587         # state.
588
589         def _wait_for_expected_announcements(ign):
590             def _got_expected_announcements():
591                 for i,c in enumerate(subscribing_clients):
592                     if len(received_announcements[c]) < expected_announcements[i]:
593                         return False
594                 return True
595             return self.poll(_got_expected_announcements)
596
597         # before shutting down any Tub, we'd like to know that there are no
598         # messages outstanding
599
600         def _wait_until_idle(ign):
601             def _idle():
602                 for c in subscribing_clients + publishing_clients:
603                     if c._debug_outstanding:
604                         return False
605                 if self.the_introducer._debug_outstanding:
606                     return False
607                 return True
608             return self.poll(_idle)
609
610         d = defer.succeed(None)
611         d.addCallback(_wait_for_connected)
612         d.addCallback(_wait_for_expected_announcements)
613         d.addCallback(_wait_until_idle)
614
615         def _check1(res):
616             log.msg("doing _check1")
617             dc = self.the_introducer._debug_counts
618             if server_version == V1:
619                 # each storage server publishes a record, and (after its
620                 # 'subscribe' has been ACKed) also publishes a "stub_client".
621                 # The non-storage client (which subscribes) also publishes a
622                 # stub_client. There is also one "boring" service. The number
623                 # of messages is higher, because the stub_clients aren't
624                 # published until after we get the 'subscribe' ack (since we
625                 # don't realize that we're dealing with a v1 server [which
626                 # needs stub_clients] until then), and the act of publishing
627                 # the stub_client causes us to re-send all previous
628                 # announcements.
629                 self.failUnlessEqual(dc["inbound_message"] - dc["inbound_duplicate"],
630                                      NUM_STORAGE + NUM_CLIENTS + 1)
631             else:
632                 # each storage server publishes a record. There is also one
633                 # "stub_client" and one "boring"
634                 self.failUnlessEqual(dc["inbound_message"], NUM_STORAGE+2)
635                 self.failUnlessEqual(dc["inbound_duplicate"], 0)
636             self.failUnlessEqual(dc["inbound_update"], 0)
637             self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS)
638             # the number of outbound messages is tricky.. I think it depends
639             # upon a race between the publish and the subscribe messages.
640             self.failUnless(dc["outbound_message"] > 0)
641             # each client subscribes to "storage", and each server publishes
642             self.failUnlessEqual(dc["outbound_announcements"],
643                                  NUM_STORAGE*NUM_CLIENTS)
644
645             for c in subscribing_clients:
646                 cdc = c._debug_counts
647                 self.failUnless(cdc["inbound_message"])
648                 self.failUnlessEqual(cdc["inbound_announcement"],
649                                      NUM_STORAGE)
650                 self.failUnlessEqual(cdc["wrong_service"], 0)
651                 self.failUnlessEqual(cdc["duplicate_announcement"], 0)
652                 self.failUnlessEqual(cdc["update"], 0)
653                 self.failUnlessEqual(cdc["new_announcement"],
654                                      NUM_STORAGE)
655                 anns = received_announcements[c]
656                 self.failUnlessEqual(len(anns), NUM_STORAGE)
657
658                 nodeid0 = tubs[clients[0]].tubID
659                 ann = anns[nodeid0]
660                 nick = ann["nickname"]
661                 self.failUnlessEqual(type(nick), unicode)
662                 self.failUnlessEqual(nick, NICKNAME % "0")
663             if server_version == V1:
664                 for c in publishing_clients:
665                     cdc = c._debug_counts
666                     expected = 1 # storage
667                     if c is clients[2]:
668                         expected += 1 # boring
669                     if c is not clients[0]:
670                         # the v2 client tries to call publish_v2, which fails
671                         # because the server is v1. It then re-sends
672                         # everything it has so far, plus a stub_client record
673                         expected = 2*expected + 1
674                     if c is clients[0]:
675                         # we always tell v1 client to send stub_client
676                         expected += 1
677                     self.failUnlessEqual(cdc["outbound_message"], expected)
678             else:
679                 for c in publishing_clients:
680                     cdc = c._debug_counts
681                     expected = 1
682                     if c in [clients[0], # stub_client
683                              clients[2], # boring
684                              ]:
685                         expected = 2
686                     self.failUnlessEqual(cdc["outbound_message"], expected)
687             # now check the web status, make sure it renders without error
688             ir = introweb.IntroducerRoot(self.parent)
689             self.parent.nodeid = "NODEID"
690             text = ir.renderSynchronously().decode("utf-8")
691             self.failUnlessIn(NICKNAME % "0", text) # the v1 client
692             self.failUnlessIn(NICKNAME % "1", text) # a v2 client
693             for i in range(NUM_STORAGE):
694                 self.failUnlessIn(printable_serverids[i], text,
695                                   (i,printable_serverids[i],text))
696                 # make sure there isn't a double-base32ed string too
697                 self.failIfIn(idlib.nodeid_b2a(printable_serverids[i]), text,
698                               (i,printable_serverids[i],text))
699             log.msg("_check1 done")
700         d.addCallback(_check1)
701
702         # force an introducer reconnect, by shutting down the Tub it's using
703         # and starting a new Tub (with the old introducer). Everybody should
704         # reconnect and republish, but the introducer should ignore the
705         # republishes as duplicates. However, because the server doesn't know
706         # what each client does and does not know, it will send them a copy
707         # of the current announcement table anyway.
708
709         d.addCallback(lambda _ign: log.msg("shutting down introducer's Tub"))
710         d.addCallback(lambda _ign: self.central_tub.disownServiceParent())
711
712         def _wait_for_introducer_loss(ign):
713             def _introducer_lost():
714                 for c in clients:
715                     if c.connected_to_introducer():
716                         return False
717                 return True
718             return self.poll(_introducer_lost)
719         d.addCallback(_wait_for_introducer_loss)
720
721         def _restart_introducer_tub(_ign):
722             log.msg("restarting introducer's Tub")
723             # reset counters
724             for i in range(NUM_CLIENTS):
725                 c = subscribing_clients[i]
726                 for k in c._debug_counts:
727                     c._debug_counts[k] = 0
728             for k in self.the_introducer._debug_counts:
729                 self.the_introducer._debug_counts[k] = 0
730             expected_announcements[i] += 1 # new 'storage' for everyone
731             self.create_tub(self.central_portnum)
732             newfurl = self.central_tub.registerReference(self.the_introducer,
733                                                          furlFile=iff)
734             assert newfurl == self.introducer_furl
735         d.addCallback(_restart_introducer_tub)
736
737         d.addCallback(_wait_for_connected)
738         d.addCallback(_wait_for_expected_announcements)
739         d.addCallback(_wait_until_idle)
740         d.addCallback(lambda _ign: log.msg(" reconnected"))
741
742         # TODO: publish something while the introducer is offline, then
743         # confirm it gets delivered when the connection is reestablished
744         def _check2(res):
745             log.msg("doing _check2")
746             # assert that the introducer sent out new messages, one per
747             # subscriber
748             dc = self.the_introducer._debug_counts
749             self.failUnlessEqual(dc["outbound_announcements"],
750                                  NUM_STORAGE*NUM_CLIENTS)
751             self.failUnless(dc["outbound_message"] > 0)
752             self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS)
753             for c in subscribing_clients:
754                 cdc = c._debug_counts
755                 self.failUnlessEqual(cdc["inbound_message"], 1)
756                 self.failUnlessEqual(cdc["inbound_announcement"], NUM_STORAGE)
757                 self.failUnlessEqual(cdc["new_announcement"], 0)
758                 self.failUnlessEqual(cdc["wrong_service"], 0)
759                 self.failUnlessEqual(cdc["duplicate_announcement"], NUM_STORAGE)
760         d.addCallback(_check2)
761
762         # Then force an introducer restart, by shutting down the Tub,
763         # destroying the old introducer, and starting a new Tub+Introducer.
764         # Everybody should reconnect and republish, and the (new) introducer
765         # will distribute the new announcements, but the clients should
766         # ignore the republishes as duplicates.
767
768         d.addCallback(lambda _ign: log.msg("shutting down introducer"))
769         d.addCallback(lambda _ign: self.central_tub.disownServiceParent())
770         d.addCallback(_wait_for_introducer_loss)
771         d.addCallback(lambda _ign: log.msg("introducer lost"))
772
773         def _restart_introducer(_ign):
774             log.msg("restarting introducer")
775             self.create_tub(self.central_portnum)
776             # reset counters
777             for i in range(NUM_CLIENTS):
778                 c = subscribing_clients[i]
779                 for k in c._debug_counts:
780                     c._debug_counts[k] = 0
781             expected_announcements[i] += 1 # new 'storage' for everyone
782             if server_version == V1:
783                 introducer = old.IntroducerService_v1()
784             else:
785                 introducer = IntroducerService()
786             self.the_introducer = introducer
787             newfurl = self.central_tub.registerReference(self.the_introducer,
788                                                          furlFile=iff)
789             assert newfurl == self.introducer_furl
790         d.addCallback(_restart_introducer)
791
792         d.addCallback(_wait_for_connected)
793         d.addCallback(_wait_for_expected_announcements)
794         d.addCallback(_wait_until_idle)
795
796         def _check3(res):
797             log.msg("doing _check3")
798             dc = self.the_introducer._debug_counts
799             self.failUnlessEqual(dc["outbound_announcements"],
800                                  NUM_STORAGE*NUM_CLIENTS)
801             self.failUnless(dc["outbound_message"] > 0)
802             self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS)
803             for c in subscribing_clients:
804                 cdc = c._debug_counts
805                 self.failUnless(cdc["inbound_message"] > 0)
806                 self.failUnlessEqual(cdc["inbound_announcement"], NUM_STORAGE)
807                 self.failUnlessEqual(cdc["new_announcement"], 0)
808                 self.failUnlessEqual(cdc["wrong_service"], 0)
809                 self.failUnlessEqual(cdc["duplicate_announcement"], NUM_STORAGE)
810
811         d.addCallback(_check3)
812         return d
813
814
815     def test_system_v2_server(self):
816         self.basedir = "introducer/SystemTest/system_v2_server"
817         os.makedirs(self.basedir)
818         return self.do_system_test(V2)
819     test_system_v2_server.timeout = 480
820     # occasionally takes longer than 350s on "draco"
821
822     def test_system_v1_server(self):
823         self.basedir = "introducer/SystemTest/system_v1_server"
824         os.makedirs(self.basedir)
825         return self.do_system_test(V1)
826     test_system_v1_server.timeout = 480
827     # occasionally takes longer than 350s on "draco"
828
class FakeRemoteReference:
    """Stand-in remote reference whose methods return fixed, canned values."""

    def notifyOnDisconnect(self, *args, **kwargs):
        # accept any arguments and do nothing with them
        pass

    def getRemoteTubID(self):
        return "62ubehyunnyhzs7r6vdonnm2hpi52w6y"

    def getLocationHints(self):
        hints = [
            ("ipv4", "here.example.com", "1234"),
            ("ipv4", "there.example.com", "2345"),
        ]
        return hints

    def getPeer(self):
        peer = address.IPv4Address("TCP", "remote.example.com", 3456)
        return peer
836
class ClientInfo(unittest.TestCase):
    """Check the subscriber records an IntroducerService reports."""

    def test_client_v2(self):
        """Subscribe through remote_subscribe_v2 and inspect the record."""
        server = IntroducerService()
        versions = {"whizzy": "fizzy"}
        # no Tub and no introducer FURL are supplied to the client
        tub = introducer_furl = None
        client_v2 = IntroducerClient(tub, introducer_furl, NICKNAME % u"v2",
                                     "my_version", "oldest", versions,
                                     fakeseq)
        #furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
        #ann_s = make_ann_t(client_v2, furl1, None, 10)
        #introducer.remote_publish_v2(ann_s, Referenceable())
        remote = FakeRemoteReference()
        server.remote_subscribe_v2(remote, "storage",
                                   client_v2._my_subscriber_info)

        subscribers = server.get_subscribers()
        self.failUnlessEqual(len(subscribers), 1)
        record = subscribers[0]
        self.failUnlessEqual(record.service_name, "storage")
        self.failUnlessEqual(record.app_versions, versions)
        self.failUnlessEqual(record.nickname, NICKNAME % u"v2")
        self.failUnlessEqual(record.version, "my_version")

    def test_client_v1(self):
        """Subscribe through the v1 call, then publish a stub_client."""
        server = IntroducerService()
        remote = FakeRemoteReference()
        server.remote_subscribe(remote, "storage")
        # the v1 subscribe interface had no subscriber_info: that was usually
        # sent in a separate stub_client pseudo-announcement
        subscribers = server.get_subscribers()
        self.failUnlessEqual(len(subscribers), 1)
        record = subscribers[0]
        self.failUnlessEqual(record.nickname, u"?") # not known yet
        self.failUnlessEqual(record.service_name, "storage")

        # now submit the stub_client announcement
        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
        encoded_nickname = (NICKNAME % u"v1").encode("utf-8")
        stub_ann = (furl1, "stub_client", "RIStubClient",
                    encoded_nickname, "my_version", "oldest")
        server.remote_publish(stub_ann)
        # the server should correlate the two
        subscribers = server.get_subscribers()
        self.failUnlessEqual(len(subscribers), 1)
        record = subscribers[0]
        self.failUnlessEqual(record.service_name, "storage")
        # v1 announcements do not contain app-versions
        self.failUnlessEqual(record.app_versions, {})
        self.failUnlessEqual(record.nickname, NICKNAME % u"v1")
        self.failUnlessEqual(record.version, "my_version")

        # a subscription that arrives after the stub_client announcement
        # should be correlated too
        second_remote = FakeRemoteReference()
        server.remote_subscribe(second_remote, "thing2")

        subscribers = server.get_subscribers()
        self.failUnlessEqual(len(subscribers), 2)
        thing2_records = [sub for sub in subscribers
                          if sub.service_name == "thing2"]
        record = thing2_records[0]
        # v1 announcements do not contain app-versions
        self.failUnlessEqual(record.app_versions, {})
        self.failUnlessEqual(record.nickname, NICKNAME % u"v1")
        self.failUnlessEqual(record.version, "my_version")
898
class Announcements(unittest.TestCase):
    """Check the announcement records an IntroducerService reports."""

    def test_client_v2_unsigned(self):
        """Publish a v2 announcement built with no signing key (None)."""
        server = IntroducerService()
        versions = {"whizzy": "fizzy"}
        tub = introducer_furl = None
        client_v2 = IntroducerClient(tub, introducer_furl, u"nick-v2",
                                     "my_version", "oldest", versions,
                                     fakeseq)
        tubid = "62ubehyunnyhzs7r6vdonnm2hpi52w6y"
        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
        unsigned_ann = make_ann_t(client_v2, furl1, None, 10)
        canary = Referenceable()
        server.remote_publish_v2(unsigned_ann, canary)

        records = server.get_announcements()
        self.failUnlessEqual(len(records), 1)
        record = records[0]
        self.failUnlessIdentical(record.canary, canary)
        # with no key, the index is expected to carry the tubid instead
        self.failUnlessEqual(record.index, ("storage", None, tubid))
        self.failUnlessEqual(record.announcement["app-versions"], versions)
        self.failUnlessEqual(record.nickname, u"nick-v2")
        self.failUnlessEqual(record.service_name, "storage")
        self.failUnlessEqual(record.version, "my_version")
        self.failUnlessEqual(record.announcement["anonymous-storage-FURL"],
                             furl1)

    def test_client_v2_signed(self):
        """Publish a v2 announcement built with a freshly made signing key."""
        server = IntroducerService()
        versions = {"whizzy": "fizzy"}
        tub = introducer_furl = None
        client_v2 = IntroducerClient(tub, introducer_furl, u"nick-v2",
                                     "my_version", "oldest", versions,
                                     fakeseq)
        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
        privkey_s, pubkey_s = keyutil.make_keypair()
        privkey, _ignored = keyutil.parse_privkey(privkey_s)
        # the index is expected to hold the public key without its "pub-"
        pubkey_unprefixed = keyutil.remove_prefix(pubkey_s, "pub-")
        signed_ann = make_ann_t(client_v2, furl1, privkey, 10)
        canary = Referenceable()
        server.remote_publish_v2(signed_ann, canary)

        records = server.get_announcements()
        self.failUnlessEqual(len(records), 1)
        record = records[0]
        self.failUnlessIdentical(record.canary, canary)
        self.failUnlessEqual(record.index,
                             ("storage", pubkey_unprefixed, None))
        self.failUnlessEqual(record.announcement["app-versions"], versions)
        self.failUnlessEqual(record.nickname, u"nick-v2")
        self.failUnlessEqual(record.service_name, "storage")
        self.failUnlessEqual(record.version, "my_version")
        self.failUnlessEqual(record.announcement["anonymous-storage-FURL"],
                             furl1)

    def test_client_v1(self):
        """Publish a v1 (tuple-form) announcement through remote_publish."""
        server = IntroducerService()

        tubid = "62ubehyunnyhzs7r6vdonnm2hpi52w6y"
        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
        encoded_nickname = u"nick-v1".encode("utf-8")
        v1_ann = (furl1, "storage", "RIStorage",
                  encoded_nickname, "my_version", "oldest")
        server.remote_publish(v1_ann)

        records = server.get_announcements()
        self.failUnlessEqual(len(records), 1)
        record = records[0]
        self.failUnlessEqual(record.index, ("storage", None, tubid))
        # remote_publish() was not given a canary
        self.failUnlessEqual(record.canary, None)
        self.failUnlessEqual(record.announcement["app-versions"], {})
        self.failUnlessEqual(record.nickname, u"nick-v1".encode("utf-8"))
        self.failUnlessEqual(record.service_name, "storage")
        self.failUnlessEqual(record.version, "my_version")
        self.failUnlessEqual(record.announcement["anonymous-storage-FURL"],
                             furl1)
964
class ClientSeqnums(unittest.TestCase):
    """Check the sequence numbers a client records when it publishes."""

    def test_client(self):
        """Publish two services and check seqnums, nonces and stored copies.

        After each publish, the number in the "announcement-seqnum" file
        and the "seqnum" of every outbound announcement are compared to the
        expected value, and the published message is checked against the
        outbound announcement.
        """
        basedir = "introducer/ClientSeqnums/test_client"
        fileutil.make_dirs(basedir)
        # 'with' closes the file even if one of the writes raises
        with open(os.path.join(basedir, "tahoe.cfg"), "w") as f:
            f.write("[client]\n")
            f.write("introducer.furl = nope\n")
        c = TahoeClient(basedir)
        ic = c.introducer_client
        outbound = ic._outbound_announcements
        published = ic._published_announcements
        def read_seqnum():
            # return the integer stored in the "announcement-seqnum" file;
            # the handle is released even if read() or int() raises
            with open(os.path.join(basedir, "announcement-seqnum")) as f:
                return int(f.read().strip())

        ic.publish("sA", {"key": "value1"}, c._server_key)
        self.failUnlessEqual(read_seqnum(), 1)
        self.failUnless("sA" in outbound)
        self.failUnlessEqual(outbound["sA"]["seqnum"], 1)
        nonce1 = outbound["sA"]["nonce"]
        self.failUnless(isinstance(nonce1, str))
        self.failUnlessEqual(simplejson.loads(published["sA"][0]),
                             outbound["sA"])
        # [1] is the signature, [2] is the pubkey

        # publishing a second service causes both services to be
        # re-published, with the next higher sequence number
        ic.publish("sB", {"key": "value2"}, c._server_key)
        self.failUnlessEqual(read_seqnum(), 2)
        self.failUnless("sB" in outbound)
        self.failUnlessEqual(outbound["sB"]["seqnum"], 2)
        self.failUnless("sA" in outbound)
        self.failUnlessEqual(outbound["sA"]["seqnum"], 2)
        nonce2 = outbound["sA"]["nonce"]
        self.failUnless(isinstance(nonce2, str))
        # the re-published "sA" must carry a fresh nonce
        self.failIfEqual(nonce1, nonce2)
        self.failUnlessEqual(simplejson.loads(published["sA"][0]),
                             outbound["sA"])
        self.failUnlessEqual(simplejson.loads(published["sB"][0]),
                             outbound["sB"])
1008
1009
1010
class TooNewServer(IntroducerService):
    # an IntroducerService whose VERSION lists only a "v999" protocol entry
    VERSION = {
        "http://allmydata.org/tahoe/protocols/introducer/v999": {},
        "application-version": "greetings from the crazy future",
    }
1016
class NonV1Server(SystemTestMixin, unittest.TestCase):
    # if the 1.3.0 client connects to a server that doesn't provide the 'v1'
    # protocol, it is supposed to provide a useful error instead of a weird
    # exception.

    def test_failure(self):
        """Connect a client to TooNewServer; expect InsufficientVersionError."""
        self.basedir = "introducer/NonV1Server/failure"
        os.makedirs(self.basedir)
        self.create_tub()
        server = TooNewServer()
        server.setServiceParent(self.parent)
        self.introducer_furl = self.central_tub.registerReference(server)

        # a separate Tub for the client side of the connection
        client_tub = Tub()
        client_tub.setOption("expose-remote-exception-types", False)
        client_tub.setServiceParent(self.parent)
        listener = client_tub.listenOn("tcp:0")
        client_port = listener.getPortnum()
        client_tub.setLocation("localhost:%d" % client_port)

        client = IntroducerClient(client_tub, self.introducer_furl,
                                  u"nickname-client", "version", "oldest", {},
                                  fakeseq)
        received = {}
        def _remember(key_s, ann):
            received[key_s] = ann
        client.subscribe_to("storage", _remember)

        client.setServiceParent(self.parent)

        # now we wait for it to connect and notice the bad version

        def _settled():
            # done once the client has either an error or a publisher
            if client._introducer_error:
                return True
            return bool(client._publisher)
        d = self.poll(_settled)
        def _verify(res):
            self.failUnless(client._introducer_error)
            matched = client._introducer_error.check(InsufficientVersionError)
            self.failUnless(matched, client._introducer_error)
        d.addCallback(_verify)
        return d
1058
class DecodeFurl(unittest.TestCase):
    """Sanity-check base32 decoding of the tubid embedded in a FURL."""

    def test_decode(self):
        """Decode the tubid of a sample FURL and compare to its known bytes."""
        # make sure we have a working base64.b32decode. The one in
        # python2.4.[01] was broken.
        furl = 'pb://t5g7egomnnktbpydbuijt6zgtmw4oqi5@127.0.0.1:51857/hfzv36i'
        m = re.match(r'pb://(\w+)@', furl)
        # a real test assertion: a bare 'assert' is stripped under -O
        self.assertTrue(m, furl)
        nodeid = b32decode(m.group(1).upper())
        # b32decode returns a byte string, so the expected value is spelled
        # as a bytes literal (identical to a plain str on Python 2)
        expected = (b"\x9fM\xf2\x19\xcckU0\xbf\x03\r"
                    b"\x10\x99\xfb&\x9b-\xc7A\x1d")
        self.assertEqual(nodeid, expected)
1068
class Signatures(unittest.TestCase):
    """Exercise sign_to_foolscap() and unsign_from_foolscap() together."""

    def test_sign(self):
        """Sign an announcement, unsign it, then feed in bad variations."""
        announcement = {"key1": "value1"}
        privkey_s, pubkey_s = keyutil.make_keypair()
        privkey, _unused = keyutil.parse_privkey(privkey_s)
        signed = sign_to_foolscap(announcement, privkey)
        msg, sig, key = signed
        bytes_type = type("".encode("utf-8")) # bytes
        self.failUnlessEqual(type(msg), bytes_type)
        decoded = simplejson.loads(msg.decode("utf-8"))
        self.failUnlessEqual(decoded, announcement)
        self.failUnless(sig.startswith("v0-"))
        self.failUnless(key.startswith("v0-"))
        roundtrip_ann, roundtrip_key = unsign_from_foolscap(signed)
        self.failUnlessEqual(roundtrip_ann, announcement)
        self.failUnlessEqual("pub-" + roundtrip_key, pubkey_s)

        # bad signature
        tampered = {"key1": "value2"}
        tampered_msg = simplejson.dumps(tampered).encode("utf-8")
        self.failUnlessRaises(keyutil.BadSignatureError,
                              unsign_from_foolscap,
                              (tampered_msg, sig, key))
        # sneaky bad signature should be ignored
        roundtrip_ann, roundtrip_key = unsign_from_foolscap(
            (tampered_msg, None, key))
        self.failUnlessEqual(roundtrip_key, None)
        self.failUnlessEqual(roundtrip_ann, tampered)

        # unrecognized signatures
        self.failUnlessRaises(UnknownKeyError,
                              unsign_from_foolscap,
                              (tampered_msg, "v999-sig", key))
        self.failUnlessRaises(UnknownKeyError,
                              unsign_from_foolscap,
                              (tampered_msg, sig, "v999-key"))
1099
1100
1101 # add tests of StorageFarmBroker: if it receives duplicate announcements, it
1102 # should leave the Reconnector in place, also if it receives
1103 # same-FURL-different-misc, but if it receives same-nodeid-different-FURL, it
1104 # should tear down the Reconnector and make a new one. This behavior used to
1105 # live in the IntroducerClient, and thus used to be tested by test_introducer
1106
1107 # copying more tests from old branch:
1108
1109 #  then also add Upgrade test