]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_introducer.py
e8eca5028443f1251a7ffd72239869ba149ec6aa
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_introducer.py
1
2 import os, re
3 from base64 import b32decode
4 import simplejson
5
6 from twisted.trial import unittest
7 from twisted.internet import defer, address
8 from twisted.python import log
9
10 from foolscap.api import Tub, Referenceable, fireEventually, flushEventualQueue
11 from twisted.application import service
12 from allmydata.interfaces import InsufficientVersionError
13 from allmydata.introducer.client import IntroducerClient, \
14      WrapV2ClientInV1Interface
15 from allmydata.introducer.server import IntroducerService
16 from allmydata.introducer.common import get_tubid_string_from_ann, \
17      get_tubid_string, sign_to_foolscap, unsign_from_foolscap, \
18      UnknownKeyError
19 from allmydata.introducer import old
20 # test compatibility with old introducer .tac files
21 from allmydata.introducer import IntroducerNode
22 from allmydata.web import introweb
23 from allmydata.util import pollmixin, keyutil, idlib
24 import allmydata.test.common_util as testutil
25
class LoggingMultiService(service.MultiService):
    """MultiService used as the parent of the services under test.

    It adds a log() method that hands everything to twisted.python.log.
    """

    def log(self, msg, **kwargs):
        """Forward msg and any keyword arguments to log.msg()."""
        log.msg(msg, **kwargs)
29
class Node(testutil.SignalMixin, unittest.TestCase):
    """Smoke test for IntroducerNode."""

    def test_loadable(self):
        # Build an IntroducerNode in a fresh base directory, then start it,
        # wait for when_tub_ready(), stop it again and finally drain the
        # foolscap eventual-send queue.
        basedir = "introducer.IntroducerNode.test_loadable"
        os.mkdir(basedir)
        node = IntroducerNode(basedir)

        def _start(ign):
            return node.startService()

        def _wait_for_tub(ign):
            return node.when_tub_ready()

        def _stop(ign):
            return node.stopService()

        d = fireEventually(None)
        for step in (_start, _wait_for_tub, _stop, flushEventualQueue):
            d.addCallback(step)
        return d
41
class ServiceMixin:
    """Test mixin that provides self.parent, a started LoggingMultiService,
    and stops it again when the test finishes."""

    def setUp(self):
        """Create the parent service and start it."""
        parent = LoggingMultiService()
        self.parent = parent
        parent.startService()

    def tearDown(self):
        """Stop the parent service, then flush the eventual-send queue.

        Returns the Deferred that fires once both steps are done.
        """
        log.msg("TestIntroducer.tearDown")

        def _stop_parent(ign):
            return self.parent.stopService()

        d = defer.succeed(None)
        d.addCallback(_stop_parent)
        d.addCallback(flushEventualQueue)
        return d
52
class Introducer(ServiceMixin, unittest.TestCase, pollmixin.PollMixin):
    """Tests that call IntroducerService / IntroducerClient methods
    directly, without connecting them through a Tub."""

    def test_create(self):
        # constructing a client with no Tub (None) must work
        client = IntroducerClient(None, "introducer.furl", u"my_nickname",
                                  "my_version", "oldest_version", {})
        self.failUnless(isinstance(client, IntroducerClient))

    def test_listen(self):
        # the service can be attached to the parent started in setUp()
        introducer = IntroducerService()
        introducer.setServiceParent(self.parent)

    def test_duplicate_publish(self):
        introducer = IntroducerService()

        def _expect_announcements(count):
            # 'count' announcements are stored; nobody ever subscribes here
            self.failUnlessEqual(len(introducer.get_announcements()), count)
            self.failUnlessEqual(len(introducer.get_subscribers()), 0)

        _expect_announcements(0)

        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36106,127.0.0.1:36106/gydnpigj2ja2qr2srq4ikjwnl7xfgbra"
        furl2 = "pb://ttwwooyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36111,127.0.0.1:36106/ttwwoogj2ja2qr2srq4ikjwnl7xfgbra"
        ann1 = (furl1, "storage", "RIStorage", "nick1", "ver23", "ver0")
        # same FURL as ann1, only the version field differs
        ann1b = (furl1, "storage", "RIStorage", "nick1", "ver24", "ver0")
        ann2 = (furl2, "storage", "RIStorage", "nick2", "ver30", "ver0")

        # (announcement to publish, number of announcements stored afterwards)
        steps = [
            (ann1, 1),
            (ann2, 2),
            (ann1b, 2), # shares furl1 with ann1, so the count stays at 2
        ]
        for ann, expected in steps:
            introducer.remote_publish(ann)
            _expect_announcements(expected)

    def test_id_collision(self):
        # test replacement case where tubid equals a keyid (one should
        # not replace the other)
        introducer = IntroducerService()
        client = IntroducerClient(None,
                                  "introducer.furl", u"my_nickname",
                                  "my_version", "oldest_version", {})
        signing_s, verifying_s = keyutil.make_keypair()
        signing_key, _ignored = keyutil.parse_privkey(signing_s)
        keyid = keyutil.remove_prefix(verifying_s, "pub-v0-")

        # first a signed announcement, published through the v2 interface
        signed_furl = "pb://onug64tu@127.0.0.1:123/short" # base32("short")
        signed_ann_t = client.create_announcement("storage",
                                                  make_ann(signed_furl),
                                                  signing_key)
        introducer.remote_publish_v2(signed_ann_t, Referenceable())
        stored = introducer.get_announcements()
        self.failUnlessEqual(len(stored), 1)
        signed_index = ("storage", "v0-"+keyid, None)
        self.failUnlessEqual(stored[0].index, signed_index)
        signed_out = stored[0].announcement
        self.failUnlessEqual(signed_out["anonymous-storage-FURL"],
                             signed_furl)

        # then a v1-style announcement whose FURL uses keyid as its tubid
        unsigned_furl = "pb://%s@127.0.0.1:36106/swissnum" % keyid
        unsigned_ann = (unsigned_furl, "storage", "RIStorage",
                        "nick1", "ver23", "ver0")
        introducer.remote_publish(unsigned_ann)
        stored = introducer.get_announcements()
        self.failUnlessEqual(len(stored), 2)
        unsigned_index = ("storage", None, keyid)
        matching = [ad for ad in stored if ad.index == unsigned_index]
        self.failUnlessEqual(len(matching), 1)
        unsigned_out = matching[0].announcement
        self.failUnlessEqual(unsigned_out["anonymous-storage-FURL"],
                             unsigned_furl)
113
114
def make_ann(furl):
    """Return a minimal storage announcement dict for furl.

    The dict holds the FURL itself plus a permutation seed taken from
    get_tubid_string(furl).
    """
    return {
        "anonymous-storage-FURL": furl,
        "permutation-seed-base32": get_tubid_string(furl),
    }
119
def make_ann_t(ic, furl, privkey, seqnum):
    """Have ic create a "storage" announcement for furl, signed with privkey.

    The announcement's "seqnum" entry is forced to seqnum; when seqnum is
    None the entry is removed altogether.
    """
    def _force_seqnum(ann):
        # handed to create_announcement() as its modifier callback
        if seqnum is None:
            ann.pop("seqnum", None)
        else:
            ann["seqnum"] = seqnum
        return ann

    return ic.create_announcement("storage", make_ann(furl), privkey,
                                  _force_seqnum)
127
class Client(unittest.TestCase):
    """Tests of how IntroducerClient handles inbound announcements:
    duplicates, replacements and look-alike server ids."""

    def test_duplicate_receive_v1(self):
        ic = IntroducerClient(None,
                              "introducer.furl", u"my_nickname",
                              "my_version", "oldest_version", {})
        received = []
        def _on_announcement(key_s, ann):
            received.append(ann)
        ic.subscribe_to("storage", _on_announcement)

        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/gydnpigj2ja2qr2srq4ikjwnl7xfgbra"
        ann1 = (furl1, "storage", "RIStorage", "nick1", "ver23", "ver0")
        ann1b = (furl1, "storage", "RIStorage", "nick1", "ver24", "ver0")
        v1_wrapper = WrapV2ClientInV1Interface(ic)

        def _check_counts(inbound, new, update, duplicate):
            counts = ic._debug_counts
            self.failUnlessEqual(counts["inbound_announcement"], inbound)
            self.failUnlessEqual(counts["new_announcement"], new)
            self.failUnlessEqual(counts["update"], update)
            self.failUnlessEqual(counts["duplicate_announcement"], duplicate)

        def _after_first(ign):
            self.failUnlessEqual(len(received), 1)
            self.failUnlessEqual(received[0]["nickname"], u"nick1")
            self.failUnlessEqual(received[0]["my-version"], "ver23")
            _check_counts(inbound=1, new=1, update=0, duplicate=0)
            # now send a duplicate announcement: this should not notify clients
            v1_wrapper.remote_announce([ann1])
            return fireEventually()

        def _after_duplicate(ign):
            self.failUnlessEqual(len(received), 1)
            _check_counts(inbound=2, new=1, update=0, duplicate=1)
            # and a replacement announcement: same FURL, new other stuff.
            # Clients should be notified.
            v1_wrapper.remote_announce([ann1b])
            return fireEventually()

        def _after_replacement(ign):
            self.failUnlessEqual(len(received), 2)
            _check_counts(inbound=3, new=1, update=1, duplicate=1)
            # test that the other stuff changed
            self.failUnlessEqual(received[-1]["nickname"], u"nick1")
            self.failUnlessEqual(received[-1]["my-version"], "ver24")

        v1_wrapper.remote_announce([ann1])
        d = fireEventually()
        d.addCallback(_after_first)
        d.addCallback(_after_duplicate)
        d.addCallback(_after_replacement)
        return d

    def test_duplicate_receive_v2(self):
        ic1 = IntroducerClient(None,
                               "introducer.furl", u"my_nickname",
                               "ver23", "oldest_version", {})
        # we use a second client just to create a different-looking
        # announcement
        ic2 = IntroducerClient(None,
                               "introducer.furl", u"my_nickname",
                               "ver24", "oldest_version", {})
        received = []
        def _received(key_s, ann):
            received.append((key_s, ann))
        ic1.subscribe_to("storage", _received)

        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/gydnp"
        furl1a = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:7777/gydnp"
        furl2 = "pb://ttwwooyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/ttwwoo"

        privkey_s, pubkey_vs = keyutil.make_keypair()
        privkey, _ignored = keyutil.parse_privkey(privkey_s)
        pubkey_s = keyutil.remove_prefix(pubkey_vs, "pub-")

        # ann1: ic1, furl1
        # ann1a: ic1, furl1a (same SturdyRef, different connection hints)
        # ann1b: ic2, furl1
        # ann2: ic2, furl2

        self.ann1 = make_ann_t(ic1, furl1, privkey, seqnum=10)
        self.ann1old = make_ann_t(ic1, furl1, privkey, seqnum=9)
        self.ann1noseqnum = make_ann_t(ic1, furl1, privkey, seqnum=None)
        self.ann1b = make_ann_t(ic2, furl1, privkey, seqnum=11)
        self.ann1a = make_ann_t(ic1, furl1a, privkey, seqnum=12)
        self.ann2 = make_ann_t(ic2, furl2, privkey, seqnum=13)

        def _announce(ann_t):
            # build a callback that delivers ann_t to ic1
            def _send(ign):
                return ic1.remote_announce_v2([ann_t])
            return _send

        def _expect_count(seen, count):
            # build a callback asserting that the subscriber did not fire
            def _check(ign):
                self.failUnlessEqual(len(seen), count)
            return _check

        def _expect_latest(seen, count, furl, version):
            # build a callback asserting the size of 'seen' and the contents
            # of its newest entry
            def _check(ign):
                self.failUnlessEqual(len(seen), count)
                key_s, ann = seen[-1]
                self.failUnlessEqual(key_s, pubkey_s)
                self.failUnlessEqual(ann["anonymous-storage-FURL"], furl)
                self.failUnlessEqual(ann["my-version"], version)
            return _check

        ic1.remote_announce_v2([self.ann1]) # queues eventual-send
        d = fireEventually()
        d.addCallback(_expect_latest(received, 1, furl1, "ver23"))

        steps = [
            # now send a duplicate announcement. This should not fire the
            # subscriber
            (self.ann1, _expect_count(received, 1)),
            # an older announcement shouldn't fire the subscriber either
            (self.ann1old, _expect_count(received, 1)),
            # announcement with no seqnum cannot replace one with-seqnum
            (self.ann1noseqnum, _expect_count(received, 1)),
            # and a replacement announcement: same FURL, new other stuff. The
            # subscriber *should* be fired.
            (self.ann1b, _expect_latest(received, 2, furl1, "ver24")),
            # and a replacement announcement with a different FURL (it uses
            # different connection hints)
            (self.ann1a, _expect_latest(received, 3, furl1a, "ver23")),
        ]
        for ann_t, check in steps:
            d.addCallback(_announce(ann_t))
            d.addCallback(fireEventually)
            d.addCallback(check)

        # now add a new subscription, which should be called with the
        # backlog. The introducer only records one announcement per index, so
        # the backlog will only have the latest message.
        backlog = []
        def _received2(key_s, ann):
            backlog.append((key_s, ann))
        def _subscribe_again(ign):
            return ic1.subscribe_to("storage", _received2)
        d.addCallback(_subscribe_again)
        d.addCallback(fireEventually)
        d.addCallback(_expect_latest(backlog, 1, furl1a, "ver23"))
        return d

    def test_id_collision(self):
        # test replacement case where tubid equals a keyid (one should
        # not replace the other)
        ic = IntroducerClient(None,
                              "introducer.furl", u"my_nickname",
                              "my_version", "oldest_version", {})
        received = []
        def _on_announcement(key_s, ann):
            received.append(ann)
        ic.subscribe_to("storage", _on_announcement)

        signing_s, verifying_s = keyutil.make_keypair()
        signing_key, _ignored = keyutil.parse_privkey(signing_s)
        keyid = keyutil.remove_prefix(verifying_s, "pub-v0-")
        furl1 = "pb://onug64tu@127.0.0.1:123/short" # base32("short")
        furl2 = "pb://%s@127.0.0.1:36106/swissnum" % keyid
        signed_ann_t = ic.create_announcement("storage", make_ann(furl1),
                                              signing_key)

        def _first_processed(ign):
            # first announcement has been processed
            self.failUnlessEqual(len(received), 1)
            self.failUnlessEqual(received[0]["anonymous-storage-FURL"],
                                 furl1)
            # now submit a second one, with a tubid that happens to look just
            # like the pubkey-based serverid we just processed. They should
            # not overlap.
            ann2 = (furl2, "storage", "RIStorage", "nick1", "ver23", "ver0")
            v1_wrapper = WrapV2ClientInV1Interface(ic)
            v1_wrapper.remote_announce([ann2])
            return fireEventually()

        def _second_processed(ign):
            # if they overlapped, the second announcement would be ignored
            self.failUnlessEqual(len(received), 2)
            self.failUnlessEqual(received[1]["anonymous-storage-FURL"],
                                 furl2)

        ic.remote_announce_v2([signed_ann_t])
        d = fireEventually()
        d.addCallback(_first_processed)
        d.addCallback(_second_processed)
        return d
321
class Server(unittest.TestCase):
    """Checks how IntroducerService handles repeated v2 publishes of one
    announcement that differ only in their sequence number."""

    def test_duplicate(self):
        introducer = IntroducerService()
        ic1 = IntroducerClient(None,
                               "introducer.furl", u"my_nickname",
                               "ver23", "oldest_version", {})
        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/gydnp"

        privkey_s, _pubkey_s = keyutil.make_keypair()
        privkey, _ignored = keyutil.parse_privkey(privkey_s)

        ann1 = make_ann_t(ic1, furl1, privkey, seqnum=10)
        ann1_old = make_ann_t(ic1, furl1, privkey, seqnum=9)
        ann1_new = make_ann_t(ic1, furl1, privkey, seqnum=11)
        ann1_noseqnum = make_ann_t(ic1, furl1, privkey, seqnum=None)

        # the server-side counters inspected after every publish, in the
        # order used by the expected-value tuples below
        counter_names = ("inbound_message", "inbound_duplicate",
                         "inbound_no_seqnum", "inbound_old_replay",
                         "inbound_update")

        def _publish_and_check(ann_t, stored_seqnum, expected_counts):
            # Publish ann_t, then verify that exactly one announcement is
            # stored, that it carries stored_seqnum, and that each debug
            # counter has the expected value.
            introducer.remote_publish_v2(ann_t, None)
            stored = introducer.get_announcements()
            self.failUnlessEqual(len(stored), 1)
            self.failUnlessEqual(stored[0].announcement["seqnum"],
                                 stored_seqnum)
            counts = introducer._debug_counts
            for name, expected in zip(counter_names, expected_counts):
                # pass the counter name as the message so a failure says
                # which counter was wrong
                self.failUnlessEqual(counts[name], expected, name)

        # first publish: accepted as a brand-new announcement
        _publish_and_check(ann1, 10, (1, 0, 0, 0, 0))
        # identical re-publish: counted as a duplicate
        _publish_and_check(ann1, 10, (2, 1, 0, 0, 0))
        # lower seqnum: counted as an old replay, seqnum 10 stays stored
        _publish_and_check(ann1_old, 10, (3, 1, 0, 1, 0))
        # higher seqnum: counted as an update, seqnum 11 is now stored
        _publish_and_check(ann1_new, 11, (4, 1, 0, 1, 1))
        # no seqnum at all: counted separately, seqnum 11 stays stored
        _publish_and_check(ann1_noseqnum, 11, (5, 1, 1, 1, 1))
387
388
# Nickname template with a non-ASCII character in it (U+00ED, LATIN SMALL
# LETTER I WITH ACUTE); the "%s" slot is filled via the % operator.
NICKNAME = u"n\u00EDickname-%s"
390
class SystemTestMixin(ServiceMixin, pollmixin.PollMixin):
    """Mixin for tests that need a listening central Tub.

    The test is expected to set self.basedir before calling create_tub().
    """

    def create_tub(self, portnum=0):
        """Create the central Tub, attach it to self.parent and listen.

        portnum is the TCP port handed to listenOn(); with the default of 0
        the assigned port is read back from the listener. The Tub is stored
        as self.central_tub and the port as self.central_portnum.
        """
        certfile = os.path.join(self.basedir, "tub.pem")
        tub = Tub(certFile=certfile)
        self.central_tub = tub
        # for debugging, these can be enabled:
        #tub.setOption("logLocalFailures", True)
        #tub.setOption("logRemoteFailures", True)
        tub.setOption("expose-remote-exception-types", False)
        tub.setServiceParent(self.parent)
        listener = tub.listenOn("tcp:%d" % portnum)
        self.central_portnum = listener.getPortnum()
        # an explicitly requested port must be the one we ended up with
        assert portnum == 0 or self.central_portnum == portnum
        tub.setLocation("localhost:%d" % self.central_portnum)
405
class Queue(SystemTestMixin, unittest.TestCase):
    """Checks that a client's publish made while the introducer is detached
    still reaches the introducer once it is attached again."""

    def test_queue_until_connected(self):
        self.basedir = "introducer/QueueUntilConnected/queued"
        os.makedirs(self.basedir)
        self.create_tub()
        introducer = IntroducerService()
        introducer.setServiceParent(self.parent)
        iff = os.path.join(self.basedir, "introducer.furl")
        ifurl = self.central_tub.registerReference(introducer, furlFile=iff)
        tub2 = Tub()
        tub2.setServiceParent(self.parent)
        c = IntroducerClient(tub2, ifurl,
                             u"nickname", "version", "oldest", {})
        furl1 = "pb://onug64tu@127.0.0.1:123/short" # base32("short")
        sk_s, _vk_s = keyutil.make_keypair() # only the signing key is used
        sk, _ignored = keyutil.parse_privkey(sk_s)

        # IService.disownServiceParent() may return either a Deferred or
        # None; maybeDeferred() gives us a Deferred to chain on in both cases.
        d = defer.maybeDeferred(introducer.disownServiceParent)

        def _offline(ign):
            # now that the introducer server is offline, start the
            # (already created) client and publish a message
            c.setServiceParent(self.parent) # this starts the reconnector
            c.publish("storage", make_ann(furl1), sk)

            introducer.setServiceParent(self.parent) # restart the server
            # now wait for the messages to be delivered
            def _got_announcement():
                return bool(introducer.get_announcements())
            return self.poll(_got_announcement)
        d.addCallback(_offline)

        def _done(ign):
            # the announcement that arrived is the one we published
            first = introducer.get_announcements()[0]
            furl = first.announcement["anonymous-storage-FURL"]
            self.failUnlessEqual(furl, furl1)
        d.addCallback(_done)

        # now let the ack get back
        def _wait_until_idle(ign):
            def _idle():
                # idle means neither side has anything outstanding
                return not (c._debug_outstanding
                            or introducer._debug_outstanding)
            return self.poll(_idle)
        d.addCallback(_wait_until_idle)
        return d
453
454
# Tags used to select which introducer server version a system test runs
# against.
V1 = "v1"
V2 = "v2"
456 class SystemTest(SystemTestMixin, unittest.TestCase):
457
458     def do_system_test(self, server_version):
459         self.create_tub()
460         if server_version == V1:
461             introducer = old.IntroducerService_v1()
462         else:
463             introducer = IntroducerService()
464         introducer.setServiceParent(self.parent)
465         iff = os.path.join(self.basedir, "introducer.furl")
466         tub = self.central_tub
467         ifurl = self.central_tub.registerReference(introducer, furlFile=iff)
468         self.introducer_furl = ifurl
469
470         # we have 5 clients who publish themselves as storage servers, and a
471         # sixth which does not. All 6 clients subscribe to hear about
472         # storage. When the connections are fully established, all six nodes
473         # should have 5 connections each.
474         NUM_STORAGE = 5
475         NUM_CLIENTS = 6
476
477         clients = []
478         tubs = {}
479         received_announcements = {}
480         subscribing_clients = []
481         publishing_clients = []
482         printable_serverids = {}
483         self.the_introducer = introducer
484         privkeys = {}
485         expected_announcements = [0 for c in range(NUM_CLIENTS)]
486
487         for i in range(NUM_CLIENTS):
488             tub = Tub()
489             #tub.setOption("logLocalFailures", True)
490             #tub.setOption("logRemoteFailures", True)
491             tub.setOption("expose-remote-exception-types", False)
492             tub.setServiceParent(self.parent)
493             l = tub.listenOn("tcp:0")
494             portnum = l.getPortnum()
495             tub.setLocation("localhost:%d" % portnum)
496
497             log.msg("creating client %d: %s" % (i, tub.getShortTubID()))
498             if i == 0:
499                 c = old.IntroducerClient_v1(tub, self.introducer_furl,
500                                             NICKNAME % str(i),
501                                             "version", "oldest")
502             else:
503                 c = IntroducerClient(tub, self.introducer_furl,
504                                      NICKNAME % str(i),
505                                      "version", "oldest",
506                                      {"component": "component-v1"})
507             received_announcements[c] = {}
508             def got(key_s_or_tubid, ann, announcements, i):
509                 if i == 0:
510                     index = get_tubid_string_from_ann(ann)
511                 else:
512                     index = key_s_or_tubid or get_tubid_string_from_ann(ann)
513                 announcements[index] = ann
514             c.subscribe_to("storage", got, received_announcements[c], i)
515             subscribing_clients.append(c)
516             expected_announcements[i] += 1 # all expect a 'storage' announcement
517
518             node_furl = tub.registerReference(Referenceable())
519             if i < NUM_STORAGE:
520                 if i == 0:
521                     c.publish(node_furl, "storage", "ri_name")
522                     printable_serverids[i] = get_tubid_string(node_furl)
523                 elif i == 1:
524                     # sign the announcement
525                     privkey_s, pubkey_s = keyutil.make_keypair()
526                     privkey, _ignored = keyutil.parse_privkey(privkey_s)
527                     privkeys[c] = privkey
528                     c.publish("storage", make_ann(node_furl), privkey)
529                     if server_version == V1:
530                         printable_serverids[i] = get_tubid_string(node_furl)
531                     else:
532                         assert pubkey_s.startswith("pub-")
533                         printable_serverids[i] = pubkey_s[len("pub-"):]
534                 else:
535                     c.publish("storage", make_ann(node_furl))
536                     printable_serverids[i] = get_tubid_string(node_furl)
537                 publishing_clients.append(c)
538             else:
539                 # the last one does not publish anything
540                 pass
541
542             if i == 0:
543                 # users of the V1 client were required to publish a
544                 # 'stub_client' record (somewhat after they published the
545                 # 'storage' record), so the introducer could see their
546                 # version. Match that behavior.
547                 c.publish(node_furl, "stub_client", "stub_ri_name")
548
549             if i == 2:
550                 # also publish something that nobody cares about
551                 boring_furl = tub.registerReference(Referenceable())
552                 c.publish("boring", make_ann(boring_furl))
553
554             c.setServiceParent(self.parent)
555             clients.append(c)
556             tubs[c] = tub
557
558
559         def _wait_for_connected(ign):
560             def _connected():
561                 for c in clients:
562                     if not c.connected_to_introducer():
563                         return False
564                 return True
565             return self.poll(_connected)
566
567         # we watch the clients to determine when the system has settled down.
568         # Then we can look inside the server to assert things about its
569         # state.
570
571         def _wait_for_expected_announcements(ign):
572             def _got_expected_announcements():
573                 for i,c in enumerate(subscribing_clients):
574                     if len(received_announcements[c]) < expected_announcements[i]:
575                         return False
576                 return True
577             return self.poll(_got_expected_announcements)
578
579         # before shutting down any Tub, we'd like to know that there are no
580         # messages outstanding
581
582         def _wait_until_idle(ign):
583             def _idle():
584                 for c in subscribing_clients + publishing_clients:
585                     if c._debug_outstanding:
586                         return False
587                 if self.the_introducer._debug_outstanding:
588                     return False
589                 return True
590             return self.poll(_idle)
591
592         d = defer.succeed(None)
593         d.addCallback(_wait_for_connected)
594         d.addCallback(_wait_for_expected_announcements)
595         d.addCallback(_wait_until_idle)
596
597         def _check1(res):
598             log.msg("doing _check1")
599             dc = self.the_introducer._debug_counts
600             if server_version == V1:
601                 # each storage server publishes a record, and (after its
602                 # 'subscribe' has been ACKed) also publishes a "stub_client".
603                 # The non-storage client (which subscribes) also publishes a
604                 # stub_client. There is also one "boring" service. The number
605                 # of messages is higher, because the stub_clients aren't
606                 # published until after we get the 'subscribe' ack (since we
607                 # don't realize that we're dealing with a v1 server [which
608                 # needs stub_clients] until then), and the act of publishing
609                 # the stub_client causes us to re-send all previous
610                 # announcements.
611                 self.failUnlessEqual(dc["inbound_message"] - dc["inbound_duplicate"],
612                                      NUM_STORAGE + NUM_CLIENTS + 1)
613             else:
614                 # each storage server publishes a record. There is also one
615                 # "stub_client" and one "boring"
616                 self.failUnlessEqual(dc["inbound_message"], NUM_STORAGE+2)
617                 self.failUnlessEqual(dc["inbound_duplicate"], 0)
618             self.failUnlessEqual(dc["inbound_update"], 0)
619             self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS)
620             # the number of outbound messages is tricky.. I think it depends
621             # upon a race between the publish and the subscribe messages.
622             self.failUnless(dc["outbound_message"] > 0)
623             # each client subscribes to "storage", and each server publishes
624             self.failUnlessEqual(dc["outbound_announcements"],
625                                  NUM_STORAGE*NUM_CLIENTS)
626
627             for c in subscribing_clients:
628                 cdc = c._debug_counts
629                 self.failUnless(cdc["inbound_message"])
630                 self.failUnlessEqual(cdc["inbound_announcement"],
631                                      NUM_STORAGE)
632                 self.failUnlessEqual(cdc["wrong_service"], 0)
633                 self.failUnlessEqual(cdc["duplicate_announcement"], 0)
634                 self.failUnlessEqual(cdc["update"], 0)
635                 self.failUnlessEqual(cdc["new_announcement"],
636                                      NUM_STORAGE)
637                 anns = received_announcements[c]
638                 self.failUnlessEqual(len(anns), NUM_STORAGE)
639
640                 nodeid0 = tubs[clients[0]].tubID
641                 ann = anns[nodeid0]
642                 nick = ann["nickname"]
643                 self.failUnlessEqual(type(nick), unicode)
644                 self.failUnlessEqual(nick, NICKNAME % "0")
645             if server_version == V1:
646                 for c in publishing_clients:
647                     cdc = c._debug_counts
648                     expected = 1 # storage
649                     if c is clients[2]:
650                         expected += 1 # boring
651                     if c is not clients[0]:
652                         # the v2 client tries to call publish_v2, which fails
653                         # because the server is v1. It then re-sends
654                         # everything it has so far, plus a stub_client record
655                         expected = 2*expected + 1
656                     if c is clients[0]:
657                         # we always tell v1 client to send stub_client
658                         expected += 1
659                     self.failUnlessEqual(cdc["outbound_message"], expected)
660             else:
661                 for c in publishing_clients:
662                     cdc = c._debug_counts
663                     expected = 1
664                     if c in [clients[0], # stub_client
665                              clients[2], # boring
666                              ]:
667                         expected = 2
668                     self.failUnlessEqual(cdc["outbound_message"], expected)
669             # now check the web status, make sure it renders without error
670             ir = introweb.IntroducerRoot(self.parent)
671             self.parent.nodeid = "NODEID"
672             text = ir.renderSynchronously().decode("utf-8")
673             self.failUnlessIn(NICKNAME % "0", text) # the v1 client
674             self.failUnlessIn(NICKNAME % "1", text) # a v2 client
675             for i in range(NUM_STORAGE):
676                 self.failUnlessIn(printable_serverids[i], text,
677                                   (i,printable_serverids[i],text))
678                 # make sure there isn't a double-base32ed string too
679                 self.failIfIn(idlib.nodeid_b2a(printable_serverids[i]), text,
680                               (i,printable_serverids[i],text))
681             log.msg("_check1 done")
682         d.addCallback(_check1)
683
684         # force an introducer reconnect, by shutting down the Tub it's using
685         # and starting a new Tub (with the old introducer). Everybody should
686         # reconnect and republish, but the introducer should ignore the
687         # republishes as duplicates. However, because the server doesn't know
688         # what each client does and does not know, it will send them a copy
689         # of the current announcement table anyway.
690
691         d.addCallback(lambda _ign: log.msg("shutting down introducer's Tub"))
692         d.addCallback(lambda _ign: self.central_tub.disownServiceParent())
693
694         def _wait_for_introducer_loss(ign):
695             def _introducer_lost():
696                 for c in clients:
697                     if c.connected_to_introducer():
698                         return False
699                 return True
700             return self.poll(_introducer_lost)
701         d.addCallback(_wait_for_introducer_loss)
702
703         def _restart_introducer_tub(_ign):
704             log.msg("restarting introducer's Tub")
705             # reset counters
706             for i in range(NUM_CLIENTS):
707                 c = subscribing_clients[i]
708                 for k in c._debug_counts:
709                     c._debug_counts[k] = 0
710             for k in self.the_introducer._debug_counts:
711                 self.the_introducer._debug_counts[k] = 0
712             expected_announcements[i] += 1 # new 'storage' for everyone
713             self.create_tub(self.central_portnum)
714             newfurl = self.central_tub.registerReference(self.the_introducer,
715                                                          furlFile=iff)
716             assert newfurl == self.introducer_furl
717         d.addCallback(_restart_introducer_tub)
718
719         d.addCallback(_wait_for_connected)
720         d.addCallback(_wait_for_expected_announcements)
721         d.addCallback(_wait_until_idle)
722         d.addCallback(lambda _ign: log.msg(" reconnected"))
723
724         # TODO: publish something while the introducer is offline, then
725         # confirm it gets delivered when the connection is reestablished
726         def _check2(res):
727             log.msg("doing _check2")
728             # assert that the introducer sent out new messages, one per
729             # subscriber
730             dc = self.the_introducer._debug_counts
731             self.failUnlessEqual(dc["outbound_announcements"],
732                                  NUM_STORAGE*NUM_CLIENTS)
733             self.failUnless(dc["outbound_message"] > 0)
734             self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS)
735             for c in subscribing_clients:
736                 cdc = c._debug_counts
737                 self.failUnlessEqual(cdc["inbound_message"], 1)
738                 self.failUnlessEqual(cdc["inbound_announcement"], NUM_STORAGE)
739                 self.failUnlessEqual(cdc["new_announcement"], 0)
740                 self.failUnlessEqual(cdc["wrong_service"], 0)
741                 self.failUnlessEqual(cdc["duplicate_announcement"], NUM_STORAGE)
742         d.addCallback(_check2)
743
744         # Then force an introducer restart, by shutting down the Tub,
745         # destroying the old introducer, and starting a new Tub+Introducer.
746         # Everybody should reconnect and republish, and the (new) introducer
747         # will distribute the new announcements, but the clients should
748         # ignore the republishes as duplicates.
749
750         d.addCallback(lambda _ign: log.msg("shutting down introducer"))
751         d.addCallback(lambda _ign: self.central_tub.disownServiceParent())
752         d.addCallback(_wait_for_introducer_loss)
753         d.addCallback(lambda _ign: log.msg("introducer lost"))
754
755         def _restart_introducer(_ign):
756             log.msg("restarting introducer")
757             self.create_tub(self.central_portnum)
758             # reset counters
759             for i in range(NUM_CLIENTS):
760                 c = subscribing_clients[i]
761                 for k in c._debug_counts:
762                     c._debug_counts[k] = 0
763             expected_announcements[i] += 1 # new 'storage' for everyone
764             if server_version == V1:
765                 introducer = old.IntroducerService_v1()
766             else:
767                 introducer = IntroducerService()
768             self.the_introducer = introducer
769             newfurl = self.central_tub.registerReference(self.the_introducer,
770                                                          furlFile=iff)
771             assert newfurl == self.introducer_furl
772         d.addCallback(_restart_introducer)
773
774         d.addCallback(_wait_for_connected)
775         d.addCallback(_wait_for_expected_announcements)
776         d.addCallback(_wait_until_idle)
777
778         def _check3(res):
779             log.msg("doing _check3")
780             dc = self.the_introducer._debug_counts
781             self.failUnlessEqual(dc["outbound_announcements"],
782                                  NUM_STORAGE*NUM_CLIENTS)
783             self.failUnless(dc["outbound_message"] > 0)
784             self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS)
785             for c in subscribing_clients:
786                 cdc = c._debug_counts
787                 self.failUnless(cdc["inbound_message"] > 0)
788                 self.failUnlessEqual(cdc["inbound_announcement"], NUM_STORAGE)
789                 self.failUnlessEqual(cdc["new_announcement"], 0)
790                 self.failUnlessEqual(cdc["wrong_service"], 0)
791                 self.failUnlessEqual(cdc["duplicate_announcement"], NUM_STORAGE)
792
793         d.addCallback(_check3)
794         return d
795
796
797     def test_system_v2_server(self):
798         self.basedir = "introducer/SystemTest/system_v2_server"
799         os.makedirs(self.basedir)
800         return self.do_system_test(V2)
801     test_system_v2_server.timeout = 480
802     # occasionally takes longer than 350s on "draco"
803
804     def test_system_v1_server(self):
805         self.basedir = "introducer/SystemTest/system_v1_server"
806         os.makedirs(self.basedir)
807         return self.do_system_test(V1)
808     test_system_v1_server.timeout = 480
809     # occasionally takes longer than 350s on "draco"
810
class FakeRemoteReference:
    """Stub handed to the introducer as a subscriber in the tests below.

    Every method returns a fixed, canned value.
    """

    def notifyOnDisconnect(self, *args, **kwargs):
        # accept any arguments and do nothing with them
        return None

    def getRemoteTubID(self):
        return "62ubehyunnyhzs7r6vdonnm2hpi52w6y"

    def getLocationHints(self):
        hints = [
            ("ipv4", "here.example.com", "1234"),
            ("ipv4", "there.example.com", "2345"),
        ]
        return hints

    def getPeer(self):
        peer = address.IPv4Address("TCP", "remote.example.com", 3456)
        return peer
818
class ClientInfo(unittest.TestCase):
    # These tests look at the subscriber records that
    # IntroducerService.get_subscribers() reports after v2-style and
    # v1-style subscriptions.

    def test_client_v2(self):
        # a v2 client passes its subscriber info along with the subscribe
        # call, so the record is fully populated right away
        introducer = IntroducerService()
        versions = {"whizzy": "fizzy"}
        client_v2 = IntroducerClient(None, None, NICKNAME % u"v2",
                                     "my_version", "oldest", versions)
        #furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
        #ann_s = make_ann_t(client_v2, furl1, None, 10)
        #introducer.remote_publish_v2(ann_s, Referenceable())
        fake_rref = FakeRemoteReference()
        introducer.remote_subscribe_v2(fake_rref, "storage",
                                       client_v2._my_subscriber_info)

        subscribers = introducer.get_subscribers()
        self.failUnlessEqual(len(subscribers), 1)
        record = subscribers[0]
        self.failUnlessEqual(record.service_name, "storage")
        self.failUnlessEqual(record.app_versions, versions)
        self.failUnlessEqual(record.nickname, NICKNAME % u"v2")
        self.failUnlessEqual(record.version, "my_version")

    def test_client_v1(self):
        introducer = IntroducerService()
        fake_rref = FakeRemoteReference()
        introducer.remote_subscribe(fake_rref, "storage")
        # the v1 subscribe interface had no subscriber_info: that was usually
        # sent in a separate stub_client pseudo-announcement
        subscribers = introducer.get_subscribers()
        self.failUnlessEqual(len(subscribers), 1)
        record = subscribers[0]
        self.failUnlessEqual(record.nickname, u"?") # not known yet
        self.failUnlessEqual(record.service_name, "storage")

        # now submit the stub_client announcement
        stub_furl = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
        nickname_utf8 = (NICKNAME % u"v1").encode("utf-8")
        stub_ann = (stub_furl, "stub_client", "RIStubClient",
                    nickname_utf8, "my_version", "oldest")
        introducer.remote_publish(stub_ann)
        # the server should correlate the two
        subscribers = introducer.get_subscribers()
        self.failUnlessEqual(len(subscribers), 1)
        record = subscribers[0]
        self.failUnlessEqual(record.service_name, "storage")
        # v1 announcements do not contain app-versions
        self.failUnlessEqual(record.app_versions, {})
        self.failUnlessEqual(record.nickname, NICKNAME % u"v1")
        self.failUnlessEqual(record.version, "my_version")

        # a subscription that arrives after the stub_client announcement
        # should be correlated too
        second_rref = FakeRemoteReference()
        introducer.remote_subscribe(second_rref, "thing2")

        subscribers = introducer.get_subscribers()
        self.failUnlessEqual(len(subscribers), 2)
        thing2_records = [s for s in subscribers
                          if s.service_name == "thing2"]
        record = thing2_records[0]
        # v1 announcements do not contain app-versions
        self.failUnlessEqual(record.app_versions, {})
        self.failUnlessEqual(record.nickname, NICKNAME % u"v1")
        self.failUnlessEqual(record.version, "my_version")
879
class Announcements(unittest.TestCase):
    # These tests publish one announcement to a fresh IntroducerService and
    # inspect the single record that get_announcements() returns.

    def test_client_v2_unsigned(self):
        # without a signing key, the record is indexed by tubid
        introducer = IntroducerService()
        versions = {"whizzy": "fizzy"}
        client_v2 = IntroducerClient(None, None, u"nick-v2",
                                     "my_version", "oldest", versions)
        storage_furl = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
        expected_tubid = "62ubehyunnyhzs7r6vdonnm2hpi52w6y"
        ann_t = make_ann_t(client_v2, storage_furl, None, 10.0)
        canary = Referenceable()
        introducer.remote_publish_v2(ann_t, canary)

        published = introducer.get_announcements()
        self.failUnlessEqual(len(published), 1)
        record = published[0]
        self.failUnlessIdentical(record.canary, canary)
        self.failUnlessEqual(record.index, ("storage", None, expected_tubid))
        self.failUnlessEqual(record.announcement["app-versions"], versions)
        self.failUnlessEqual(record.nickname, u"nick-v2")
        self.failUnlessEqual(record.service_name, "storage")
        self.failUnlessEqual(record.version, "my_version")
        self.failUnlessEqual(record.announcement["anonymous-storage-FURL"],
                             storage_furl)

    def test_client_v2_signed(self):
        # with a signing key, the record is indexed by the public key string
        # (with its "pub-" prefix removed) instead of the tubid
        introducer = IntroducerService()
        versions = {"whizzy": "fizzy"}
        client_v2 = IntroducerClient(None, None, u"nick-v2",
                                     "my_version", "oldest", versions)
        storage_furl = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
        privkey_s, pubkey_s = keyutil.make_keypair()
        signing_key, _ignored = keyutil.parse_privkey(privkey_s)
        expected_key_s = keyutil.remove_prefix(pubkey_s, "pub-")
        ann_t = make_ann_t(client_v2, storage_furl, signing_key, 10.0)
        canary = Referenceable()
        introducer.remote_publish_v2(ann_t, canary)

        published = introducer.get_announcements()
        self.failUnlessEqual(len(published), 1)
        record = published[0]
        self.failUnlessIdentical(record.canary, canary)
        self.failUnlessEqual(record.index, ("storage", expected_key_s, None))
        self.failUnlessEqual(record.announcement["app-versions"], versions)
        self.failUnlessEqual(record.nickname, u"nick-v2")
        self.failUnlessEqual(record.service_name, "storage")
        self.failUnlessEqual(record.version, "my_version")
        self.failUnlessEqual(record.announcement["anonymous-storage-FURL"],
                             storage_furl)

    def test_client_v1(self):
        # a v1 announcement is a plain tuple, published without a canary
        introducer = IntroducerService()

        storage_furl = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
        expected_tubid = "62ubehyunnyhzs7r6vdonnm2hpi52w6y"
        v1_ann = (storage_furl, "storage", "RIStorage",
                  u"nick-v1".encode("utf-8"), "my_version", "oldest")
        introducer.remote_publish(v1_ann)

        published = introducer.get_announcements()
        self.failUnlessEqual(len(published), 1)
        record = published[0]
        self.failUnlessEqual(record.index, ("storage", None, expected_tubid))
        self.failUnlessEqual(record.canary, None)
        self.failUnlessEqual(record.announcement["app-versions"], {})
        self.failUnlessEqual(record.nickname, u"nick-v1".encode("utf-8"))
        self.failUnlessEqual(record.service_name, "storage")
        self.failUnlessEqual(record.version, "my_version")
        self.failUnlessEqual(record.announcement["anonymous-storage-FURL"],
                             storage_furl)
943
944
class TooNewServer(IntroducerService):
    """IntroducerService whose VERSION lists only a 'v999' introducer protocol."""
    VERSION = {
        "http://allmydata.org/tahoe/protocols/introducer/v999": {},
        "application-version": "greetings from the crazy future",
    }
950
class NonV1Server(SystemTestMixin, unittest.TestCase):
    # if the 1.3.0 client connects to a server that doesn't provide the 'v1'
    # protocol, it is supposed to provide a useful error instead of a weird
    # exception.

    def test_failure(self):
        self.basedir = "introducer/NonV1Server/failure"
        os.makedirs(self.basedir)
        self.create_tub()

        # publish a server that only advertises the 'v999' protocol
        too_new = TooNewServer()
        too_new.setServiceParent(self.parent)
        self.introducer_furl = self.central_tub.registerReference(too_new)

        # give the client its own Tub, listening on a kernel-assigned port
        client_tub = Tub()
        client_tub.setOption("expose-remote-exception-types", False)
        client_tub.setServiceParent(self.parent)
        listener = client_tub.listenOn("tcp:0")
        client_tub.setLocation("localhost:%d" % listener.getPortnum())

        client = IntroducerClient(client_tub, self.introducer_furl,
                                  u"nickname-client", "version", "oldest", {})
        announcements = {}
        def got(key_s, ann):
            announcements[key_s] = ann
        client.subscribe_to("storage", got)

        client.setServiceParent(self.parent)

        # now we wait for it to connect and notice the bad version

        def _got_bad():
            # done polling once the client has either an error or a publisher
            return bool(client._introducer_error or client._publisher)
        d = self.poll(_got_bad)
        def _done(res):
            err = client._introducer_error
            self.failUnless(err)
            self.failUnless(err.check(InsufficientVersionError), err)
        d.addCallback(_done)
        return d
991
class DecodeFurl(unittest.TestCase):
    def test_decode(self):
        # make sure we have a working base64.b32decode. The one in
        # python2.4.[01] was broken.
        furl = 'pb://t5g7egomnnktbpydbuijt6zgtmw4oqi5@127.0.0.1:51857/hfzv36i'
        match = re.match(r'pb://(\w+)@', furl)
        assert match
        # the tubid part of the FURL is lowercase; it is upper-cased before
        # being handed to b32decode
        tubid_b32 = match.group(1).upper()
        decoded = b32decode(tubid_b32)
        expected = "\x9fM\xf2\x19\xcckU0\xbf\x03\r\x10\x99\xfb&\x9b-\xc7A\x1d"
        self.failUnlessEqual(decoded, expected)
1001
class Signatures(unittest.TestCase):
    def test_sign(self):
        # round trip: sign an announcement, then unsign it again
        announcement = {"key1": "value1"}
        privkey_s, pubkey_s = keyutil.make_keypair()
        signing_key, _ignored = keyutil.parse_privkey(privkey_s)
        signed_t = sign_to_foolscap(announcement, signing_key)
        msg, sig, key = signed_t
        bytes_type = type("".encode("utf-8"))
        self.failUnlessEqual(type(msg), bytes_type) # bytes
        self.failUnlessEqual(simplejson.loads(msg.decode("utf-8")),
                             announcement)
        self.failUnless(sig.startswith("v0-"))
        self.failUnless(key.startswith("v0-"))
        unsigned_ann, unsigned_key = unsign_from_foolscap(signed_t)
        self.failUnlessEqual(unsigned_ann, announcement)
        self.failUnlessEqual("pub-"+unsigned_key, pubkey_s)

        # bad signature
        tampered_ann = {"key1": "value2"}
        tampered_msg = simplejson.dumps(tampered_ann).encode("utf-8")
        self.failUnlessRaises(keyutil.BadSignatureError,
                              unsign_from_foolscap, (tampered_msg, sig, key))
        # sneaky bad signature should be ignored
        unsigned_ann, unsigned_key = unsign_from_foolscap(
            (tampered_msg, None, key))
        self.failUnlessEqual(unsigned_key, None)
        self.failUnlessEqual(unsigned_ann, tampered_ann)

        # unrecognized signatures
        self.failUnlessRaises(UnknownKeyError,
                              unsign_from_foolscap,
                              (tampered_msg, "v999-sig", key))
        self.failUnlessRaises(UnknownKeyError,
                              unsign_from_foolscap,
                              (tampered_msg, sig, "v999-key"))
1032
1033
# TODO: add tests of StorageFarmBroker. If it receives duplicate
# announcements, it should leave the Reconnector in place; the same applies
# if it receives same-FURL-different-misc. But if it receives
# same-nodeid-different-FURL, it should tear down the Reconnector and make a
# new one. This behavior used to live in the IntroducerClient, and thus used
# to be tested by test_introducer.
1039
1040 # copying more tests from old branch:
1041
1042 #  then also add Upgrade test