]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_introducer.py
introweb: fix connection hints for server announcements
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_introducer.py
1
2 import os, re, itertools
3 from base64 import b32decode
4 import simplejson
5
6 from twisted.trial import unittest
7 from twisted.internet import defer, address
8 from twisted.python import log
9
10 from foolscap.api import Tub, Referenceable, fireEventually, flushEventualQueue
11 from twisted.application import service
12 from allmydata.interfaces import InsufficientVersionError
13 from allmydata.introducer.client import IntroducerClient, \
14      WrapV2ClientInV1Interface
15 from allmydata.introducer.server import IntroducerService, FurlFileConflictError
16 from allmydata.introducer.common import get_tubid_string_from_ann, \
17      get_tubid_string, sign_to_foolscap, unsign_from_foolscap, \
18      UnknownKeyError
19 from allmydata.introducer import old
20 # test compatibility with old introducer .tac files
21 from allmydata.introducer import IntroducerNode
22 from allmydata.web import introweb
23 from allmydata.client import Client as TahoeClient
24 from allmydata.util import pollmixin, keyutil, idlib, fileutil
25 import allmydata.test.common_util as testutil
26
class LoggingMultiService(service.MultiService):
    """A MultiService that additionally exposes a ``log`` method."""

    def log(self, msg, **kw):
        """Pass ``msg`` and any keyword details on to twisted's ``log.msg``."""
        # Inside the method body, ``log`` resolves to the module-level
        # twisted.python.log import, not to this method.
        emit = log.msg
        emit(msg, **kw)
30
class Node(testutil.SignalMixin, testutil.ReallyEqualMixin, unittest.TestCase):
    """Tests that construct an IntroducerNode in a fresh base directory."""

    def test_furl(self):
        """Check which introducer.furl file a node uses and how a
        pre-existing public introducer.furl is treated.

        Returns a Deferred covering two start/stop cycles of an
        IntroducerNode and the checks made after each of them.
        """
        basedir = "introducer.IntroducerNode.test_furl"
        os.mkdir(basedir)
        public_fn = os.path.join(basedir, "introducer.furl")
        private_fn = os.path.join(basedir, "private", "introducer.furl")
        q1 = IntroducerNode(basedir)
        # First cycle: start the node, wait for its tub, then stop it again
        # so the furl file can be inspected afterwards.
        d = fireEventually(None)
        d.addCallback(lambda res: q1.startService())
        d.addCallback(lambda res: q1.when_tub_ready())
        d.addCallback(lambda res: q1.stopService())
        d.addCallback(flushEventualQueue)
        def _check_furl(res):
            # new nodes create unguessable furls in private/introducer.furl
            ifurl = fileutil.read(private_fn)
            self.failUnless(ifurl)
            ifurl = ifurl.strip()
            self.failIf(ifurl.endswith("/introducer"), ifurl)

            # old nodes created guessable furls in BASEDIR/introducer.furl
            # (build one by swapping the last path component of the real
            # furl for the fixed name "introducer")
            guessable = ifurl[:ifurl.rfind("/")] + "/introducer"
            fileutil.write(public_fn, guessable+"\n", mode="w") # text

            # if we see both files, throw an error
            self.failUnlessRaises(FurlFileConflictError,
                                  IntroducerNode, basedir)

            # when we see only the public one, move it to private/ and use
            # the existing furl instead of creating a new one
            os.unlink(private_fn)
            q2 = IntroducerNode(basedir)
            # Second cycle, same start / wait / stop sequence as above.
            d2 = fireEventually(None)
            d2.addCallback(lambda res: q2.startService())
            d2.addCallback(lambda res: q2.when_tub_ready())
            d2.addCallback(lambda res: q2.stopService())
            d2.addCallback(flushEventualQueue)
            def _check_furl2(res):
                # the public file must be gone and the private file must
                # now hold the guessable furl we wrote above
                self.failIf(os.path.exists(public_fn))
                ifurl2 = fileutil.read(private_fn)
                self.failUnless(ifurl2)
                self.failUnlessEqual(ifurl2.strip(), guessable)
            d2.addCallback(_check_furl2)
            # returning d2 makes the outer Deferred wait for the second cycle
            return d2
        d.addCallback(_check_furl)
        return d

    def test_web_static(self):
        """A relative ``web.static`` setting in tahoe.cfg must show up on the
        "webish" service as an absolute path under the node's base directory.
        """
        basedir = u"introducer.Node.test_web_static"
        os.mkdir(basedir)
        fileutil.write(os.path.join(basedir, "tahoe.cfg"),
                       "[node]\n" +
                       "web.port = tcp:0:interface=127.0.0.1\n" +
                       "web.static = relative\n")
        c = IntroducerNode(basedir)
        w = c.getServiceNamed("webish")
        abs_basedir = fileutil.abspath_expanduser_unicode(basedir)
        # expected value: "relative" resolved against the absolute basedir
        expected = fileutil.abspath_expanduser_unicode(u"relative", abs_basedir)
        self.failUnlessReallyEqual(w.staticdir, expected)
89
90
class ServiceMixin:
    """Test mixin that owns a running LoggingMultiService as ``self.parent``."""

    def setUp(self):
        """Create the parent service and start it."""
        parent = LoggingMultiService()
        self.parent = parent
        parent.startService()

    def tearDown(self):
        """Stop the parent service, then flush foolscap's eventual queue.

        Returns the Deferred that carries out both steps.
        """
        log.msg("TestIntroducer.tearDown")

        def _stop_parent(_ignored):
            return self.parent.stopService()

        shutdown = defer.succeed(None)
        shutdown.addCallback(_stop_parent)
        shutdown.addCallback(flushEventualQueue)
        return shutdown
101
class Introducer(ServiceMixin, unittest.TestCase, pollmixin.PollMixin):
    """Direct tests of IntroducerClient construction and of
    IntroducerService's handling of published announcements."""

    def test_create(self):
        """An IntroducerClient can be constructed without a tub (None)."""
        ic = IntroducerClient(None, "introducer.furl", u"my_nickname",
                              "my_version", "oldest_version", {}, fakeseq)
        self.failUnless(isinstance(ic, IntroducerClient))

    def test_listen(self):
        """An IntroducerService can be attached to the running parent
        service (the test only checks that this does not raise)."""
        i = IntroducerService()
        i.setServiceParent(self.parent)

    def test_duplicate_publish(self):
        """Publishing a second v1 announcement with the same furl must not
        add a new entry to the service's announcement list."""
        i = IntroducerService()
        self.failUnlessEqual(len(i.get_announcements()), 0)
        self.failUnlessEqual(len(i.get_subscribers()), 0)
        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36106,127.0.0.1:36106/gydnpigj2ja2qr2srq4ikjwnl7xfgbra"
        furl2 = "pb://ttwwooyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36111,127.0.0.1:36106/ttwwoogj2ja2qr2srq4ikjwnl7xfgbra"
        # ann1 and ann1b share furl1 and differ only in their fifth field
        # ("ver23" vs "ver24"); ann2 uses a different furl.
        ann1 = (furl1, "storage", "RIStorage", "nick1", "ver23", "ver0")
        ann1b = (furl1, "storage", "RIStorage", "nick1", "ver24", "ver0")
        ann2 = (furl2, "storage", "RIStorage", "nick2", "ver30", "ver0")
        i.remote_publish(ann1)
        self.failUnlessEqual(len(i.get_announcements()), 1)
        self.failUnlessEqual(len(i.get_subscribers()), 0)
        i.remote_publish(ann2)
        self.failUnlessEqual(len(i.get_announcements()), 2)
        self.failUnlessEqual(len(i.get_subscribers()), 0)
        # same furl as ann1: the count is expected to stay at 2
        i.remote_publish(ann1b)
        self.failUnlessEqual(len(i.get_announcements()), 2)
        self.failUnlessEqual(len(i.get_subscribers()), 0)

    def test_id_collision(self):
        """A signed (v2) announcement and a v1 announcement whose tubid
        string equals the v2 key id must be stored side by side."""
        # test replacement case where tubid equals a keyid (one should
        # not replace the other)
        i = IntroducerService()
        ic = IntroducerClient(None,
                              "introducer.furl", u"my_nickname",
                              "my_version", "oldest_version", {}, fakeseq)
        sk_s, vk_s = keyutil.make_keypair()
        sk, _ignored = keyutil.parse_privkey(sk_s)
        keyid = keyutil.remove_prefix(vk_s, "pub-v0-")
        furl1 = "pb://onug64tu@127.0.0.1:123/short" # base32("short")
        ann_t = make_ann_t(ic, furl1, sk, 1)
        i.remote_publish_v2(ann_t, Referenceable())
        announcements = i.get_announcements()
        self.failUnlessEqual(len(announcements), 1)
        # the signed announcement is expected under an index whose second
        # element is "v0-"+keyid and whose third element is None
        key1 = ("storage", "v0-"+keyid, None)
        self.failUnlessEqual(announcements[0].index, key1)
        ann1_out = announcements[0].announcement
        self.failUnlessEqual(ann1_out["anonymous-storage-FURL"], furl1)

        # now publish a v1 announcement whose furl uses keyid as its tubid
        furl2 = "pb://%s@127.0.0.1:36106/swissnum" % keyid
        ann2 = (furl2, "storage", "RIStorage", "nick1", "ver23", "ver0")
        i.remote_publish(ann2)
        announcements = i.get_announcements()
        self.failUnlessEqual(len(announcements), 2)
        # the v1 announcement is expected under an index with the same
        # keyid string in the third element instead of the second
        key2 = ("storage", None, keyid)
        wanted = [ad for ad in announcements if ad.index == key2]
        self.failUnlessEqual(len(wanted), 1)
        ann2_out = wanted[0].announcement
        self.failUnlessEqual(ann2_out["anonymous-storage-FURL"], furl2)
162
163
def fakeseq():
    """Return a fixed ``(seqnum, nonce)`` pair: ``(1, "nonce")``."""
    seqnum = 1
    nonce = "nonce"
    return (seqnum, nonce)
166
# Module-wide source of increasing sequence numbers, starting at 1.
seqnum_counter = itertools.count(1)
def realseq():
    """Return a fresh ``(seqnum, nonce)`` pair.

    ``seqnum`` is the next value from the module-level ``seqnum_counter``, so
    it grows by one on every call. ``nonce`` is the decimal string of a random
    integer between 1 and 100000 inclusive.
    """
    # The os module has no randint(); the original os.randint() call raised
    # AttributeError whenever this function was actually invoked. Imported
    # locally so the module's top-level import list does not need to change.
    import random
    # The builtin next() is used instead of the iterator's .next() method,
    # which only exists on Python 2.
    return next(seqnum_counter), str(random.randint(1, 100000))
170
def make_ann(furl):
    """Build a storage announcement dict for ``furl``.

    The dict has two keys: "anonymous-storage-FURL" holds the furl itself and
    "permutation-seed-base32" holds ``get_tubid_string(furl)``.
    """
    return {
        "anonymous-storage-FURL": furl,
        "permutation-seed-base32": get_tubid_string(furl),
    }
175
def make_ann_t(ic, furl, privkey, seqnum):
    """Create a signed "storage" announcement for ``furl``.

    The announcement dict comes from ``ic.create_announcement_dict``; its
    "seqnum" entry is set to ``seqnum`` and its "nonce" entry to the fixed
    string "nonce" before the dict is passed to ``sign_to_foolscap`` together
    with ``privkey``. The signed result is returned.
    """
    announcement = ic.create_announcement_dict("storage", make_ann(furl))
    # Overwrite the sequencing fields so the caller controls ordering.
    announcement["seqnum"] = seqnum
    announcement["nonce"] = "nonce"
    return sign_to_foolscap(announcement, privkey)
182
class Client(unittest.TestCase):
    """Tests of how an IntroducerClient handles inbound announcements:
    new ones, duplicates, replacements and colliding identifiers."""

    def test_duplicate_receive_v1(self):
        """Deliver v1-format announcements through WrapV2ClientInV1Interface
        and check both the subscriber callback and ``ic._debug_counts`` after
        a new, a duplicate and a replacement announcement.

        Returns the Deferred that drives the three steps.
        """
        ic = IntroducerClient(None,
                              "introducer.furl", u"my_nickname",
                              "my_version", "oldest_version", {}, fakeseq)
        # every announcement handed to the subscriber is recorded here
        announcements = []
        ic.subscribe_to("storage",
                        lambda key_s,ann: announcements.append(ann))
        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/gydnpigj2ja2qr2srq4ikjwnl7xfgbra"
        # ann1b has the same furl as ann1 but "ver24" instead of "ver23"
        ann1 = (furl1, "storage", "RIStorage", "nick1", "ver23", "ver0")
        ann1b = (furl1, "storage", "RIStorage", "nick1", "ver24", "ver0")
        ca = WrapV2ClientInV1Interface(ic)

        ca.remote_announce([ann1])
        # the checks run in a later eventual-send turn, after the
        # announcement has been processed
        d = fireEventually()
        def _then(ign):
            self.failUnlessEqual(len(announcements), 1)
            self.failUnlessEqual(announcements[0]["nickname"], u"nick1")
            self.failUnlessEqual(announcements[0]["my-version"], "ver23")
            self.failUnlessEqual(ic._debug_counts["inbound_announcement"], 1)
            self.failUnlessEqual(ic._debug_counts["new_announcement"], 1)
            self.failUnlessEqual(ic._debug_counts["update"], 0)
            self.failUnlessEqual(ic._debug_counts["duplicate_announcement"], 0)
            # now send a duplicate announcement: this should not notify clients
            ca.remote_announce([ann1])
            return fireEventually()
        d.addCallback(_then)
        def _then2(ign):
            self.failUnlessEqual(len(announcements), 1)
            self.failUnlessEqual(ic._debug_counts["inbound_announcement"], 2)
            self.failUnlessEqual(ic._debug_counts["new_announcement"], 1)
            self.failUnlessEqual(ic._debug_counts["update"], 0)
            self.failUnlessEqual(ic._debug_counts["duplicate_announcement"], 1)
            # and a replacement announcement: same FURL, new other stuff.
            # Clients should be notified.
            ca.remote_announce([ann1b])
            return fireEventually()
        d.addCallback(_then2)
        def _then3(ign):
            self.failUnlessEqual(len(announcements), 2)
            self.failUnlessEqual(ic._debug_counts["inbound_announcement"], 3)
            self.failUnlessEqual(ic._debug_counts["new_announcement"], 1)
            self.failUnlessEqual(ic._debug_counts["update"], 1)
            self.failUnlessEqual(ic._debug_counts["duplicate_announcement"], 1)
            # test that the other stuff changed
            self.failUnlessEqual(announcements[-1]["nickname"], u"nick1")
            self.failUnlessEqual(announcements[-1]["my-version"], "ver24")
        d.addCallback(_then3)
        return d

    def test_duplicate_receive_v2(self):
        """Deliver signed (v2) announcements with varying seqnums and
        contents to one client and check when its subscriber fires.

        Covers a new announcement, a duplicate, an older seqnum, a missing
        seqnum, two replacements, and the backlog delivered to a subscriber
        that is added afterwards. Returns the Deferred driving the steps.
        """
        ic1 = IntroducerClient(None,
                               "introducer.furl", u"my_nickname",
                               "ver23", "oldest_version", {}, fakeseq)
        # we use a second client just to create a different-looking
        # announcement
        ic2 = IntroducerClient(None,
                               "introducer.furl", u"my_nickname",
                               "ver24","oldest_version",{}, fakeseq)
        # (key_s, ann) pairs seen by the first subscriber
        announcements = []
        def _received(key_s, ann):
            announcements.append( (key_s, ann) )
        ic1.subscribe_to("storage", _received)
        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/gydnp"
        furl1a = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:7777/gydnp"
        furl2 = "pb://ttwwooyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/ttwwoo"

        # one keypair signs every announcement below; pubkey_s is the value
        # the subscriber is expected to receive as key_s
        privkey_s, pubkey_vs = keyutil.make_keypair()
        privkey, _ignored = keyutil.parse_privkey(privkey_s)
        pubkey_s = keyutil.remove_prefix(pubkey_vs, "pub-")

        # ann1: ic1, furl1
        # ann1a: ic1, furl1a (same SturdyRef, different connection hints)
        # ann1b: ic2, furl1
        # ann2: ic2, furl2

        self.ann1 = make_ann_t(ic1, furl1, privkey, seqnum=10)
        self.ann1old = make_ann_t(ic1, furl1, privkey, seqnum=9)
        self.ann1noseqnum = make_ann_t(ic1, furl1, privkey, seqnum=None)
        self.ann1b = make_ann_t(ic2, furl1, privkey, seqnum=11)
        self.ann1a = make_ann_t(ic1, furl1a, privkey, seqnum=12)
        self.ann2 = make_ann_t(ic2, furl2, privkey, seqnum=13)

        ic1.remote_announce_v2([self.ann1]) # queues eventual-send
        d = fireEventually()
        def _then1(ign):
            self.failUnlessEqual(len(announcements), 1)
            key_s,ann = announcements[0]
            self.failUnlessEqual(key_s, pubkey_s)
            self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1)
            self.failUnlessEqual(ann["my-version"], "ver23")
        d.addCallback(_then1)

        # now send a duplicate announcement. This should not fire the
        # subscriber
        d.addCallback(lambda ign: ic1.remote_announce_v2([self.ann1]))
        d.addCallback(fireEventually)
        def _then2(ign):
            self.failUnlessEqual(len(announcements), 1)
        d.addCallback(_then2)

        # an older announcement shouldn't fire the subscriber either
        d.addCallback(lambda ign: ic1.remote_announce_v2([self.ann1old]))
        d.addCallback(fireEventually)
        def _then2a(ign):
            self.failUnlessEqual(len(announcements), 1)
        d.addCallback(_then2a)

        # announcement with no seqnum cannot replace one with-seqnum
        d.addCallback(lambda ign: ic1.remote_announce_v2([self.ann1noseqnum]))
        d.addCallback(fireEventually)
        def _then2b(ign):
            self.failUnlessEqual(len(announcements), 1)
        d.addCallback(_then2b)

        # and a replacement announcement: same FURL, new other stuff. The
        # subscriber *should* be fired.
        d.addCallback(lambda ign: ic1.remote_announce_v2([self.ann1b]))
        d.addCallback(fireEventually)
        def _then3(ign):
            self.failUnlessEqual(len(announcements), 2)
            key_s,ann = announcements[-1]
            self.failUnlessEqual(key_s, pubkey_s)
            self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1)
            self.failUnlessEqual(ann["my-version"], "ver24")
        d.addCallback(_then3)

        # and a replacement announcement with a different FURL (it uses
        # different connection hints)
        d.addCallback(lambda ign: ic1.remote_announce_v2([self.ann1a]))
        d.addCallback(fireEventually)
        def _then4(ign):
            self.failUnlessEqual(len(announcements), 3)
            key_s,ann = announcements[-1]
            self.failUnlessEqual(key_s, pubkey_s)
            self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1a)
            self.failUnlessEqual(ann["my-version"], "ver23")
        d.addCallback(_then4)

        # now add a new subscription, which should be called with the
        # backlog. The introducer only records one announcement per index, so
        # the backlog will only have the latest message.
        announcements2 = []
        def _received2(key_s, ann):
            announcements2.append( (key_s, ann) )
        d.addCallback(lambda ign: ic1.subscribe_to("storage", _received2))
        d.addCallback(fireEventually)
        def _then5(ign):
            self.failUnlessEqual(len(announcements2), 1)
            key_s,ann = announcements2[-1]
            self.failUnlessEqual(key_s, pubkey_s)
            self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1a)
            self.failUnlessEqual(ann["my-version"], "ver23")
        d.addCallback(_then5)
        return d

    def test_id_collision(self):
        """A v1 announcement whose tubid equals the key id of an already
        received signed announcement must still reach the subscriber.

        Returns the Deferred that drives both deliveries.
        """
        # test replacement case where tubid equals a keyid (one should
        # not replace the other)
        ic = IntroducerClient(None,
                              "introducer.furl", u"my_nickname",
                              "my_version", "oldest_version", {}, fakeseq)
        announcements = []
        ic.subscribe_to("storage",
                        lambda key_s,ann: announcements.append(ann))
        sk_s, vk_s = keyutil.make_keypair()
        sk, _ignored = keyutil.parse_privkey(sk_s)
        keyid = keyutil.remove_prefix(vk_s, "pub-v0-")
        furl1 = "pb://onug64tu@127.0.0.1:123/short" # base32("short")
        # furl2 deliberately uses the key id as its tubid
        furl2 = "pb://%s@127.0.0.1:36106/swissnum" % keyid
        ann_t = make_ann_t(ic, furl1, sk, 1)
        ic.remote_announce_v2([ann_t])
        d = fireEventually()
        def _then(ign):
            # first announcement has been processed
            self.failUnlessEqual(len(announcements), 1)
            self.failUnlessEqual(announcements[0]["anonymous-storage-FURL"],
                                 furl1)
            # now submit a second one, with a tubid that happens to look just
            # like the pubkey-based serverid we just processed. They should
            # not overlap.
            ann2 = (furl2, "storage", "RIStorage", "nick1", "ver23", "ver0")
            ca = WrapV2ClientInV1Interface(ic)
            ca.remote_announce([ann2])
            return fireEventually()
        d.addCallback(_then)
        def _then2(ign):
            # if they overlapped, the second announcement would be ignored
            self.failUnlessEqual(len(announcements), 2)
            self.failUnlessEqual(announcements[1]["anonymous-storage-FURL"],
                                 furl2)
        d.addCallback(_then2)
        return d
376
class Server(unittest.TestCase):
    """Tests of how IntroducerService treats repeated signed announcements."""

    def _publish_and_check(self, introducer, ann_t, seqnum,
                           message, duplicate, no_seqnum, old_replay, update):
        """Publish ``ann_t`` to ``introducer`` and verify the resulting state.

        After the publish, the service must hold exactly one announcement
        whose "seqnum" equals ``seqnum``, and the ``inbound_*`` entries of
        ``introducer._debug_counts`` must match the remaining arguments
        (``message`` is compared with "inbound_message", and so on).
        """
        introducer.remote_publish_v2(ann_t, None)
        stored = introducer.get_announcements()
        self.failUnlessEqual(len(stored), 1)
        self.failUnlessEqual(stored[0].announcement["seqnum"], seqnum)
        # Counters are checked in a fixed order so the first mismatch that
        # gets reported is the same one the unrolled assertions reported.
        expected_counts = [("inbound_message", message),
                           ("inbound_duplicate", duplicate),
                           ("inbound_no_seqnum", no_seqnum),
                           ("inbound_old_replay", old_replay),
                           ("inbound_update", update)]
        counts = introducer._debug_counts
        for name, expected in expected_counts:
            self.failUnlessEqual(counts[name], expected,
                                 "%s: %r != %r" % (name, counts[name],
                                                   expected))

    def test_duplicate(self):
        """Publish the same server's announcement six times with different
        seqnums and check what the service stores and counts each time.

        Order of publishes: the original (seqnum 10), an exact duplicate, an
        older seqnum (9), a newer seqnum (11), one with seqnum None, and one
        with a non-integer seqnum. Only the newer one is expected to replace
        the stored announcement.
        """
        i = IntroducerService()
        ic1 = IntroducerClient(None,
                               "introducer.furl", u"my_nickname",
                               "ver23", "oldest_version", {}, realseq)
        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/gydnp"

        privkey_s, _ = keyutil.make_keypair()
        privkey, _ = keyutil.parse_privkey(privkey_s)

        ann1 = make_ann_t(ic1, furl1, privkey, seqnum=10)
        ann1_old = make_ann_t(ic1, furl1, privkey, seqnum=9)
        ann1_new = make_ann_t(ic1, furl1, privkey, seqnum=11)
        ann1_noseqnum = make_ann_t(ic1, furl1, privkey, seqnum=None)
        ann1_badseqnum = make_ann_t(ic1, furl1, privkey, seqnum="not an int")

        # first publish: stored as-is
        self._publish_and_check(i, ann1, seqnum=10,
                                message=1, duplicate=0, no_seqnum=0,
                                old_replay=0, update=0)
        # identical announcement again: counted as a duplicate
        self._publish_and_check(i, ann1, seqnum=10,
                                message=2, duplicate=1, no_seqnum=0,
                                old_replay=0, update=0)
        # lower seqnum: counted as an old replay, stored seqnum unchanged
        self._publish_and_check(i, ann1_old, seqnum=10,
                                message=3, duplicate=1, no_seqnum=0,
                                old_replay=1, update=0)
        # higher seqnum: counted as an update, stored seqnum becomes 11
        self._publish_and_check(i, ann1_new, seqnum=11,
                                message=4, duplicate=1, no_seqnum=0,
                                old_replay=1, update=1)
        # seqnum None: counted under inbound_no_seqnum, nothing replaced
        self._publish_and_check(i, ann1_noseqnum, seqnum=11,
                                message=5, duplicate=1, no_seqnum=1,
                                old_replay=1, update=1)
        # non-integer seqnum: also counted under inbound_no_seqnum
        self._publish_and_check(i, ann1_badseqnum, seqnum=11,
                                message=6, duplicate=1, no_seqnum=2,
                                old_replay=1, update=1)
453
454
# Nickname template containing a non-ASCII character. The "%s" slot is
# filled in with "NICKNAME % str(i)" when do_system_test creates its clients.
NICKNAME = u"n\u00EDickname-%s" # LATIN SMALL LETTER I WITH ACUTE
456
class SystemTestMixin(ServiceMixin, pollmixin.PollMixin):
    """Mixin for system tests that need a listening central Tub."""

    def create_tub(self, portnum=0):
        """Create the central Tub under ``self.parent`` and start listening.

        The Tub uses ``tub.pem`` inside ``self.basedir`` as its certificate
        file. It is stored as ``self.central_tub`` and the port it actually
        listens on as ``self.central_portnum``. When ``portnum`` is non-zero,
        the listening port is asserted to be exactly that port.
        """
        cert_path = os.path.join(self.basedir, "tub.pem")
        tub = Tub(certFile=cert_path)
        self.central_tub = tub
        # Optional failure logging, left disabled:
        #tub.setOption("logLocalFailures", True)
        #tub.setOption("logRemoteFailures", True)
        tub.setOption("expose-remote-exception-types", False)
        tub.setServiceParent(self.parent)
        listener = tub.listenOn("tcp:%d" % portnum)
        self.central_portnum = listener.getPortnum()
        # An explicitly requested port must be the one we ended up on.
        assert portnum == 0 or self.central_portnum == portnum
        tub.setLocation("localhost:%d" % self.central_portnum)
471
class Queue(SystemTestMixin, unittest.TestCase):
    """Test that a client's publish made while the introducer is detached
    still arrives once the introducer is attached again."""

    def test_queue_until_connected(self):
        """Publish while the IntroducerService is detached from its parent,
        re-attach it, and poll until the announcement shows up.

        Returns a Deferred that fires once the announcement has been checked
        and neither side reports outstanding messages.
        """
        self.basedir = "introducer/QueueUntilConnected/queued"
        os.makedirs(self.basedir)
        self.create_tub()
        introducer = IntroducerService()
        introducer.setServiceParent(self.parent)
        iff = os.path.join(self.basedir, "introducer.furl")
        ifurl = self.central_tub.registerReference(introducer, furlFile=iff)
        # second tub, used by the client side of the test
        tub2 = Tub()
        tub2.setServiceParent(self.parent)
        c = IntroducerClient(tub2, ifurl,
                             u"nickname", "version", "oldest", {}, fakeseq)
        furl1 = "pb://onug64tu@127.0.0.1:123/short" # base32("short")
        sk_s, vk_s = keyutil.make_keypair()
        sk, _ignored = keyutil.parse_privkey(sk_s)

        # detach the introducer from the parent service before the client
        # is started
        d = introducer.disownServiceParent()
        def _offline(ign):
            # now that the introducer server is offline, create a client and
            # publish some messages
            c.setServiceParent(self.parent) # this starts the reconnector
            c.publish("storage", make_ann(furl1), sk)

            introducer.setServiceParent(self.parent) # restart the server
            # now wait for the messages to be delivered
            def _got_announcement():
                return bool(introducer.get_announcements())
            return self.poll(_got_announcement)
        d.addCallback(_offline)
        def _done(ign):
            # the announcement that arrived must carry the furl we published
            v = introducer.get_announcements()[0]
            furl = v.announcement["anonymous-storage-FURL"]
            self.failUnlessEqual(furl, furl1)
        d.addCallback(_done)

        # now let the ack get back
        def _wait_until_idle(ign):
            # idle means: neither the client nor the introducer reports
            # anything in _debug_outstanding
            def _idle():
                if c._debug_outstanding:
                    return False
                if introducer._debug_outstanding:
                    return False
                return True
            return self.poll(_idle)
        d.addCallback(_wait_until_idle)
        return d
519
520
# Labels for the introducer server version a system test runs against.
V1 = "v1"
V2 = "v2"
522 class SystemTest(SystemTestMixin, unittest.TestCase):
523
524     def do_system_test(self, server_version):
525         self.create_tub()
526         if server_version == V1:
527             introducer = old.IntroducerService_v1()
528         else:
529             introducer = IntroducerService()
530         introducer.setServiceParent(self.parent)
531         iff = os.path.join(self.basedir, "introducer.furl")
532         tub = self.central_tub
533         ifurl = self.central_tub.registerReference(introducer, furlFile=iff)
534         self.introducer_furl = ifurl
535
536         # we have 5 clients who publish themselves as storage servers, and a
537         # sixth which does which not. All 6 clients subscriber to hear about
538         # storage. When the connections are fully established, all six nodes
539         # should have 5 connections each.
540         NUM_STORAGE = 5
541         NUM_CLIENTS = 6
542
543         clients = []
544         tubs = {}
545         received_announcements = {}
546         subscribing_clients = []
547         publishing_clients = []
548         printable_serverids = {}
549         self.the_introducer = introducer
550         privkeys = {}
551         expected_announcements = [0 for c in range(NUM_CLIENTS)]
552
553         for i in range(NUM_CLIENTS):
554             tub = Tub()
555             #tub.setOption("logLocalFailures", True)
556             #tub.setOption("logRemoteFailures", True)
557             tub.setOption("expose-remote-exception-types", False)
558             tub.setServiceParent(self.parent)
559             l = tub.listenOn("tcp:0")
560             portnum = l.getPortnum()
561             tub.setLocation("localhost:%d" % portnum)
562
563             log.msg("creating client %d: %s" % (i, tub.getShortTubID()))
564             if i == 0:
565                 c = old.IntroducerClient_v1(tub, self.introducer_furl,
566                                             NICKNAME % str(i),
567                                             "version", "oldest")
568             else:
569                 c = IntroducerClient(tub, self.introducer_furl,
570                                      NICKNAME % str(i),
571                                      "version", "oldest",
572                                      {"component": "component-v1"}, fakeseq)
573             received_announcements[c] = {}
574             def got(key_s_or_tubid, ann, announcements, i):
575                 if i == 0:
576                     index = get_tubid_string_from_ann(ann)
577                 else:
578                     index = key_s_or_tubid or get_tubid_string_from_ann(ann)
579                 announcements[index] = ann
580             c.subscribe_to("storage", got, received_announcements[c], i)
581             subscribing_clients.append(c)
582             expected_announcements[i] += 1 # all expect a 'storage' announcement
583
584             node_furl = tub.registerReference(Referenceable())
585             if i < NUM_STORAGE:
586                 if i == 0:
587                     c.publish(node_furl, "storage", "ri_name")
588                     printable_serverids[i] = get_tubid_string(node_furl)
589                 elif i == 1:
590                     # sign the announcement
591                     privkey_s, pubkey_s = keyutil.make_keypair()
592                     privkey, _ignored = keyutil.parse_privkey(privkey_s)
593                     privkeys[c] = privkey
594                     c.publish("storage", make_ann(node_furl), privkey)
595                     if server_version == V1:
596                         printable_serverids[i] = get_tubid_string(node_furl)
597                     else:
598                         assert pubkey_s.startswith("pub-")
599                         printable_serverids[i] = pubkey_s[len("pub-"):]
600                 else:
601                     c.publish("storage", make_ann(node_furl))
602                     printable_serverids[i] = get_tubid_string(node_furl)
603                 publishing_clients.append(c)
604             else:
605                 # the last one does not publish anything
606                 pass
607
608             if i == 0:
609                 # users of the V1 client were required to publish a
610                 # 'stub_client' record (somewhat after they published the
611                 # 'storage' record), so the introducer could see their
612                 # version. Match that behavior.
613                 c.publish(node_furl, "stub_client", "stub_ri_name")
614
615             if i == 2:
616                 # also publish something that nobody cares about
617                 boring_furl = tub.registerReference(Referenceable())
618                 c.publish("boring", make_ann(boring_furl))
619
620             c.setServiceParent(self.parent)
621             clients.append(c)
622             tubs[c] = tub
623
624
625         def _wait_for_connected(ign):
626             def _connected():
627                 for c in clients:
628                     if not c.connected_to_introducer():
629                         return False
630                 return True
631             return self.poll(_connected)
632
633         # we watch the clients to determine when the system has settled down.
634         # Then we can look inside the server to assert things about its
635         # state.
636
637         def _wait_for_expected_announcements(ign):
638             def _got_expected_announcements():
639                 for i,c in enumerate(subscribing_clients):
640                     if len(received_announcements[c]) < expected_announcements[i]:
641                         return False
642                 return True
643             return self.poll(_got_expected_announcements)
644
645         # before shutting down any Tub, we'd like to know that there are no
646         # messages outstanding
647
648         def _wait_until_idle(ign):
649             def _idle():
650                 for c in subscribing_clients + publishing_clients:
651                     if c._debug_outstanding:
652                         return False
653                 if self.the_introducer._debug_outstanding:
654                     return False
655                 return True
656             return self.poll(_idle)
657
658         d = defer.succeed(None)
659         d.addCallback(_wait_for_connected)
660         d.addCallback(_wait_for_expected_announcements)
661         d.addCallback(_wait_until_idle)
662
663         def _check1(res):
664             log.msg("doing _check1")
665             dc = self.the_introducer._debug_counts
666             if server_version == V1:
667                 # each storage server publishes a record, and (after its
668                 # 'subscribe' has been ACKed) also publishes a "stub_client".
669                 # The non-storage client (which subscribes) also publishes a
670                 # stub_client. There is also one "boring" service. The number
671                 # of messages is higher, because the stub_clients aren't
672                 # published until after we get the 'subscribe' ack (since we
673                 # don't realize that we're dealing with a v1 server [which
674                 # needs stub_clients] until then), and the act of publishing
675                 # the stub_client causes us to re-send all previous
676                 # announcements.
677                 self.failUnlessEqual(dc["inbound_message"] - dc["inbound_duplicate"],
678                                      NUM_STORAGE + NUM_CLIENTS + 1)
679             else:
680                 # each storage server publishes a record. There is also one
681                 # "stub_client" and one "boring"
682                 self.failUnlessEqual(dc["inbound_message"], NUM_STORAGE+2)
683                 self.failUnlessEqual(dc["inbound_duplicate"], 0)
684             self.failUnlessEqual(dc["inbound_update"], 0)
685             self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS)
686             # the number of outbound messages is tricky.. I think it depends
687             # upon a race between the publish and the subscribe messages.
688             self.failUnless(dc["outbound_message"] > 0)
689             # each client subscribes to "storage", and each server publishes
690             self.failUnlessEqual(dc["outbound_announcements"],
691                                  NUM_STORAGE*NUM_CLIENTS)
692
693             for c in subscribing_clients:
694                 cdc = c._debug_counts
695                 self.failUnless(cdc["inbound_message"])
696                 self.failUnlessEqual(cdc["inbound_announcement"],
697                                      NUM_STORAGE)
698                 self.failUnlessEqual(cdc["wrong_service"], 0)
699                 self.failUnlessEqual(cdc["duplicate_announcement"], 0)
700                 self.failUnlessEqual(cdc["update"], 0)
701                 self.failUnlessEqual(cdc["new_announcement"],
702                                      NUM_STORAGE)
703                 anns = received_announcements[c]
704                 self.failUnlessEqual(len(anns), NUM_STORAGE)
705
706                 nodeid0 = tubs[clients[0]].tubID
707                 ann = anns[nodeid0]
708                 nick = ann["nickname"]
709                 self.failUnlessEqual(type(nick), unicode)
710                 self.failUnlessEqual(nick, NICKNAME % "0")
711             if server_version == V1:
712                 for c in publishing_clients:
713                     cdc = c._debug_counts
714                     expected = 1 # storage
715                     if c is clients[2]:
716                         expected += 1 # boring
717                     if c is not clients[0]:
718                         # the v2 client tries to call publish_v2, which fails
719                         # because the server is v1. It then re-sends
720                         # everything it has so far, plus a stub_client record
721                         expected = 2*expected + 1
722                     if c is clients[0]:
723                         # we always tell v1 client to send stub_client
724                         expected += 1
725                     self.failUnlessEqual(cdc["outbound_message"], expected)
726             else:
727                 for c in publishing_clients:
728                     cdc = c._debug_counts
729                     expected = 1
730                     if c in [clients[0], # stub_client
731                              clients[2], # boring
732                              ]:
733                         expected = 2
734                     self.failUnlessEqual(cdc["outbound_message"], expected)
735             # now check the web status, make sure it renders without error
736             ir = introweb.IntroducerRoot(self.parent)
737             self.parent.nodeid = "NODEID"
738             text = ir.renderSynchronously().decode("utf-8")
739             self.failUnlessIn(NICKNAME % "0", text) # the v1 client
740             self.failUnlessIn(NICKNAME % "1", text) # a v2 client
741             for i in range(NUM_STORAGE):
742                 self.failUnlessIn(printable_serverids[i], text,
743                                   (i,printable_serverids[i],text))
744                 # make sure there isn't a double-base32ed string too
745                 self.failIfIn(idlib.nodeid_b2a(printable_serverids[i]), text,
746                               (i,printable_serverids[i],text))
747             log.msg("_check1 done")
748         d.addCallback(_check1)
749
750         # force an introducer reconnect, by shutting down the Tub it's using
751         # and starting a new Tub (with the old introducer). Everybody should
752         # reconnect and republish, but the introducer should ignore the
753         # republishes as duplicates. However, because the server doesn't know
754         # what each client does and does not know, it will send them a copy
755         # of the current announcement table anyway.
756
757         d.addCallback(lambda _ign: log.msg("shutting down introducer's Tub"))
758         d.addCallback(lambda _ign: self.central_tub.disownServiceParent())
759
760         def _wait_for_introducer_loss(ign):
761             def _introducer_lost():
762                 for c in clients:
763                     if c.connected_to_introducer():
764                         return False
765                 return True
766             return self.poll(_introducer_lost)
767         d.addCallback(_wait_for_introducer_loss)
768
769         def _restart_introducer_tub(_ign):
770             log.msg("restarting introducer's Tub")
771             # reset counters
772             for i in range(NUM_CLIENTS):
773                 c = subscribing_clients[i]
774                 for k in c._debug_counts:
775                     c._debug_counts[k] = 0
776             for k in self.the_introducer._debug_counts:
777                 self.the_introducer._debug_counts[k] = 0
778             expected_announcements[i] += 1 # new 'storage' for everyone
779             self.create_tub(self.central_portnum)
780             newfurl = self.central_tub.registerReference(self.the_introducer,
781                                                          furlFile=iff)
782             assert newfurl == self.introducer_furl
783         d.addCallback(_restart_introducer_tub)
784
785         d.addCallback(_wait_for_connected)
786         d.addCallback(_wait_for_expected_announcements)
787         d.addCallback(_wait_until_idle)
788         d.addCallback(lambda _ign: log.msg(" reconnected"))
789
790         # TODO: publish something while the introducer is offline, then
791         # confirm it gets delivered when the connection is reestablished
792         def _check2(res):
793             log.msg("doing _check2")
794             # assert that the introducer sent out new messages, one per
795             # subscriber
796             dc = self.the_introducer._debug_counts
797             self.failUnlessEqual(dc["outbound_announcements"],
798                                  NUM_STORAGE*NUM_CLIENTS)
799             self.failUnless(dc["outbound_message"] > 0)
800             self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS)
801             for c in subscribing_clients:
802                 cdc = c._debug_counts
803                 self.failUnlessEqual(cdc["inbound_message"], 1)
804                 self.failUnlessEqual(cdc["inbound_announcement"], NUM_STORAGE)
805                 self.failUnlessEqual(cdc["new_announcement"], 0)
806                 self.failUnlessEqual(cdc["wrong_service"], 0)
807                 self.failUnlessEqual(cdc["duplicate_announcement"], NUM_STORAGE)
808         d.addCallback(_check2)
809
810         # Then force an introducer restart, by shutting down the Tub,
811         # destroying the old introducer, and starting a new Tub+Introducer.
812         # Everybody should reconnect and republish, and the (new) introducer
813         # will distribute the new announcements, but the clients should
814         # ignore the republishes as duplicates.
815
816         d.addCallback(lambda _ign: log.msg("shutting down introducer"))
817         d.addCallback(lambda _ign: self.central_tub.disownServiceParent())
818         d.addCallback(_wait_for_introducer_loss)
819         d.addCallback(lambda _ign: log.msg("introducer lost"))
820
821         def _restart_introducer(_ign):
822             log.msg("restarting introducer")
823             self.create_tub(self.central_portnum)
824             # reset counters
825             for i in range(NUM_CLIENTS):
826                 c = subscribing_clients[i]
827                 for k in c._debug_counts:
828                     c._debug_counts[k] = 0
829             expected_announcements[i] += 1 # new 'storage' for everyone
830             if server_version == V1:
831                 introducer = old.IntroducerService_v1()
832             else:
833                 introducer = IntroducerService()
834             self.the_introducer = introducer
835             newfurl = self.central_tub.registerReference(self.the_introducer,
836                                                          furlFile=iff)
837             assert newfurl == self.introducer_furl
838         d.addCallback(_restart_introducer)
839
840         d.addCallback(_wait_for_connected)
841         d.addCallback(_wait_for_expected_announcements)
842         d.addCallback(_wait_until_idle)
843
844         def _check3(res):
845             log.msg("doing _check3")
846             dc = self.the_introducer._debug_counts
847             self.failUnlessEqual(dc["outbound_announcements"],
848                                  NUM_STORAGE*NUM_CLIENTS)
849             self.failUnless(dc["outbound_message"] > 0)
850             self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS)
851             for c in subscribing_clients:
852                 cdc = c._debug_counts
853                 self.failUnless(cdc["inbound_message"] > 0)
854                 self.failUnlessEqual(cdc["inbound_announcement"], NUM_STORAGE)
855                 self.failUnlessEqual(cdc["new_announcement"], 0)
856                 self.failUnlessEqual(cdc["wrong_service"], 0)
857                 self.failUnlessEqual(cdc["duplicate_announcement"], NUM_STORAGE)
858
859         d.addCallback(_check3)
860         return d
861
862
863     def test_system_v2_server(self):
864         self.basedir = "introducer/SystemTest/system_v2_server"
865         os.makedirs(self.basedir)
866         return self.do_system_test(V2)
867     test_system_v2_server.timeout = 480
868     # occasionally takes longer than 350s on "draco"
869
870     def test_system_v1_server(self):
871         self.basedir = "introducer/SystemTest/system_v1_server"
872         os.makedirs(self.basedir)
873         return self.do_system_test(V1)
874     test_system_v1_server.timeout = 480
875     # occasionally takes longer than 350s on "draco"
876
class FakeRemoteReference:
    # Canned stand-in for the remote reference that the introducer is handed
    # as a subscriber. Every method returns a fixed value; the tub ID is the
    # same one that appears in the FURLs used by the tests in this file.

    def notifyOnDisconnect(self, *args, **kwargs):
        # accept (and ignore) any disconnect-callback registration
        pass

    def getRemoteTubID(self):
        return "62ubehyunnyhzs7r6vdonnm2hpi52w6y"

    def getLocationHints(self):
        # NOTE(review): the second hint has no ':' before the port digits.
        # That looks like a typo, but the string is kept exactly as it was;
        # confirm nothing relies on it before changing it.
        hints = ["tcp:here.example.com:1234",
                 "tcp:there.example.com2345"]
        return hints

    def getPeer(self):
        return address.IPv4Address("TCP", "remote.example.com", 3456)
884
class ClientInfo(unittest.TestCase):
    # Checks what IntroducerService.get_subscribers() reports about a
    # subscriber, for both the v2 and the v1 subscribe interfaces.

    def test_client_v2(self):
        # a v2 client hands over its subscriber info in the subscribe call
        # itself, so the record is complete immediately
        introducer = IntroducerService()
        tub = introducer_furl = None
        app_versions = {"whizzy": "fizzy"}
        nickname = NICKNAME % u"v2"
        client_v2 = IntroducerClient(tub, introducer_furl, nickname,
                                     "my_version", "oldest", app_versions,
                                     fakeseq)
        #furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
        #ann_s = make_ann_t(client_v2, furl1, None, 10)
        #introducer.remote_publish_v2(ann_s, Referenceable())
        rref = FakeRemoteReference()
        introducer.remote_subscribe_v2(rref, "storage",
                                       client_v2._my_subscriber_info)
        subscribers = introducer.get_subscribers()
        self.failUnlessEqual(len(subscribers), 1)
        info = subscribers[0]
        self.failUnlessEqual(info.service_name, "storage")
        self.failUnlessEqual(info.app_versions, app_versions)
        self.failUnlessEqual(info.nickname, nickname)
        self.failUnlessEqual(info.version, "my_version")

    def test_client_v1(self):
        introducer = IntroducerService()
        rref = FakeRemoteReference()
        introducer.remote_subscribe(rref, "storage")
        # the v1 subscribe interface carried no subscriber_info: it usually
        # arrived separately, in a stub_client pseudo-announcement
        subscribers = introducer.get_subscribers()
        self.failUnlessEqual(len(subscribers), 1)
        info = subscribers[0]
        self.failUnlessEqual(info.nickname, u"?") # not known yet
        self.failUnlessEqual(info.service_name, "storage")

        # now deliver the stub_client announcement
        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
        nickname = NICKNAME % u"v1"
        stub_ann = (furl1, "stub_client", "RIStubClient",
                    nickname.encode("utf-8"), "my_version", "oldest")
        introducer.remote_publish(stub_ann)
        # the server should tie the announcement to the earlier subscription
        subscribers = introducer.get_subscribers()
        self.failUnlessEqual(len(subscribers), 1)
        info = subscribers[0]
        self.failUnlessEqual(info.service_name, "storage")
        # v1 announcements do not contain app-versions
        self.failUnlessEqual(info.app_versions, {})
        self.failUnlessEqual(info.nickname, nickname)
        self.failUnlessEqual(info.version, "my_version")

        # a subscription arriving *after* the stub_client announcement
        # should be correlated as well
        rref2 = FakeRemoteReference()
        introducer.remote_subscribe(rref2, "thing2")

        subscribers = introducer.get_subscribers()
        self.failUnlessEqual(len(subscribers), 2)
        thing2_subscribers = [s for s in subscribers
                              if s.service_name == "thing2"]
        info = thing2_subscribers[0]
        # v1 announcements do not contain app-versions
        self.failUnlessEqual(info.app_versions, {})
        self.failUnlessEqual(info.nickname, nickname)
        self.failUnlessEqual(info.version, "my_version")
946
class Announcements(unittest.TestCase):
    # Checks what IntroducerService.get_announcements() reports after a
    # single announcement is published through each publish interface.

    def test_client_v2_unsigned(self):
        introducer = IntroducerService()
        tub = introducer_furl = None
        app_versions = {"whizzy": "fizzy"}
        client_v2 = IntroducerClient(tub, introducer_furl, u"nick-v2",
                                     "my_version", "oldest", app_versions,
                                     fakeseq)
        tubid = "62ubehyunnyhzs7r6vdonnm2hpi52w6y"
        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
        # None is passed where the signed test passes its signing key
        unsigned_ann_t = make_ann_t(client_v2, furl1, None, 10)
        canary0 = Referenceable()
        introducer.remote_publish_v2(unsigned_ann_t, canary0)
        anns = introducer.get_announcements()
        self.failUnlessEqual(len(anns), 1)
        ad = anns[0]
        self.failUnlessIdentical(ad.canary, canary0)
        # with no key, the index carries the tubid instead
        self.failUnlessEqual(ad.index, ("storage", None, tubid))
        self.failUnlessEqual(ad.announcement["app-versions"], app_versions)
        self.failUnlessEqual(ad.nickname, u"nick-v2")
        self.failUnlessEqual(ad.service_name, "storage")
        self.failUnlessEqual(ad.version, "my_version")
        self.failUnlessEqual(ad.announcement["anonymous-storage-FURL"], furl1)

    def test_client_v2_signed(self):
        introducer = IntroducerService()
        tub = introducer_furl = None
        app_versions = {"whizzy": "fizzy"}
        client_v2 = IntroducerClient(tub, introducer_furl, u"nick-v2",
                                     "my_version", "oldest", app_versions,
                                     fakeseq)
        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
        privkey_s, pubkey_s = keyutil.make_keypair()
        privkey, _ignored = keyutil.parse_privkey(privkey_s)
        pks = keyutil.remove_prefix(pubkey_s, "pub-")
        signed_ann_t = make_ann_t(client_v2, furl1, privkey, 10)
        canary0 = Referenceable()
        introducer.remote_publish_v2(signed_ann_t, canary0)
        anns = introducer.get_announcements()
        self.failUnlessEqual(len(anns), 1)
        ad = anns[0]
        self.failUnlessIdentical(ad.canary, canary0)
        # with a key, the index carries the un-prefixed pubkey string
        self.failUnlessEqual(ad.index, ("storage", pks, None))
        self.failUnlessEqual(ad.announcement["app-versions"], app_versions)
        self.failUnlessEqual(ad.nickname, u"nick-v2")
        self.failUnlessEqual(ad.service_name, "storage")
        self.failUnlessEqual(ad.version, "my_version")
        self.failUnlessEqual(ad.announcement["anonymous-storage-FURL"], furl1)

    def test_client_v1(self):
        introducer = IntroducerService()

        tubid = "62ubehyunnyhzs7r6vdonnm2hpi52w6y"
        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
        # v1 announcements are plain tuples, published without a canary
        v1_ann = (furl1, "storage", "RIStorage",
                  u"nick-v1".encode("utf-8"), "my_version", "oldest")
        introducer.remote_publish(v1_ann)

        anns = introducer.get_announcements()
        self.failUnlessEqual(len(anns), 1)
        ad = anns[0]
        self.failUnlessEqual(ad.index, ("storage", None, tubid))
        self.failUnlessEqual(ad.canary, None)
        self.failUnlessEqual(ad.announcement["app-versions"], {})
        self.failUnlessEqual(ad.nickname, u"nick-v1".encode("utf-8"))
        self.failUnlessEqual(ad.service_name, "storage")
        self.failUnlessEqual(ad.version, "my_version")
        self.failUnlessEqual(ad.announcement["anonymous-storage-FURL"], furl1)
1012
class ClientSeqnums(unittest.TestCase):
    # Checks the sequence-number bookkeeping of a client's IntroducerClient:
    # each publish bumps the number stored in the "announcement-seqnum" file
    # and re-stamps every outbound announcement with the new number and a
    # fresh nonce.

    def test_client(self):
        basedir = "introducer/ClientSeqnums/test_client"
        fileutil.make_dirs(basedir)
        # minimal config for the client; try/finally guarantees the handle
        # is released even if a write fails
        f = open(os.path.join(basedir, "tahoe.cfg"), "w")
        try:
            f.write("[client]\n")
            f.write("introducer.furl = nope\n")
        finally:
            f.close()
        c = TahoeClient(basedir)
        ic = c.introducer_client
        outbound = ic._outbound_announcements
        published = ic._published_announcements
        def read_seqnum():
            # return the integer currently stored in the seqnum file
            f = open(os.path.join(basedir, "announcement-seqnum"))
            try:
                seqnum = f.read().strip()
            finally:
                f.close()
            return int(seqnum)

        ic.publish("sA", {"key": "value1"}, c._node_key)
        self.failUnlessEqual(read_seqnum(), 1)
        self.failUnless("sA" in outbound)
        self.failUnlessEqual(outbound["sA"]["seqnum"], 1)
        nonce1 = outbound["sA"]["nonce"]
        self.failUnless(isinstance(nonce1, str))
        self.failUnlessEqual(simplejson.loads(published["sA"][0]),
                             outbound["sA"])
        # [1] is the signature, [2] is the pubkey

        # publishing a second service causes both services to be
        # re-published, with the next higher sequence number
        ic.publish("sB", {"key": "value2"}, c._node_key)
        self.failUnlessEqual(read_seqnum(), 2)
        self.failUnless("sB" in outbound)
        self.failUnlessEqual(outbound["sB"]["seqnum"], 2)
        self.failUnless("sA" in outbound)
        self.failUnlessEqual(outbound["sA"]["seqnum"], 2)
        nonce2 = outbound["sA"]["nonce"]
        self.failUnless(isinstance(nonce2, str))
        # the re-published announcement must carry a fresh nonce
        self.failIfEqual(nonce1, nonce2)
        self.failUnlessEqual(simplejson.loads(published["sA"][0]),
                             outbound["sA"])
        self.failUnlessEqual(simplejson.loads(published["sB"][0]),
                             outbound["sB"])
1056
1057
1058
class TooNewServer(IntroducerService):
    # An introducer whose advertised version table lists only a "v999"
    # protocol; used by NonV1Server to provoke a version-mismatch error in
    # the client.
    VERSION = {
        "http://allmydata.org/tahoe/protocols/introducer/v999": {},
        "application-version": "greetings from the crazy future",
    }
1064
class NonV1Server(SystemTestMixin, unittest.TestCase):
    # if the 1.3.0 client connects to a server that doesn't provide the 'v1'
    # protocol, it is supposed to provide a useful error instead of a weird
    # exception.

    def test_failure(self):
        self.basedir = "introducer/NonV1Server/failure"
        os.makedirs(self.basedir)
        self.create_tub()
        # serve a too-new introducer from the central tub
        too_new = TooNewServer()
        too_new.setServiceParent(self.parent)
        self.introducer_furl = self.central_tub.registerReference(too_new)

        # the client gets a listening tub of its own
        client_tub = Tub()
        client_tub.setOption("expose-remote-exception-types", False)
        client_tub.setServiceParent(self.parent)
        listener = client_tub.listenOn("tcp:0")
        client_tub.setLocation("localhost:%d" % listener.getPortnum())

        client = IntroducerClient(client_tub, self.introducer_furl,
                                  u"nickname-client", "version", "oldest", {},
                                  fakeseq)
        announcements = {}
        def got(key_s, ann):
            announcements[key_s] = ann
        client.subscribe_to("storage", got)

        client.setServiceParent(self.parent)

        # now we wait for it to connect and notice the bad version

        def _got_bad():
            # stop polling once the client has either recorded an error or
            # obtained a publisher
            if client._introducer_error:
                return True
            return bool(client._publisher)
        d = self.poll(_got_bad)
        def _done(res):
            err = client._introducer_error
            self.failUnless(err)
            self.failUnless(err.check(InsufficientVersionError), err)
        d.addCallback(_done)
        return d
1106
class DecodeFurl(unittest.TestCase):
    def test_decode(self):
        # make sure we have a working base64.b32decode. The one in
        # python2.4.[01] was broken.
        furl = 'pb://t5g7egomnnktbpydbuijt6zgtmw4oqi5@127.0.0.1:51857/hfzv36i'
        expected = "\x9fM\xf2\x19\xcckU0\xbf\x03\r\x10\x99\xfb&\x9b-\xc7A\x1d"
        mo = re.match(r'pb://(\w+)@', furl)
        assert mo
        # the tubid is written in lower case in the FURL; it is upper-cased
        # before being handed to b32decode
        tubid_b32 = mo.group(1)
        nodeid = b32decode(tubid_b32.upper())
        self.failUnlessEqual(nodeid, expected)
1116
class Signatures(unittest.TestCase):
    # Round-trip and failure-mode checks for sign_to_foolscap() and
    # unsign_from_foolscap().

    def test_sign(self):
        original = {"key1": "value1"}
        privkey_s, pubkey_s = keyutil.make_keypair()
        privkey, _unused = keyutil.parse_privkey(privkey_s)
        signed_t = sign_to_foolscap(original, privkey)
        msg, sig, key = signed_t
        self.failUnlessEqual(type(msg), type("".encode("utf-8"))) # bytes
        self.failUnlessEqual(simplejson.loads(msg.decode("utf-8")), original)
        self.failUnless(sig.startswith("v0-"))
        self.failUnless(key.startswith("v0-"))
        # unsigning gives back the announcement and the un-prefixed pubkey
        recovered, recovered_key = unsign_from_foolscap(signed_t)
        self.failUnlessEqual(recovered, original)
        self.failUnlessEqual("pub-"+recovered_key, pubkey_s)

        # bad signature
        tampered = {"key1": "value2"}
        tampered_msg = simplejson.dumps(tampered).encode("utf-8")
        self.failUnlessRaises(keyutil.BadSignatureError,
                              unsign_from_foolscap, (tampered_msg, sig, key))
        # sneaky bad signature should be ignored
        recovered, recovered_key = unsign_from_foolscap(
            (tampered_msg, None, key))
        self.failUnlessEqual(recovered_key, None)
        self.failUnlessEqual(recovered, tampered)

        # unrecognized signatures
        self.failUnlessRaises(UnknownKeyError,
                              unsign_from_foolscap,
                              (tampered_msg, "v999-sig", key))
        self.failUnlessRaises(UnknownKeyError,
                              unsign_from_foolscap,
                              (tampered_msg, sig, "v999-key"))
1147
1148
1149 # add tests of StorageFarmBroker: if it receives duplicate announcements, it
1150 # should leave the Reconnector in place, also if it receives
1151 # same-FURL-different-misc, but if it receives same-nodeid-different-FURL, it
1152 # should tear down the Reconnector and make a new one. This behavior used to
1153 # live in the IntroducerClient, and thus used to be tested by test_introducer
1154
1155 # copying more tests from old branch:
1156
1157 #  then also add Upgrade test