]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_introducer.py
Make introducer.furl unguessable. Closes #1802.
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_introducer.py
1
2 import os, re, itertools
3 from base64 import b32decode
4 import simplejson
5
6 from twisted.trial import unittest
7 from twisted.internet import defer, address
8 from twisted.python import log
9
10 from foolscap.api import Tub, Referenceable, fireEventually, flushEventualQueue
11 from twisted.application import service
12 from allmydata.interfaces import InsufficientVersionError
13 from allmydata.introducer.client import IntroducerClient, \
14      WrapV2ClientInV1Interface
15 from allmydata.introducer.server import IntroducerService, FurlFileConflictError
16 from allmydata.introducer.common import get_tubid_string_from_ann, \
17      get_tubid_string, sign_to_foolscap, unsign_from_foolscap, \
18      UnknownKeyError
19 from allmydata.introducer import old
20 # test compatibility with old introducer .tac files
21 from allmydata.introducer import IntroducerNode
22 from allmydata.web import introweb
23 from allmydata.client import Client as TahoeClient
24 from allmydata.util import pollmixin, keyutil, idlib, fileutil
25 import allmydata.test.common_util as testutil
26
27 class LoggingMultiService(service.MultiService):
28     def log(self, msg, **kw):
29         log.msg(msg, **kw)
30
31 class Node(testutil.SignalMixin, unittest.TestCase):
32     def test_furl(self):
33         basedir = "introducer.IntroducerNode.test_furl"
34         os.mkdir(basedir)
35         public_fn = os.path.join(basedir, "introducer.furl")
36         private_fn = os.path.join(basedir, "private", "introducer.furl")
37         q1 = IntroducerNode(basedir)
38         d = fireEventually(None)
39         d.addCallback(lambda res: q1.startService())
40         d.addCallback(lambda res: q1.when_tub_ready())
41         d.addCallback(lambda res: q1.stopService())
42         d.addCallback(flushEventualQueue)
43         def _check_furl(res):
44             # new nodes create unguessable furls in private/introducer.furl
45             ifurl = fileutil.read(private_fn)
46             self.failUnless(ifurl)
47             ifurl = ifurl.strip()
48             self.failIf(ifurl.endswith("/introducer"), ifurl)
49
50             # old nodes created guessable furls in BASEDIR/introducer.furl
51             guessable = ifurl[:ifurl.rfind("/")] + "/introducer"
52             fileutil.write(public_fn, guessable+"\n", mode="w") # text
53
54             # if we see both files, throw an error
55             self.failUnlessRaises(FurlFileConflictError,
56                                   IntroducerNode, basedir)
57
58             # when we see only the public one, move it to private/ and use
59             # the existing furl instead of creating a new one
60             os.unlink(private_fn)
61             q2 = IntroducerNode(basedir)
62             d2 = fireEventually(None)
63             d2.addCallback(lambda res: q2.startService())
64             d2.addCallback(lambda res: q2.when_tub_ready())
65             d2.addCallback(lambda res: q2.stopService())
66             d2.addCallback(flushEventualQueue)
67             def _check_furl2(res):
68                 self.failIf(os.path.exists(public_fn))
69                 ifurl2 = fileutil.read(private_fn)
70                 self.failUnless(ifurl2)
71                 self.failUnlessEqual(ifurl2.strip(), guessable)
72             d2.addCallback(_check_furl2)
73             return d2
74         d.addCallback(_check_furl)
75         return d
76
77 class ServiceMixin:
78     def setUp(self):
79         self.parent = LoggingMultiService()
80         self.parent.startService()
81     def tearDown(self):
82         log.msg("TestIntroducer.tearDown")
83         d = defer.succeed(None)
84         d.addCallback(lambda res: self.parent.stopService())
85         d.addCallback(flushEventualQueue)
86         return d
87
88 class Introducer(ServiceMixin, unittest.TestCase, pollmixin.PollMixin):
89
90     def test_create(self):
91         ic = IntroducerClient(None, "introducer.furl", u"my_nickname",
92                               "my_version", "oldest_version", {}, fakeseq)
93         self.failUnless(isinstance(ic, IntroducerClient))
94
95     def test_listen(self):
96         i = IntroducerService()
97         i.setServiceParent(self.parent)
98
99     def test_duplicate_publish(self):
100         i = IntroducerService()
101         self.failUnlessEqual(len(i.get_announcements()), 0)
102         self.failUnlessEqual(len(i.get_subscribers()), 0)
103         furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36106,127.0.0.1:36106/gydnpigj2ja2qr2srq4ikjwnl7xfgbra"
104         furl2 = "pb://ttwwooyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36111,127.0.0.1:36106/ttwwoogj2ja2qr2srq4ikjwnl7xfgbra"
105         ann1 = (furl1, "storage", "RIStorage", "nick1", "ver23", "ver0")
106         ann1b = (furl1, "storage", "RIStorage", "nick1", "ver24", "ver0")
107         ann2 = (furl2, "storage", "RIStorage", "nick2", "ver30", "ver0")
108         i.remote_publish(ann1)
109         self.failUnlessEqual(len(i.get_announcements()), 1)
110         self.failUnlessEqual(len(i.get_subscribers()), 0)
111         i.remote_publish(ann2)
112         self.failUnlessEqual(len(i.get_announcements()), 2)
113         self.failUnlessEqual(len(i.get_subscribers()), 0)
114         i.remote_publish(ann1b)
115         self.failUnlessEqual(len(i.get_announcements()), 2)
116         self.failUnlessEqual(len(i.get_subscribers()), 0)
117
118     def test_id_collision(self):
119         # test replacement case where tubid equals a keyid (one should
120         # not replace the other)
121         i = IntroducerService()
122         ic = IntroducerClient(None,
123                               "introducer.furl", u"my_nickname",
124                               "my_version", "oldest_version", {}, fakeseq)
125         sk_s, vk_s = keyutil.make_keypair()
126         sk, _ignored = keyutil.parse_privkey(sk_s)
127         keyid = keyutil.remove_prefix(vk_s, "pub-v0-")
128         furl1 = "pb://onug64tu@127.0.0.1:123/short" # base32("short")
129         ann_t = make_ann_t(ic, furl1, sk, 1)
130         i.remote_publish_v2(ann_t, Referenceable())
131         announcements = i.get_announcements()
132         self.failUnlessEqual(len(announcements), 1)
133         key1 = ("storage", "v0-"+keyid, None)
134         self.failUnlessEqual(announcements[0].index, key1)
135         ann1_out = announcements[0].announcement
136         self.failUnlessEqual(ann1_out["anonymous-storage-FURL"], furl1)
137
138         furl2 = "pb://%s@127.0.0.1:36106/swissnum" % keyid
139         ann2 = (furl2, "storage", "RIStorage", "nick1", "ver23", "ver0")
140         i.remote_publish(ann2)
141         announcements = i.get_announcements()
142         self.failUnlessEqual(len(announcements), 2)
143         key2 = ("storage", None, keyid)
144         wanted = [ad for ad in announcements if ad.index == key2]
145         self.failUnlessEqual(len(wanted), 1)
146         ann2_out = wanted[0].announcement
147         self.failUnlessEqual(ann2_out["anonymous-storage-FURL"], furl2)
148
149
150 def fakeseq():
151     return 1, "nonce"
152
153 seqnum_counter = itertools.count(1)
154 def realseq():
155     return seqnum_counter.next(), str(os.randint(1,100000))
156
157 def make_ann(furl):
158     ann = { "anonymous-storage-FURL": furl,
159             "permutation-seed-base32": get_tubid_string(furl) }
160     return ann
161
162 def make_ann_t(ic, furl, privkey, seqnum):
163     ann_d = ic.create_announcement_dict("storage", make_ann(furl))
164     ann_d["seqnum"] = seqnum
165     ann_d["nonce"] = "nonce"
166     ann_t = sign_to_foolscap(ann_d, privkey)
167     return ann_t
168
169 class Client(unittest.TestCase):
170     def test_duplicate_receive_v1(self):
171         ic = IntroducerClient(None,
172                               "introducer.furl", u"my_nickname",
173                               "my_version", "oldest_version", {}, fakeseq)
174         announcements = []
175         ic.subscribe_to("storage",
176                         lambda key_s,ann: announcements.append(ann))
177         furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/gydnpigj2ja2qr2srq4ikjwnl7xfgbra"
178         ann1 = (furl1, "storage", "RIStorage", "nick1", "ver23", "ver0")
179         ann1b = (furl1, "storage", "RIStorage", "nick1", "ver24", "ver0")
180         ca = WrapV2ClientInV1Interface(ic)
181
182         ca.remote_announce([ann1])
183         d = fireEventually()
184         def _then(ign):
185             self.failUnlessEqual(len(announcements), 1)
186             self.failUnlessEqual(announcements[0]["nickname"], u"nick1")
187             self.failUnlessEqual(announcements[0]["my-version"], "ver23")
188             self.failUnlessEqual(ic._debug_counts["inbound_announcement"], 1)
189             self.failUnlessEqual(ic._debug_counts["new_announcement"], 1)
190             self.failUnlessEqual(ic._debug_counts["update"], 0)
191             self.failUnlessEqual(ic._debug_counts["duplicate_announcement"], 0)
192             # now send a duplicate announcement: this should not notify clients
193             ca.remote_announce([ann1])
194             return fireEventually()
195         d.addCallback(_then)
196         def _then2(ign):
197             self.failUnlessEqual(len(announcements), 1)
198             self.failUnlessEqual(ic._debug_counts["inbound_announcement"], 2)
199             self.failUnlessEqual(ic._debug_counts["new_announcement"], 1)
200             self.failUnlessEqual(ic._debug_counts["update"], 0)
201             self.failUnlessEqual(ic._debug_counts["duplicate_announcement"], 1)
202             # and a replacement announcement: same FURL, new other stuff.
203             # Clients should be notified.
204             ca.remote_announce([ann1b])
205             return fireEventually()
206         d.addCallback(_then2)
207         def _then3(ign):
208             self.failUnlessEqual(len(announcements), 2)
209             self.failUnlessEqual(ic._debug_counts["inbound_announcement"], 3)
210             self.failUnlessEqual(ic._debug_counts["new_announcement"], 1)
211             self.failUnlessEqual(ic._debug_counts["update"], 1)
212             self.failUnlessEqual(ic._debug_counts["duplicate_announcement"], 1)
213             # test that the other stuff changed
214             self.failUnlessEqual(announcements[-1]["nickname"], u"nick1")
215             self.failUnlessEqual(announcements[-1]["my-version"], "ver24")
216         d.addCallback(_then3)
217         return d
218
219     def test_duplicate_receive_v2(self):
220         ic1 = IntroducerClient(None,
221                                "introducer.furl", u"my_nickname",
222                                "ver23", "oldest_version", {}, fakeseq)
223         # we use a second client just to create a different-looking
224         # announcement
225         ic2 = IntroducerClient(None,
226                                "introducer.furl", u"my_nickname",
227                                "ver24","oldest_version",{}, fakeseq)
228         announcements = []
229         def _received(key_s, ann):
230             announcements.append( (key_s, ann) )
231         ic1.subscribe_to("storage", _received)
232         furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/gydnp"
233         furl1a = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:7777/gydnp"
234         furl2 = "pb://ttwwooyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/ttwwoo"
235
236         privkey_s, pubkey_vs = keyutil.make_keypair()
237         privkey, _ignored = keyutil.parse_privkey(privkey_s)
238         pubkey_s = keyutil.remove_prefix(pubkey_vs, "pub-")
239
240         # ann1: ic1, furl1
241         # ann1a: ic1, furl1a (same SturdyRef, different connection hints)
242         # ann1b: ic2, furl1
243         # ann2: ic2, furl2
244
245         self.ann1 = make_ann_t(ic1, furl1, privkey, seqnum=10)
246         self.ann1old = make_ann_t(ic1, furl1, privkey, seqnum=9)
247         self.ann1noseqnum = make_ann_t(ic1, furl1, privkey, seqnum=None)
248         self.ann1b = make_ann_t(ic2, furl1, privkey, seqnum=11)
249         self.ann1a = make_ann_t(ic1, furl1a, privkey, seqnum=12)
250         self.ann2 = make_ann_t(ic2, furl2, privkey, seqnum=13)
251
252         ic1.remote_announce_v2([self.ann1]) # queues eventual-send
253         d = fireEventually()
254         def _then1(ign):
255             self.failUnlessEqual(len(announcements), 1)
256             key_s,ann = announcements[0]
257             self.failUnlessEqual(key_s, pubkey_s)
258             self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1)
259             self.failUnlessEqual(ann["my-version"], "ver23")
260         d.addCallback(_then1)
261
262         # now send a duplicate announcement. This should not fire the
263         # subscriber
264         d.addCallback(lambda ign: ic1.remote_announce_v2([self.ann1]))
265         d.addCallback(fireEventually)
266         def _then2(ign):
267             self.failUnlessEqual(len(announcements), 1)
268         d.addCallback(_then2)
269
270         # an older announcement shouldn't fire the subscriber either
271         d.addCallback(lambda ign: ic1.remote_announce_v2([self.ann1old]))
272         d.addCallback(fireEventually)
273         def _then2a(ign):
274             self.failUnlessEqual(len(announcements), 1)
275         d.addCallback(_then2a)
276
277         # announcement with no seqnum cannot replace one with-seqnum
278         d.addCallback(lambda ign: ic1.remote_announce_v2([self.ann1noseqnum]))
279         d.addCallback(fireEventually)
280         def _then2b(ign):
281             self.failUnlessEqual(len(announcements), 1)
282         d.addCallback(_then2b)
283
284         # and a replacement announcement: same FURL, new other stuff. The
285         # subscriber *should* be fired.
286         d.addCallback(lambda ign: ic1.remote_announce_v2([self.ann1b]))
287         d.addCallback(fireEventually)
288         def _then3(ign):
289             self.failUnlessEqual(len(announcements), 2)
290             key_s,ann = announcements[-1]
291             self.failUnlessEqual(key_s, pubkey_s)
292             self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1)
293             self.failUnlessEqual(ann["my-version"], "ver24")
294         d.addCallback(_then3)
295
296         # and a replacement announcement with a different FURL (it uses
297         # different connection hints)
298         d.addCallback(lambda ign: ic1.remote_announce_v2([self.ann1a]))
299         d.addCallback(fireEventually)
300         def _then4(ign):
301             self.failUnlessEqual(len(announcements), 3)
302             key_s,ann = announcements[-1]
303             self.failUnlessEqual(key_s, pubkey_s)
304             self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1a)
305             self.failUnlessEqual(ann["my-version"], "ver23")
306         d.addCallback(_then4)
307
308         # now add a new subscription, which should be called with the
309         # backlog. The introducer only records one announcement per index, so
310         # the backlog will only have the latest message.
311         announcements2 = []
312         def _received2(key_s, ann):
313             announcements2.append( (key_s, ann) )
314         d.addCallback(lambda ign: ic1.subscribe_to("storage", _received2))
315         d.addCallback(fireEventually)
316         def _then5(ign):
317             self.failUnlessEqual(len(announcements2), 1)
318             key_s,ann = announcements2[-1]
319             self.failUnlessEqual(key_s, pubkey_s)
320             self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1a)
321             self.failUnlessEqual(ann["my-version"], "ver23")
322         d.addCallback(_then5)
323         return d
324
325     def test_id_collision(self):
326         # test replacement case where tubid equals a keyid (one should
327         # not replace the other)
328         ic = IntroducerClient(None,
329                               "introducer.furl", u"my_nickname",
330                               "my_version", "oldest_version", {}, fakeseq)
331         announcements = []
332         ic.subscribe_to("storage",
333                         lambda key_s,ann: announcements.append(ann))
334         sk_s, vk_s = keyutil.make_keypair()
335         sk, _ignored = keyutil.parse_privkey(sk_s)
336         keyid = keyutil.remove_prefix(vk_s, "pub-v0-")
337         furl1 = "pb://onug64tu@127.0.0.1:123/short" # base32("short")
338         furl2 = "pb://%s@127.0.0.1:36106/swissnum" % keyid
339         ann_t = make_ann_t(ic, furl1, sk, 1)
340         ic.remote_announce_v2([ann_t])
341         d = fireEventually()
342         def _then(ign):
343             # first announcement has been processed
344             self.failUnlessEqual(len(announcements), 1)
345             self.failUnlessEqual(announcements[0]["anonymous-storage-FURL"],
346                                  furl1)
347             # now submit a second one, with a tubid that happens to look just
348             # like the pubkey-based serverid we just processed. They should
349             # not overlap.
350             ann2 = (furl2, "storage", "RIStorage", "nick1", "ver23", "ver0")
351             ca = WrapV2ClientInV1Interface(ic)
352             ca.remote_announce([ann2])
353             return fireEventually()
354         d.addCallback(_then)
355         def _then2(ign):
356             # if they overlapped, the second announcement would be ignored
357             self.failUnlessEqual(len(announcements), 2)
358             self.failUnlessEqual(announcements[1]["anonymous-storage-FURL"],
359                                  furl2)
360         d.addCallback(_then2)
361         return d
362
363 class Server(unittest.TestCase):
364     def test_duplicate(self):
365         i = IntroducerService()
366         ic1 = IntroducerClient(None,
367                                "introducer.furl", u"my_nickname",
368                                "ver23", "oldest_version", {}, realseq)
369         furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/gydnp"
370
371         privkey_s, _ = keyutil.make_keypair()
372         privkey, _ = keyutil.parse_privkey(privkey_s)
373
374         ann1 = make_ann_t(ic1, furl1, privkey, seqnum=10)
375         ann1_old = make_ann_t(ic1, furl1, privkey, seqnum=9)
376         ann1_new = make_ann_t(ic1, furl1, privkey, seqnum=11)
377         ann1_noseqnum = make_ann_t(ic1, furl1, privkey, seqnum=None)
378         ann1_badseqnum = make_ann_t(ic1, furl1, privkey, seqnum="not an int")
379
380         i.remote_publish_v2(ann1, None)
381         all = i.get_announcements()
382         self.failUnlessEqual(len(all), 1)
383         self.failUnlessEqual(all[0].announcement["seqnum"], 10)
384         self.failUnlessEqual(i._debug_counts["inbound_message"], 1)
385         self.failUnlessEqual(i._debug_counts["inbound_duplicate"], 0)
386         self.failUnlessEqual(i._debug_counts["inbound_no_seqnum"], 0)
387         self.failUnlessEqual(i._debug_counts["inbound_old_replay"], 0)
388         self.failUnlessEqual(i._debug_counts["inbound_update"], 0)
389
390         i.remote_publish_v2(ann1, None)
391         all = i.get_announcements()
392         self.failUnlessEqual(len(all), 1)
393         self.failUnlessEqual(all[0].announcement["seqnum"], 10)
394         self.failUnlessEqual(i._debug_counts["inbound_message"], 2)
395         self.failUnlessEqual(i._debug_counts["inbound_duplicate"], 1)
396         self.failUnlessEqual(i._debug_counts["inbound_no_seqnum"], 0)
397         self.failUnlessEqual(i._debug_counts["inbound_old_replay"], 0)
398         self.failUnlessEqual(i._debug_counts["inbound_update"], 0)
399
400         i.remote_publish_v2(ann1_old, None)
401         all = i.get_announcements()
402         self.failUnlessEqual(len(all), 1)
403         self.failUnlessEqual(all[0].announcement["seqnum"], 10)
404         self.failUnlessEqual(i._debug_counts["inbound_message"], 3)
405         self.failUnlessEqual(i._debug_counts["inbound_duplicate"], 1)
406         self.failUnlessEqual(i._debug_counts["inbound_no_seqnum"], 0)
407         self.failUnlessEqual(i._debug_counts["inbound_old_replay"], 1)
408         self.failUnlessEqual(i._debug_counts["inbound_update"], 0)
409
410         i.remote_publish_v2(ann1_new, None)
411         all = i.get_announcements()
412         self.failUnlessEqual(len(all), 1)
413         self.failUnlessEqual(all[0].announcement["seqnum"], 11)
414         self.failUnlessEqual(i._debug_counts["inbound_message"], 4)
415         self.failUnlessEqual(i._debug_counts["inbound_duplicate"], 1)
416         self.failUnlessEqual(i._debug_counts["inbound_no_seqnum"], 0)
417         self.failUnlessEqual(i._debug_counts["inbound_old_replay"], 1)
418         self.failUnlessEqual(i._debug_counts["inbound_update"], 1)
419
420         i.remote_publish_v2(ann1_noseqnum, None)
421         all = i.get_announcements()
422         self.failUnlessEqual(len(all), 1)
423         self.failUnlessEqual(all[0].announcement["seqnum"], 11)
424         self.failUnlessEqual(i._debug_counts["inbound_message"], 5)
425         self.failUnlessEqual(i._debug_counts["inbound_duplicate"], 1)
426         self.failUnlessEqual(i._debug_counts["inbound_no_seqnum"], 1)
427         self.failUnlessEqual(i._debug_counts["inbound_old_replay"], 1)
428         self.failUnlessEqual(i._debug_counts["inbound_update"], 1)
429
430         i.remote_publish_v2(ann1_badseqnum, None)
431         all = i.get_announcements()
432         self.failUnlessEqual(len(all), 1)
433         self.failUnlessEqual(all[0].announcement["seqnum"], 11)
434         self.failUnlessEqual(i._debug_counts["inbound_message"], 6)
435         self.failUnlessEqual(i._debug_counts["inbound_duplicate"], 1)
436         self.failUnlessEqual(i._debug_counts["inbound_no_seqnum"], 2)
437         self.failUnlessEqual(i._debug_counts["inbound_old_replay"], 1)
438         self.failUnlessEqual(i._debug_counts["inbound_update"], 1)
439
440
441 NICKNAME = u"n\u00EDickname-%s" # LATIN SMALL LETTER I WITH ACUTE
442
443 class SystemTestMixin(ServiceMixin, pollmixin.PollMixin):
444
445     def create_tub(self, portnum=0):
446         tubfile = os.path.join(self.basedir, "tub.pem")
447         self.central_tub = tub = Tub(certFile=tubfile)
448         #tub.setOption("logLocalFailures", True)
449         #tub.setOption("logRemoteFailures", True)
450         tub.setOption("expose-remote-exception-types", False)
451         tub.setServiceParent(self.parent)
452         l = tub.listenOn("tcp:%d" % portnum)
453         self.central_portnum = l.getPortnum()
454         if portnum != 0:
455             assert self.central_portnum == portnum
456         tub.setLocation("localhost:%d" % self.central_portnum)
457
458 class Queue(SystemTestMixin, unittest.TestCase):
459     def test_queue_until_connected(self):
460         self.basedir = "introducer/QueueUntilConnected/queued"
461         os.makedirs(self.basedir)
462         self.create_tub()
463         introducer = IntroducerService()
464         introducer.setServiceParent(self.parent)
465         iff = os.path.join(self.basedir, "introducer.furl")
466         ifurl = self.central_tub.registerReference(introducer, furlFile=iff)
467         tub2 = Tub()
468         tub2.setServiceParent(self.parent)
469         c = IntroducerClient(tub2, ifurl,
470                              u"nickname", "version", "oldest", {}, fakeseq)
471         furl1 = "pb://onug64tu@127.0.0.1:123/short" # base32("short")
472         sk_s, vk_s = keyutil.make_keypair()
473         sk, _ignored = keyutil.parse_privkey(sk_s)
474
475         d = introducer.disownServiceParent()
476         def _offline(ign):
477             # now that the introducer server is offline, create a client and
478             # publish some messages
479             c.setServiceParent(self.parent) # this starts the reconnector
480             c.publish("storage", make_ann(furl1), sk)
481
482             introducer.setServiceParent(self.parent) # restart the server
483             # now wait for the messages to be delivered
484             def _got_announcement():
485                 return bool(introducer.get_announcements())
486             return self.poll(_got_announcement)
487         d.addCallback(_offline)
488         def _done(ign):
489             v = introducer.get_announcements()[0]
490             furl = v.announcement["anonymous-storage-FURL"]
491             self.failUnlessEqual(furl, furl1)
492         d.addCallback(_done)
493
494         # now let the ack get back
495         def _wait_until_idle(ign):
496             def _idle():
497                 if c._debug_outstanding:
498                     return False
499                 if introducer._debug_outstanding:
500                     return False
501                 return True
502             return self.poll(_idle)
503         d.addCallback(_wait_until_idle)
504         return d
505
506
507 V1 = "v1"; V2 = "v2"
508 class SystemTest(SystemTestMixin, unittest.TestCase):
509
510     def do_system_test(self, server_version):
511         self.create_tub()
512         if server_version == V1:
513             introducer = old.IntroducerService_v1()
514         else:
515             introducer = IntroducerService()
516         introducer.setServiceParent(self.parent)
517         iff = os.path.join(self.basedir, "introducer.furl")
518         tub = self.central_tub
519         ifurl = self.central_tub.registerReference(introducer, furlFile=iff)
520         self.introducer_furl = ifurl
521
522         # we have 5 clients who publish themselves as storage servers, and a
523         # sixth which does which not. All 6 clients subscriber to hear about
524         # storage. When the connections are fully established, all six nodes
525         # should have 5 connections each.
526         NUM_STORAGE = 5
527         NUM_CLIENTS = 6
528
529         clients = []
530         tubs = {}
531         received_announcements = {}
532         subscribing_clients = []
533         publishing_clients = []
534         printable_serverids = {}
535         self.the_introducer = introducer
536         privkeys = {}
537         expected_announcements = [0 for c in range(NUM_CLIENTS)]
538
539         for i in range(NUM_CLIENTS):
540             tub = Tub()
541             #tub.setOption("logLocalFailures", True)
542             #tub.setOption("logRemoteFailures", True)
543             tub.setOption("expose-remote-exception-types", False)
544             tub.setServiceParent(self.parent)
545             l = tub.listenOn("tcp:0")
546             portnum = l.getPortnum()
547             tub.setLocation("localhost:%d" % portnum)
548
549             log.msg("creating client %d: %s" % (i, tub.getShortTubID()))
550             if i == 0:
551                 c = old.IntroducerClient_v1(tub, self.introducer_furl,
552                                             NICKNAME % str(i),
553                                             "version", "oldest")
554             else:
555                 c = IntroducerClient(tub, self.introducer_furl,
556                                      NICKNAME % str(i),
557                                      "version", "oldest",
558                                      {"component": "component-v1"}, fakeseq)
559             received_announcements[c] = {}
560             def got(key_s_or_tubid, ann, announcements, i):
561                 if i == 0:
562                     index = get_tubid_string_from_ann(ann)
563                 else:
564                     index = key_s_or_tubid or get_tubid_string_from_ann(ann)
565                 announcements[index] = ann
566             c.subscribe_to("storage", got, received_announcements[c], i)
567             subscribing_clients.append(c)
568             expected_announcements[i] += 1 # all expect a 'storage' announcement
569
570             node_furl = tub.registerReference(Referenceable())
571             if i < NUM_STORAGE:
572                 if i == 0:
573                     c.publish(node_furl, "storage", "ri_name")
574                     printable_serverids[i] = get_tubid_string(node_furl)
575                 elif i == 1:
576                     # sign the announcement
577                     privkey_s, pubkey_s = keyutil.make_keypair()
578                     privkey, _ignored = keyutil.parse_privkey(privkey_s)
579                     privkeys[c] = privkey
580                     c.publish("storage", make_ann(node_furl), privkey)
581                     if server_version == V1:
582                         printable_serverids[i] = get_tubid_string(node_furl)
583                     else:
584                         assert pubkey_s.startswith("pub-")
585                         printable_serverids[i] = pubkey_s[len("pub-"):]
586                 else:
587                     c.publish("storage", make_ann(node_furl))
588                     printable_serverids[i] = get_tubid_string(node_furl)
589                 publishing_clients.append(c)
590             else:
591                 # the last one does not publish anything
592                 pass
593
594             if i == 0:
595                 # users of the V1 client were required to publish a
596                 # 'stub_client' record (somewhat after they published the
597                 # 'storage' record), so the introducer could see their
598                 # version. Match that behavior.
599                 c.publish(node_furl, "stub_client", "stub_ri_name")
600
601             if i == 2:
602                 # also publish something that nobody cares about
603                 boring_furl = tub.registerReference(Referenceable())
604                 c.publish("boring", make_ann(boring_furl))
605
606             c.setServiceParent(self.parent)
607             clients.append(c)
608             tubs[c] = tub
609
610
611         def _wait_for_connected(ign):
612             def _connected():
613                 for c in clients:
614                     if not c.connected_to_introducer():
615                         return False
616                 return True
617             return self.poll(_connected)
618
619         # we watch the clients to determine when the system has settled down.
620         # Then we can look inside the server to assert things about its
621         # state.
622
623         def _wait_for_expected_announcements(ign):
624             def _got_expected_announcements():
625                 for i,c in enumerate(subscribing_clients):
626                     if len(received_announcements[c]) < expected_announcements[i]:
627                         return False
628                 return True
629             return self.poll(_got_expected_announcements)
630
631         # before shutting down any Tub, we'd like to know that there are no
632         # messages outstanding
633
634         def _wait_until_idle(ign):
635             def _idle():
636                 for c in subscribing_clients + publishing_clients:
637                     if c._debug_outstanding:
638                         return False
639                 if self.the_introducer._debug_outstanding:
640                     return False
641                 return True
642             return self.poll(_idle)
643
644         d = defer.succeed(None)
645         d.addCallback(_wait_for_connected)
646         d.addCallback(_wait_for_expected_announcements)
647         d.addCallback(_wait_until_idle)
648
649         def _check1(res):
650             log.msg("doing _check1")
651             dc = self.the_introducer._debug_counts
652             if server_version == V1:
653                 # each storage server publishes a record, and (after its
654                 # 'subscribe' has been ACKed) also publishes a "stub_client".
655                 # The non-storage client (which subscribes) also publishes a
656                 # stub_client. There is also one "boring" service. The number
657                 # of messages is higher, because the stub_clients aren't
658                 # published until after we get the 'subscribe' ack (since we
659                 # don't realize that we're dealing with a v1 server [which
660                 # needs stub_clients] until then), and the act of publishing
661                 # the stub_client causes us to re-send all previous
662                 # announcements.
663                 self.failUnlessEqual(dc["inbound_message"] - dc["inbound_duplicate"],
664                                      NUM_STORAGE + NUM_CLIENTS + 1)
665             else:
666                 # each storage server publishes a record. There is also one
667                 # "stub_client" and one "boring"
668                 self.failUnlessEqual(dc["inbound_message"], NUM_STORAGE+2)
669                 self.failUnlessEqual(dc["inbound_duplicate"], 0)
670             self.failUnlessEqual(dc["inbound_update"], 0)
671             self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS)
672             # the number of outbound messages is tricky.. I think it depends
673             # upon a race between the publish and the subscribe messages.
674             self.failUnless(dc["outbound_message"] > 0)
675             # each client subscribes to "storage", and each server publishes
676             self.failUnlessEqual(dc["outbound_announcements"],
677                                  NUM_STORAGE*NUM_CLIENTS)
678
679             for c in subscribing_clients:
680                 cdc = c._debug_counts
681                 self.failUnless(cdc["inbound_message"])
682                 self.failUnlessEqual(cdc["inbound_announcement"],
683                                      NUM_STORAGE)
684                 self.failUnlessEqual(cdc["wrong_service"], 0)
685                 self.failUnlessEqual(cdc["duplicate_announcement"], 0)
686                 self.failUnlessEqual(cdc["update"], 0)
687                 self.failUnlessEqual(cdc["new_announcement"],
688                                      NUM_STORAGE)
689                 anns = received_announcements[c]
690                 self.failUnlessEqual(len(anns), NUM_STORAGE)
691
692                 nodeid0 = tubs[clients[0]].tubID
693                 ann = anns[nodeid0]
694                 nick = ann["nickname"]
695                 self.failUnlessEqual(type(nick), unicode)
696                 self.failUnlessEqual(nick, NICKNAME % "0")
697             if server_version == V1:
698                 for c in publishing_clients:
699                     cdc = c._debug_counts
700                     expected = 1 # storage
701                     if c is clients[2]:
702                         expected += 1 # boring
703                     if c is not clients[0]:
704                         # the v2 client tries to call publish_v2, which fails
705                         # because the server is v1. It then re-sends
706                         # everything it has so far, plus a stub_client record
707                         expected = 2*expected + 1
708                     if c is clients[0]:
709                         # we always tell v1 client to send stub_client
710                         expected += 1
711                     self.failUnlessEqual(cdc["outbound_message"], expected)
712             else:
713                 for c in publishing_clients:
714                     cdc = c._debug_counts
715                     expected = 1
716                     if c in [clients[0], # stub_client
717                              clients[2], # boring
718                              ]:
719                         expected = 2
720                     self.failUnlessEqual(cdc["outbound_message"], expected)
721             # now check the web status, make sure it renders without error
722             ir = introweb.IntroducerRoot(self.parent)
723             self.parent.nodeid = "NODEID"
724             text = ir.renderSynchronously().decode("utf-8")
725             self.failUnlessIn(NICKNAME % "0", text) # the v1 client
726             self.failUnlessIn(NICKNAME % "1", text) # a v2 client
727             for i in range(NUM_STORAGE):
728                 self.failUnlessIn(printable_serverids[i], text,
729                                   (i,printable_serverids[i],text))
730                 # make sure there isn't a double-base32ed string too
731                 self.failIfIn(idlib.nodeid_b2a(printable_serverids[i]), text,
732                               (i,printable_serverids[i],text))
733             log.msg("_check1 done")
734         d.addCallback(_check1)
735
736         # force an introducer reconnect, by shutting down the Tub it's using
737         # and starting a new Tub (with the old introducer). Everybody should
738         # reconnect and republish, but the introducer should ignore the
739         # republishes as duplicates. However, because the server doesn't know
740         # what each client does and does not know, it will send them a copy
741         # of the current announcement table anyway.
742
743         d.addCallback(lambda _ign: log.msg("shutting down introducer's Tub"))
744         d.addCallback(lambda _ign: self.central_tub.disownServiceParent())
745
746         def _wait_for_introducer_loss(ign):
747             def _introducer_lost():
748                 for c in clients:
749                     if c.connected_to_introducer():
750                         return False
751                 return True
752             return self.poll(_introducer_lost)
753         d.addCallback(_wait_for_introducer_loss)
754
755         def _restart_introducer_tub(_ign):
756             log.msg("restarting introducer's Tub")
757             # reset counters
758             for i in range(NUM_CLIENTS):
759                 c = subscribing_clients[i]
760                 for k in c._debug_counts:
761                     c._debug_counts[k] = 0
762             for k in self.the_introducer._debug_counts:
763                 self.the_introducer._debug_counts[k] = 0
764             expected_announcements[i] += 1 # new 'storage' for everyone
765             self.create_tub(self.central_portnum)
766             newfurl = self.central_tub.registerReference(self.the_introducer,
767                                                          furlFile=iff)
768             assert newfurl == self.introducer_furl
769         d.addCallback(_restart_introducer_tub)
770
771         d.addCallback(_wait_for_connected)
772         d.addCallback(_wait_for_expected_announcements)
773         d.addCallback(_wait_until_idle)
774         d.addCallback(lambda _ign: log.msg(" reconnected"))
775
776         # TODO: publish something while the introducer is offline, then
777         # confirm it gets delivered when the connection is reestablished
778         def _check2(res):
779             log.msg("doing _check2")
780             # assert that the introducer sent out new messages, one per
781             # subscriber
782             dc = self.the_introducer._debug_counts
783             self.failUnlessEqual(dc["outbound_announcements"],
784                                  NUM_STORAGE*NUM_CLIENTS)
785             self.failUnless(dc["outbound_message"] > 0)
786             self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS)
787             for c in subscribing_clients:
788                 cdc = c._debug_counts
789                 self.failUnlessEqual(cdc["inbound_message"], 1)
790                 self.failUnlessEqual(cdc["inbound_announcement"], NUM_STORAGE)
791                 self.failUnlessEqual(cdc["new_announcement"], 0)
792                 self.failUnlessEqual(cdc["wrong_service"], 0)
793                 self.failUnlessEqual(cdc["duplicate_announcement"], NUM_STORAGE)
794         d.addCallback(_check2)
795
796         # Then force an introducer restart, by shutting down the Tub,
797         # destroying the old introducer, and starting a new Tub+Introducer.
798         # Everybody should reconnect and republish, and the (new) introducer
799         # will distribute the new announcements, but the clients should
800         # ignore the republishes as duplicates.
801
802         d.addCallback(lambda _ign: log.msg("shutting down introducer"))
803         d.addCallback(lambda _ign: self.central_tub.disownServiceParent())
804         d.addCallback(_wait_for_introducer_loss)
805         d.addCallback(lambda _ign: log.msg("introducer lost"))
806
807         def _restart_introducer(_ign):
808             log.msg("restarting introducer")
809             self.create_tub(self.central_portnum)
810             # reset counters
811             for i in range(NUM_CLIENTS):
812                 c = subscribing_clients[i]
813                 for k in c._debug_counts:
814                     c._debug_counts[k] = 0
815             expected_announcements[i] += 1 # new 'storage' for everyone
816             if server_version == V1:
817                 introducer = old.IntroducerService_v1()
818             else:
819                 introducer = IntroducerService()
820             self.the_introducer = introducer
821             newfurl = self.central_tub.registerReference(self.the_introducer,
822                                                          furlFile=iff)
823             assert newfurl == self.introducer_furl
824         d.addCallback(_restart_introducer)
825
826         d.addCallback(_wait_for_connected)
827         d.addCallback(_wait_for_expected_announcements)
828         d.addCallback(_wait_until_idle)
829
830         def _check3(res):
831             log.msg("doing _check3")
832             dc = self.the_introducer._debug_counts
833             self.failUnlessEqual(dc["outbound_announcements"],
834                                  NUM_STORAGE*NUM_CLIENTS)
835             self.failUnless(dc["outbound_message"] > 0)
836             self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS)
837             for c in subscribing_clients:
838                 cdc = c._debug_counts
839                 self.failUnless(cdc["inbound_message"] > 0)
840                 self.failUnlessEqual(cdc["inbound_announcement"], NUM_STORAGE)
841                 self.failUnlessEqual(cdc["new_announcement"], 0)
842                 self.failUnlessEqual(cdc["wrong_service"], 0)
843                 self.failUnlessEqual(cdc["duplicate_announcement"], NUM_STORAGE)
844
845         d.addCallback(_check3)
846         return d
847
848
849     def test_system_v2_server(self):
850         self.basedir = "introducer/SystemTest/system_v2_server"
851         os.makedirs(self.basedir)
852         return self.do_system_test(V2)
853     test_system_v2_server.timeout = 480
854     # occasionally takes longer than 350s on "draco"
855
856     def test_system_v1_server(self):
857         self.basedir = "introducer/SystemTest/system_v1_server"
858         os.makedirs(self.basedir)
859         return self.do_system_test(V1)
860     test_system_v1_server.timeout = 480
861     # occasionally takes longer than 350s on "draco"
862
863 class FakeRemoteReference:
864     def notifyOnDisconnect(self, *args, **kwargs): pass
865     def getRemoteTubID(self): return "62ubehyunnyhzs7r6vdonnm2hpi52w6y"
866     def getLocationHints(self): return [("ipv4", "here.example.com", "1234"),
867                                         ("ipv4", "there.example.com", "2345")]
868     def getPeer(self): return address.IPv4Address("TCP", "remote.example.com",
869                                                   3456)
870
871 class ClientInfo(unittest.TestCase):
872     def test_client_v2(self):
873         introducer = IntroducerService()
874         tub = introducer_furl = None
875         app_versions = {"whizzy": "fizzy"}
876         client_v2 = IntroducerClient(tub, introducer_furl, NICKNAME % u"v2",
877                                      "my_version", "oldest", app_versions,
878                                      fakeseq)
879         #furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
880         #ann_s = make_ann_t(client_v2, furl1, None, 10)
881         #introducer.remote_publish_v2(ann_s, Referenceable())
882         subscriber = FakeRemoteReference()
883         introducer.remote_subscribe_v2(subscriber, "storage",
884                                        client_v2._my_subscriber_info)
885         subs = introducer.get_subscribers()
886         self.failUnlessEqual(len(subs), 1)
887         s0 = subs[0]
888         self.failUnlessEqual(s0.service_name, "storage")
889         self.failUnlessEqual(s0.app_versions, app_versions)
890         self.failUnlessEqual(s0.nickname, NICKNAME % u"v2")
891         self.failUnlessEqual(s0.version, "my_version")
892
893     def test_client_v1(self):
894         introducer = IntroducerService()
895         subscriber = FakeRemoteReference()
896         introducer.remote_subscribe(subscriber, "storage")
897         # the v1 subscribe interface had no subscriber_info: that was usually
898         # sent in a separate stub_client pseudo-announcement
899         subs = introducer.get_subscribers()
900         self.failUnlessEqual(len(subs), 1)
901         s0 = subs[0]
902         self.failUnlessEqual(s0.nickname, u"?") # not known yet
903         self.failUnlessEqual(s0.service_name, "storage")
904
905         # now submit the stub_client announcement
906         furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
907         ann = (furl1, "stub_client", "RIStubClient",
908                (NICKNAME % u"v1").encode("utf-8"), "my_version", "oldest")
909         introducer.remote_publish(ann)
910         # the server should correlate the two
911         subs = introducer.get_subscribers()
912         self.failUnlessEqual(len(subs), 1)
913         s0 = subs[0]
914         self.failUnlessEqual(s0.service_name, "storage")
915         # v1 announcements do not contain app-versions
916         self.failUnlessEqual(s0.app_versions, {})
917         self.failUnlessEqual(s0.nickname, NICKNAME % u"v1")
918         self.failUnlessEqual(s0.version, "my_version")
919
920         # a subscription that arrives after the stub_client announcement
921         # should be correlated too
922         subscriber2 = FakeRemoteReference()
923         introducer.remote_subscribe(subscriber2, "thing2")
924
925         subs = introducer.get_subscribers()
926         self.failUnlessEqual(len(subs), 2)
927         s0 = [s for s in subs if s.service_name == "thing2"][0]
928         # v1 announcements do not contain app-versions
929         self.failUnlessEqual(s0.app_versions, {})
930         self.failUnlessEqual(s0.nickname, NICKNAME % u"v1")
931         self.failUnlessEqual(s0.version, "my_version")
932
933 class Announcements(unittest.TestCase):
934     def test_client_v2_unsigned(self):
935         introducer = IntroducerService()
936         tub = introducer_furl = None
937         app_versions = {"whizzy": "fizzy"}
938         client_v2 = IntroducerClient(tub, introducer_furl, u"nick-v2",
939                                      "my_version", "oldest", app_versions,
940                                      fakeseq)
941         furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
942         tubid = "62ubehyunnyhzs7r6vdonnm2hpi52w6y"
943         ann_s0 = make_ann_t(client_v2, furl1, None, 10)
944         canary0 = Referenceable()
945         introducer.remote_publish_v2(ann_s0, canary0)
946         a = introducer.get_announcements()
947         self.failUnlessEqual(len(a), 1)
948         self.failUnlessIdentical(a[0].canary, canary0)
949         self.failUnlessEqual(a[0].index, ("storage", None, tubid))
950         self.failUnlessEqual(a[0].announcement["app-versions"], app_versions)
951         self.failUnlessEqual(a[0].nickname, u"nick-v2")
952         self.failUnlessEqual(a[0].service_name, "storage")
953         self.failUnlessEqual(a[0].version, "my_version")
954         self.failUnlessEqual(a[0].announcement["anonymous-storage-FURL"], furl1)
955
956     def test_client_v2_signed(self):
957         introducer = IntroducerService()
958         tub = introducer_furl = None
959         app_versions = {"whizzy": "fizzy"}
960         client_v2 = IntroducerClient(tub, introducer_furl, u"nick-v2",
961                                      "my_version", "oldest", app_versions,
962                                      fakeseq)
963         furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
964         sk_s, vk_s = keyutil.make_keypair()
965         sk, _ignored = keyutil.parse_privkey(sk_s)
966         pks = keyutil.remove_prefix(vk_s, "pub-")
967         ann_t0 = make_ann_t(client_v2, furl1, sk, 10)
968         canary0 = Referenceable()
969         introducer.remote_publish_v2(ann_t0, canary0)
970         a = introducer.get_announcements()
971         self.failUnlessEqual(len(a), 1)
972         self.failUnlessIdentical(a[0].canary, canary0)
973         self.failUnlessEqual(a[0].index, ("storage", pks, None))
974         self.failUnlessEqual(a[0].announcement["app-versions"], app_versions)
975         self.failUnlessEqual(a[0].nickname, u"nick-v2")
976         self.failUnlessEqual(a[0].service_name, "storage")
977         self.failUnlessEqual(a[0].version, "my_version")
978         self.failUnlessEqual(a[0].announcement["anonymous-storage-FURL"], furl1)
979
980     def test_client_v1(self):
981         introducer = IntroducerService()
982
983         furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
984         tubid = "62ubehyunnyhzs7r6vdonnm2hpi52w6y"
985         ann = (furl1, "storage", "RIStorage",
986                u"nick-v1".encode("utf-8"), "my_version", "oldest")
987         introducer.remote_publish(ann)
988
989         a = introducer.get_announcements()
990         self.failUnlessEqual(len(a), 1)
991         self.failUnlessEqual(a[0].index, ("storage", None, tubid))
992         self.failUnlessEqual(a[0].canary, None)
993         self.failUnlessEqual(a[0].announcement["app-versions"], {})
994         self.failUnlessEqual(a[0].nickname, u"nick-v1".encode("utf-8"))
995         self.failUnlessEqual(a[0].service_name, "storage")
996         self.failUnlessEqual(a[0].version, "my_version")
997         self.failUnlessEqual(a[0].announcement["anonymous-storage-FURL"], furl1)
998
999 class ClientSeqnums(unittest.TestCase):
1000     def test_client(self):
1001         basedir = "introducer/ClientSeqnums/test_client"
1002         fileutil.make_dirs(basedir)
1003         f = open(os.path.join(basedir, "tahoe.cfg"), "w")
1004         f.write("[client]\n")
1005         f.write("introducer.furl = nope\n")
1006         f.close()
1007         c = TahoeClient(basedir)
1008         ic = c.introducer_client
1009         outbound = ic._outbound_announcements
1010         published = ic._published_announcements
1011         def read_seqnum():
1012             f = open(os.path.join(basedir, "announcement-seqnum"))
1013             seqnum = f.read().strip()
1014             f.close()
1015             return int(seqnum)
1016
1017         ic.publish("sA", {"key": "value1"}, c._server_key)
1018         self.failUnlessEqual(read_seqnum(), 1)
1019         self.failUnless("sA" in outbound)
1020         self.failUnlessEqual(outbound["sA"]["seqnum"], 1)
1021         nonce1 = outbound["sA"]["nonce"]
1022         self.failUnless(isinstance(nonce1, str))
1023         self.failUnlessEqual(simplejson.loads(published["sA"][0]),
1024                              outbound["sA"])
1025         # [1] is the signature, [2] is the pubkey
1026
1027         # publishing a second service causes both services to be
1028         # re-published, with the next higher sequence number
1029         ic.publish("sB", {"key": "value2"}, c._server_key)
1030         self.failUnlessEqual(read_seqnum(), 2)
1031         self.failUnless("sB" in outbound)
1032         self.failUnlessEqual(outbound["sB"]["seqnum"], 2)
1033         self.failUnless("sA" in outbound)
1034         self.failUnlessEqual(outbound["sA"]["seqnum"], 2)
1035         nonce2 = outbound["sA"]["nonce"]
1036         self.failUnless(isinstance(nonce2, str))
1037         self.failIfEqual(nonce1, nonce2)
1038         self.failUnlessEqual(simplejson.loads(published["sA"][0]),
1039                              outbound["sA"])
1040         self.failUnlessEqual(simplejson.loads(published["sB"][0]),
1041                              outbound["sB"])
1042
1043
1044
1045 class TooNewServer(IntroducerService):
1046     VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v999":
1047                  { },
1048                 "application-version": "greetings from the crazy future",
1049                 }
1050
1051 class NonV1Server(SystemTestMixin, unittest.TestCase):
1052     # if the 1.3.0 client connects to a server that doesn't provide the 'v1'
1053     # protocol, it is supposed to provide a useful error instead of a weird
1054     # exception.
1055
1056     def test_failure(self):
1057         self.basedir = "introducer/NonV1Server/failure"
1058         os.makedirs(self.basedir)
1059         self.create_tub()
1060         i = TooNewServer()
1061         i.setServiceParent(self.parent)
1062         self.introducer_furl = self.central_tub.registerReference(i)
1063
1064         tub = Tub()
1065         tub.setOption("expose-remote-exception-types", False)
1066         tub.setServiceParent(self.parent)
1067         l = tub.listenOn("tcp:0")
1068         portnum = l.getPortnum()
1069         tub.setLocation("localhost:%d" % portnum)
1070
1071         c = IntroducerClient(tub, self.introducer_furl,
1072                              u"nickname-client", "version", "oldest", {},
1073                              fakeseq)
1074         announcements = {}
1075         def got(key_s, ann):
1076             announcements[key_s] = ann
1077         c.subscribe_to("storage", got)
1078
1079         c.setServiceParent(self.parent)
1080
1081         # now we wait for it to connect and notice the bad version
1082
1083         def _got_bad():
1084             return bool(c._introducer_error) or bool(c._publisher)
1085         d = self.poll(_got_bad)
1086         def _done(res):
1087             self.failUnless(c._introducer_error)
1088             self.failUnless(c._introducer_error.check(InsufficientVersionError),
1089                             c._introducer_error)
1090         d.addCallback(_done)
1091         return d
1092
1093 class DecodeFurl(unittest.TestCase):
1094     def test_decode(self):
1095         # make sure we have a working base64.b32decode. The one in
1096         # python2.4.[01] was broken.
1097         furl = 'pb://t5g7egomnnktbpydbuijt6zgtmw4oqi5@127.0.0.1:51857/hfzv36i'
1098         m = re.match(r'pb://(\w+)@', furl)
1099         assert m
1100         nodeid = b32decode(m.group(1).upper())
1101         self.failUnlessEqual(nodeid, "\x9fM\xf2\x19\xcckU0\xbf\x03\r\x10\x99\xfb&\x9b-\xc7A\x1d")
1102
1103 class Signatures(unittest.TestCase):
1104     def test_sign(self):
1105         ann = {"key1": "value1"}
1106         sk_s,vk_s = keyutil.make_keypair()
1107         sk,ignored = keyutil.parse_privkey(sk_s)
1108         ann_t = sign_to_foolscap(ann, sk)
1109         (msg, sig, key) = ann_t
1110         self.failUnlessEqual(type(msg), type("".encode("utf-8"))) # bytes
1111         self.failUnlessEqual(simplejson.loads(msg.decode("utf-8")), ann)
1112         self.failUnless(sig.startswith("v0-"))
1113         self.failUnless(key.startswith("v0-"))
1114         (ann2,key2) = unsign_from_foolscap(ann_t)
1115         self.failUnlessEqual(ann2, ann)
1116         self.failUnlessEqual("pub-"+key2, vk_s)
1117
1118         # bad signature
1119         bad_ann = {"key1": "value2"}
1120         bad_msg = simplejson.dumps(bad_ann).encode("utf-8")
1121         self.failUnlessRaises(keyutil.BadSignatureError,
1122                               unsign_from_foolscap, (bad_msg,sig,key))
1123         # sneaky bad signature should be ignored
1124         (ann2,key2) = unsign_from_foolscap( (bad_msg,None,key) )
1125         self.failUnlessEqual(key2, None)
1126         self.failUnlessEqual(ann2, bad_ann)
1127
1128         # unrecognized signatures
1129         self.failUnlessRaises(UnknownKeyError,
1130                               unsign_from_foolscap, (bad_msg,"v999-sig",key))
1131         self.failUnlessRaises(UnknownKeyError,
1132                               unsign_from_foolscap, (bad_msg,sig,"v999-key"))
1133
1134
1135 # add tests of StorageFarmBroker: if it receives duplicate announcements, it
1136 # should leave the Reconnector in place, also if it receives
1137 # same-FURL-different-misc, but if it receives same-nodeid-different-FURL, it
1138 # should tear down the Reconnector and make a new one. This behavior used to
1139 # live in the IntroducerClient, and thus used to be tested by test_introducer
1140
1141 # copying more tests from old branch:
1142
1143 #  then also add Upgrade test