]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_introducer.py
6624a15bf4c16ad5bd783aee8fa46e7ef51d44c9
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_introducer.py
1
2 import os, re
3 from base64 import b32decode
4 import simplejson
5
6 from twisted.trial import unittest
7 from twisted.internet import defer, address
8 from twisted.python import log
9
10 from foolscap.api import Tub, Referenceable, fireEventually, flushEventualQueue
11 from twisted.application import service
12 from allmydata.interfaces import InsufficientVersionError
13 from allmydata.introducer.client import IntroducerClient, \
14      WrapV2ClientInV1Interface
15 from allmydata.introducer.server import IntroducerService
16 from allmydata.introducer.common import get_tubid_string_from_ann, \
17      get_tubid_string, sign_to_foolscap, unsign_from_foolscap, \
18      UnknownKeyError
19 from allmydata.introducer import old
20 # test compatibility with old introducer .tac files
21 from allmydata.introducer import IntroducerNode
22 from allmydata.web import introweb
23 from allmydata.util import pollmixin, keyutil, idlib
24 import allmydata.test.common_util as testutil
25
class LoggingMultiService(service.MultiService):
    """A MultiService that additionally offers a log() method."""

    def log(self, msg, **kwargs):
        """Forward *msg* and any keyword arguments to twisted's log.msg()."""
        log.msg(msg, **kwargs)
29
class Node(testutil.SignalMixin, unittest.TestCase):
    """Smoke test for IntroducerNode."""

    def test_loadable(self):
        """Create an IntroducerNode, start it, wait for its Tub, stop it."""
        basedir = "introducer.IntroducerNode.test_loadable"
        os.mkdir(basedir)
        node = IntroducerNode(basedir)

        def _start(_res):
            return node.startService()

        def _await_tub(_res):
            return node.when_tub_ready()

        def _stop(_res):
            return node.stopService()

        d = fireEventually(None)
        # each step runs only after the previous one's result is available
        for step in (_start, _await_tub, _stop, flushEventualQueue):
            d.addCallback(step)
        return d
41
class ServiceMixin:
    """Test mixin that provides a started LoggingMultiService as self.parent."""

    def setUp(self):
        """Create the parent service and start it."""
        parent = LoggingMultiService()
        self.parent = parent
        parent.startService()

    def tearDown(self):
        """Stop the parent service, then flush foolscap's eventual queue."""
        log.msg("TestIntroducer.tearDown")

        def _stop_parent(_res):
            return self.parent.stopService()

        d = defer.succeed(None)
        d.addCallback(_stop_parent)
        d.addCallback(flushEventualQueue)
        return d
52
class Introducer(ServiceMixin, unittest.TestCase, pollmixin.PollMixin):
    """Tests that call IntroducerService / IntroducerClient methods directly."""

    def test_create(self):
        """An IntroducerClient can be constructed with no Tub."""
        client = IntroducerClient(None, "introducer.furl", u"my_nickname",
                                  "my_version", "oldest_version", {})
        self.failUnless(isinstance(client, IntroducerClient))

    def test_listen(self):
        """An IntroducerService can be attached to the running parent."""
        server = IntroducerService()
        server.setServiceParent(self.parent)

    def test_duplicate_publish(self):
        """Re-publishing under the same FURL must not add a new announcement."""
        server = IntroducerService()

        def _check(num_announcements):
            # nobody ever subscribes in this test, so that count stays at 0
            self.failUnlessEqual(len(server.get_announcements()),
                                 num_announcements)
            self.failUnlessEqual(len(server.get_subscribers()), 0)

        _check(0)
        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36106,127.0.0.1:36106/gydnpigj2ja2qr2srq4ikjwnl7xfgbra"
        furl2 = "pb://ttwwooyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36111,127.0.0.1:36106/ttwwoogj2ja2qr2srq4ikjwnl7xfgbra"
        ann1 = (furl1, "storage", "RIStorage", "nick1", "ver23", "ver0")
        ann1b = (furl1, "storage", "RIStorage", "nick1", "ver24", "ver0")
        ann2 = (furl2, "storage", "RIStorage", "nick2", "ver30", "ver0")
        # (announcement to publish, total expected afterwards); ann1b shares
        # furl1 with ann1, so the total stays at 2 after it
        steps = ((ann1, 1), (ann2, 2), (ann1b, 2))
        for ann, expected_total in steps:
            server.remote_publish(ann)
            _check(expected_total)

    def test_id_collision(self):
        """A tubid equal to a keyid must be stored under a separate index.

        One announcement is created with a private key, the other is a
        v1-style tuple whose FURL tubid is the same string as the first
        one's key id. Neither should replace the other.
        """
        server = IntroducerService()
        client = IntroducerClient(None,
                                  "introducer.furl", u"my_nickname",
                                  "my_version", "oldest_version", {})
        sk_s, vk_s = keyutil.make_keypair()
        sk, _ignored = keyutil.parse_privkey(sk_s)
        keyid = keyutil.remove_prefix(vk_s, "pub-v0-")

        # first: the announcement made with the private key
        furl1 = "pb://onug64tu@127.0.0.1:123/short" # base32("short")
        keyed_ann = client.create_announcement("storage", make_ann(furl1), sk)
        server.remote_publish_v2(keyed_ann, Referenceable())
        published = server.get_announcements()
        self.failUnlessEqual(len(published), 1)
        keyed_index = ("storage", "v0-"+keyid, None)
        first = published[0]
        self.failUnlessEqual(first.index, keyed_index)
        self.failUnlessEqual(first.announcement["anonymous-storage-FURL"],
                             furl1)

        # second: a v1 tuple whose tubid is that same key id
        furl2 = "pb://%s@127.0.0.1:36106/swissnum" % keyid
        v1_ann = (furl2, "storage", "RIStorage", "nick1", "ver23", "ver0")
        server.remote_publish(v1_ann)
        published = server.get_announcements()
        self.failUnlessEqual(len(published), 2)
        tubid_index = ("storage", None, keyid)
        matches = [ad for ad in published if ad.index == tubid_index]
        self.failUnlessEqual(len(matches), 1)
        self.failUnlessEqual(matches[0].announcement["anonymous-storage-FURL"],
                             furl2)
113
114
def make_ann(furl):
    """Return a two-key storage announcement dict for *furl*.

    The permutation seed is whatever get_tubid_string() returns for the FURL.
    """
    return {
        "anonymous-storage-FURL": furl,
        "permutation-seed-base32": get_tubid_string(furl),
    }
119
def make_ann_t(ic, furl, privkey):
    """Ask client *ic* to create a "storage" announcement for *furl*.

    The announcement body comes from make_ann(); *privkey* is passed through
    to ic.create_announcement() unchanged.
    """
    body = make_ann(furl)
    return ic.create_announcement("storage", body, privkey)
122
class Client(unittest.TestCase):
    """Tests of how IntroducerClient reacts to inbound announcements."""

    def test_duplicate_receive_v1(self):
        """v1 tuples: a new one, an exact duplicate, then a replacement."""
        ic = IntroducerClient(None,
                              "introducer.furl", u"my_nickname",
                              "my_version", "oldest_version", {})
        received = []
        def _on_announcement(key_s, ann):
            received.append(ann)
        ic.subscribe_to("storage", _on_announcement)
        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/gydnpigj2ja2qr2srq4ikjwnl7xfgbra"
        ann1 = (furl1, "storage", "RIStorage", "nick1", "ver23", "ver0")
        ann1b = (furl1, "storage", "RIStorage", "nick1", "ver24", "ver0")
        v1_wrapper = WrapV2ClientInV1Interface(ic)

        def _check_counts(inbound, new, update, duplicate):
            counts = ic._debug_counts
            self.failUnlessEqual(counts["inbound_announcement"], inbound)
            self.failUnlessEqual(counts["new_announcement"], new)
            self.failUnlessEqual(counts["update"], update)
            self.failUnlessEqual(counts["duplicate_announcement"], duplicate)

        v1_wrapper.remote_announce([ann1])
        d = fireEventually()

        def _first_delivered(ign):
            self.failUnlessEqual(len(received), 1)
            self.failUnlessEqual(received[0]["nickname"], u"nick1")
            self.failUnlessEqual(received[0]["my-version"], "ver23")
            _check_counts(inbound=1, new=1, update=0, duplicate=0)
            # now send a duplicate announcement: this should not notify
            # clients
            v1_wrapper.remote_announce([ann1])
            return fireEventually()
        d.addCallback(_first_delivered)

        def _duplicate_ignored(ign):
            self.failUnlessEqual(len(received), 1)
            _check_counts(inbound=2, new=1, update=0, duplicate=1)
            # and a replacement announcement: same FURL, new other stuff.
            # Clients should be notified.
            v1_wrapper.remote_announce([ann1b])
            return fireEventually()
        d.addCallback(_duplicate_ignored)

        def _replacement_delivered(ign):
            self.failUnlessEqual(len(received), 2)
            _check_counts(inbound=3, new=1, update=1, duplicate=1)
            # test that the other stuff changed
            self.failUnlessEqual(received[-1]["nickname"], u"nick1")
            self.failUnlessEqual(received[-1]["my-version"], "ver24")
        d.addCallback(_replacement_delivered)
        return d

    def test_duplicate_receive_v2(self):
        """v2 announcements: duplicates are dropped, replacements delivered,
        and a late subscriber is handed only the latest announcement."""
        ic1 = IntroducerClient(None,
                               "introducer.furl", u"my_nickname",
                               "ver23", "oldest_version", {})
        # a second client is used only to create a different-looking
        # announcement (it reports "ver24" instead of "ver23")
        ic2 = IntroducerClient(None,
                               "introducer.furl", u"my_nickname",
                               "ver24","oldest_version",{})
        received = []
        def _received(key_s, ann):
            received.append( (key_s, ann) )
        ic1.subscribe_to("storage", _received)
        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/gydnp"
        furl1a = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:7777/gydnp"
        furl2 = "pb://ttwwooyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/ttwwoo"

        privkey_s, pubkey_vs = keyutil.make_keypair()
        privkey, _ignored = keyutil.parse_privkey(privkey_s)
        pubkey_s = keyutil.remove_prefix(pubkey_vs, "pub-")

        # ann1: ic1, furl1
        # ann1a: ic1, furl1a (same SturdyRef, different connection hints)
        # ann1b: ic2, furl1
        # ann2: ic2, furl2

        self.ann1 = make_ann_t(ic1, furl1, privkey)
        self.ann1a = make_ann_t(ic1, furl1a, privkey)
        self.ann1b = make_ann_t(ic2, furl1, privkey)
        self.ann2 = make_ann_t(ic2, furl2, privkey)

        def _check_latest(entries, count, furl, version):
            # *entries* must hold exactly *count* items, and the newest one
            # must come from our key with the given FURL and version
            self.failUnlessEqual(len(entries), count)
            key_s, ann = entries[-1]
            self.failUnlessEqual(key_s, pubkey_s)
            self.failUnlessEqual(ann["anonymous-storage-FURL"], furl)
            self.failUnlessEqual(ann["my-version"], version)

        def _announce(ign, ann_t):
            return ic1.remote_announce_v2([ann_t])

        ic1.remote_announce_v2([self.ann1]) # queues eventual-send
        d = fireEventually()
        def _first_delivered(ign):
            _check_latest(received, 1, furl1, "ver23")
        d.addCallback(_first_delivered)

        # now send a duplicate announcement. This should not fire the
        # subscriber
        d.addCallback(_announce, self.ann1)
        d.addCallback(fireEventually)
        def _duplicate_ignored(ign):
            self.failUnlessEqual(len(received), 1)
        d.addCallback(_duplicate_ignored)

        # and a replacement announcement: same FURL, new other stuff. The
        # subscriber *should* be fired.
        d.addCallback(_announce, self.ann1b)
        d.addCallback(fireEventually)
        def _replacement_delivered(ign):
            _check_latest(received, 2, furl1, "ver24")
        d.addCallback(_replacement_delivered)

        # and a replacement announcement with a different FURL (it uses
        # different connection hints)
        d.addCallback(_announce, self.ann1a)
        d.addCallback(fireEventually)
        def _new_hints_delivered(ign):
            _check_latest(received, 3, furl1a, "ver23")
        d.addCallback(_new_hints_delivered)

        # now add a new subscription, which should be called with the
        # backlog. The introducer only records one announcement per index, so
        # the backlog will only have the latest message.
        received2 = []
        def _received2(key_s, ann):
            received2.append( (key_s, ann) )
        def _subscribe_again(ign):
            return ic1.subscribe_to("storage", _received2)
        d.addCallback(_subscribe_again)
        d.addCallback(fireEventually)
        def _backlog_delivered(ign):
            _check_latest(received2, 1, furl1a, "ver23")
        d.addCallback(_backlog_delivered)
        return d

    def test_id_collision(self):
        """A tubid that equals a keyid must not be treated as the same
        server: both announcements should reach the subscriber."""
        ic = IntroducerClient(None,
                              "introducer.furl", u"my_nickname",
                              "my_version", "oldest_version", {})
        received = []
        def _on_announcement(key_s, ann):
            received.append(ann)
        ic.subscribe_to("storage", _on_announcement)
        sk_s, vk_s = keyutil.make_keypair()
        sk, _ignored = keyutil.parse_privkey(sk_s)
        keyid = keyutil.remove_prefix(vk_s, "pub-v0-")
        furl1 = "pb://onug64tu@127.0.0.1:123/short" # base32("short")
        furl2 = "pb://%s@127.0.0.1:36106/swissnum" % keyid
        keyed_ann = ic.create_announcement("storage", make_ann(furl1), sk)
        ic.remote_announce_v2([keyed_ann])
        d = fireEventually()

        def _first_processed(ign):
            # first announcement has been processed
            self.failUnlessEqual(len(received), 1)
            self.failUnlessEqual(received[0]["anonymous-storage-FURL"],
                                 furl1)
            # now submit a second one, with a tubid that happens to look just
            # like the pubkey-based serverid we just processed. They should
            # not overlap.
            v1_ann = (furl2, "storage", "RIStorage", "nick1", "ver23", "ver0")
            v1_wrapper = WrapV2ClientInV1Interface(ic)
            v1_wrapper.remote_announce([v1_ann])
            return fireEventually()
        d.addCallback(_first_processed)

        def _second_processed(ign):
            # if they overlapped, the second announcement would be ignored
            self.failUnlessEqual(len(received), 2)
            self.failUnlessEqual(received[1]["anonymous-storage-FURL"],
                                 furl2)
        d.addCallback(_second_processed)
        return d
300
# Nickname template containing a non-ASCII character; the "%s" is filled in
# with a per-client string (see the NICKNAME % ... uses in SystemTest).
NICKNAME = u"n\u00EDickname-%s" # LATIN SMALL LETTER I WITH ACUTE
302
class SystemTestMixin(ServiceMixin, pollmixin.PollMixin):
    """ServiceMixin plus a helper that builds the central (introducer) Tub."""

    def create_tub(self, portnum=0):
        """Create self.central_tub and have it listen on "tcp:<portnum>".

        The Tub's certificate file is "tub.pem" inside self.basedir. The port
        reported by the listener is saved as self.central_portnum and used
        for the Tub's location; when a non-zero *portnum* was requested, the
        reported port is asserted to be that same number.
        """
        certfile = os.path.join(self.basedir, "tub.pem")
        tub = Tub(certFile=certfile)
        self.central_tub = tub
        #tub.setOption("logLocalFailures", True)
        #tub.setOption("logRemoteFailures", True)
        tub.setOption("expose-remote-exception-types", False)
        tub.setServiceParent(self.parent)
        listener = tub.listenOn("tcp:%d" % portnum)
        actual_port = listener.getPortnum()
        self.central_portnum = actual_port
        if portnum != 0:
            assert actual_port == portnum
        tub.setLocation("localhost:%d" % actual_port)
317
class Queue(SystemTestMixin, unittest.TestCase):
    """Publishing while the introducer is detached must still be delivered."""

    def test_queue_until_connected(self):
        """Publish with the server detached, re-attach it, and poll until
        the announcement shows up and nothing is outstanding."""
        self.basedir = "introducer/QueueUntilConnected/queued"
        os.makedirs(self.basedir)
        self.create_tub()
        server = IntroducerService()
        server.setServiceParent(self.parent)
        furl_file = os.path.join(self.basedir, "introducer.furl")
        server_furl = self.central_tub.registerReference(server,
                                                         furlFile=furl_file)
        client_tub = Tub()
        client_tub.setServiceParent(self.parent)
        client = IntroducerClient(client_tub, server_furl,
                                  u"nickname", "version", "oldest", {})
        furl1 = "pb://onug64tu@127.0.0.1:123/short" # base32("short")
        sk_s, _vk_s = keyutil.make_keypair()
        sk, _ignored = keyutil.parse_privkey(sk_s)

        d = server.disownServiceParent()

        def _offline(ign):
            # now that the introducer server is offline, start the client
            # and publish a message
            client.setServiceParent(self.parent) # this starts the reconnector
            client.publish("storage", make_ann(furl1), sk)

            server.setServiceParent(self.parent) # restart the server

            # now wait for the message to be delivered
            def _got_announcement():
                return bool(server.get_announcements())
            return self.poll(_got_announcement)
        d.addCallback(_offline)

        def _done(ign):
            first = server.get_announcements()[0]
            self.failUnlessEqual(first.announcement["anonymous-storage-FURL"],
                                 furl1)
        d.addCallback(_done)

        # now let the ack get back
        def _wait_until_idle(ign):
            def _idle():
                # idle means neither side has anything outstanding
                return not (client._debug_outstanding
                            or server._debug_outstanding)
            return self.poll(_idle)
        d.addCallback(_wait_until_idle)
        return d
365
366
# Values for the server_version argument of SystemTest.do_system_test: V1
# makes it build old.IntroducerService_v1, anything else IntroducerService.
V1 = "v1"; V2 = "v2"
368 class SystemTest(SystemTestMixin, unittest.TestCase):
369
370     def do_system_test(self, server_version):
371         self.create_tub()
372         if server_version == V1:
373             introducer = old.IntroducerService_v1()
374         else:
375             introducer = IntroducerService()
376         introducer.setServiceParent(self.parent)
377         iff = os.path.join(self.basedir, "introducer.furl")
378         tub = self.central_tub
379         ifurl = self.central_tub.registerReference(introducer, furlFile=iff)
380         self.introducer_furl = ifurl
381
382         # we have 5 clients who publish themselves as storage servers, and a
383         # sixth which does not. All 6 clients subscribe to hear about
384         # storage. When the connections are fully established, all six nodes
385         # should have 5 connections each.
386         NUM_STORAGE = 5
387         NUM_CLIENTS = 6
388
389         clients = []
390         tubs = {}
391         received_announcements = {}
392         subscribing_clients = []
393         publishing_clients = []
394         printable_serverids = {}
395         self.the_introducer = introducer
396         privkeys = {}
397         expected_announcements = [0 for c in range(NUM_CLIENTS)]
398
399         for i in range(NUM_CLIENTS):
400             tub = Tub()
401             #tub.setOption("logLocalFailures", True)
402             #tub.setOption("logRemoteFailures", True)
403             tub.setOption("expose-remote-exception-types", False)
404             tub.setServiceParent(self.parent)
405             l = tub.listenOn("tcp:0")
406             portnum = l.getPortnum()
407             tub.setLocation("localhost:%d" % portnum)
408
409             log.msg("creating client %d: %s" % (i, tub.getShortTubID()))
410             if i == 0:
411                 c = old.IntroducerClient_v1(tub, self.introducer_furl,
412                                             NICKNAME % str(i),
413                                             "version", "oldest")
414             else:
415                 c = IntroducerClient(tub, self.introducer_furl,
416                                      NICKNAME % str(i),
417                                      "version", "oldest",
418                                      {"component": "component-v1"})
419             received_announcements[c] = {}
420             def got(key_s_or_tubid, ann, announcements, i):
421                 if i == 0:
422                     index = get_tubid_string_from_ann(ann)
423                 else:
424                     index = key_s_or_tubid or get_tubid_string_from_ann(ann)
425                 announcements[index] = ann
426             c.subscribe_to("storage", got, received_announcements[c], i)
427             subscribing_clients.append(c)
428             expected_announcements[i] += 1 # all expect a 'storage' announcement
429
430             node_furl = tub.registerReference(Referenceable())
431             if i < NUM_STORAGE:
432                 if i == 0:
433                     c.publish(node_furl, "storage", "ri_name")
434                     printable_serverids[i] = get_tubid_string(node_furl)
435                 elif i == 1:
436                     # sign the announcement
437                     privkey_s, pubkey_s = keyutil.make_keypair()
438                     privkey, _ignored = keyutil.parse_privkey(privkey_s)
439                     privkeys[c] = privkey
440                     c.publish("storage", make_ann(node_furl), privkey)
441                     if server_version == V1:
442                         printable_serverids[i] = get_tubid_string(node_furl)
443                     else:
444                         assert pubkey_s.startswith("pub-")
445                         printable_serverids[i] = pubkey_s[len("pub-"):]
446                 else:
447                     c.publish("storage", make_ann(node_furl))
448                     printable_serverids[i] = get_tubid_string(node_furl)
449                 publishing_clients.append(c)
450             else:
451                 # the last one does not publish anything
452                 pass
453
454             if i == 0:
455                 # users of the V1 client were required to publish a
456                 # 'stub_client' record (somewhat after they published the
457                 # 'storage' record), so the introducer could see their
458                 # version. Match that behavior.
459                 c.publish(node_furl, "stub_client", "stub_ri_name")
460
461             if i == 2:
462                 # also publish something that nobody cares about
463                 boring_furl = tub.registerReference(Referenceable())
464                 c.publish("boring", make_ann(boring_furl))
465
466             c.setServiceParent(self.parent)
467             clients.append(c)
468             tubs[c] = tub
469
470
471         def _wait_for_connected(ign):
472             def _connected():
473                 for c in clients:
474                     if not c.connected_to_introducer():
475                         return False
476                 return True
477             return self.poll(_connected)
478
479         # we watch the clients to determine when the system has settled down.
480         # Then we can look inside the server to assert things about its
481         # state.
482
483         def _wait_for_expected_announcements(ign):
484             def _got_expected_announcements():
485                 for i,c in enumerate(subscribing_clients):
486                     if len(received_announcements[c]) < expected_announcements[i]:
487                         return False
488                 return True
489             return self.poll(_got_expected_announcements)
490
491         # before shutting down any Tub, we'd like to know that there are no
492         # messages outstanding
493
494         def _wait_until_idle(ign):
495             def _idle():
496                 for c in subscribing_clients + publishing_clients:
497                     if c._debug_outstanding:
498                         return False
499                 if self.the_introducer._debug_outstanding:
500                     return False
501                 return True
502             return self.poll(_idle)
503
504         d = defer.succeed(None)
505         d.addCallback(_wait_for_connected)
506         d.addCallback(_wait_for_expected_announcements)
507         d.addCallback(_wait_until_idle)
508
509         def _check1(res):
510             log.msg("doing _check1")
511             dc = self.the_introducer._debug_counts
512             if server_version == V1:
513                 # each storage server publishes a record, and (after its
514                 # 'subscribe' has been ACKed) also publishes a "stub_client".
515                 # The non-storage client (which subscribes) also publishes a
516                 # stub_client. There is also one "boring" service. The number
517                 # of messages is higher, because the stub_clients aren't
518                 # published until after we get the 'subscribe' ack (since we
519                 # don't realize that we're dealing with a v1 server [which
520                 # needs stub_clients] until then), and the act of publishing
521                 # the stub_client causes us to re-send all previous
522                 # announcements.
523                 self.failUnlessEqual(dc["inbound_message"] - dc["inbound_duplicate"],
524                                      NUM_STORAGE + NUM_CLIENTS + 1)
525             else:
526                 # each storage server publishes a record. There is also one
527                 # "stub_client" and one "boring"
528                 self.failUnlessEqual(dc["inbound_message"], NUM_STORAGE+2)
529                 self.failUnlessEqual(dc["inbound_duplicate"], 0)
530             self.failUnlessEqual(dc["inbound_update"], 0)
531             self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS)
532             # the number of outbound messages is tricky.. I think it depends
533             # upon a race between the publish and the subscribe messages.
534             self.failUnless(dc["outbound_message"] > 0)
535             # each client subscribes to "storage", and each server publishes
536             self.failUnlessEqual(dc["outbound_announcements"],
537                                  NUM_STORAGE*NUM_CLIENTS)
538
539             for c in subscribing_clients:
540                 cdc = c._debug_counts
541                 self.failUnless(cdc["inbound_message"])
542                 self.failUnlessEqual(cdc["inbound_announcement"],
543                                      NUM_STORAGE)
544                 self.failUnlessEqual(cdc["wrong_service"], 0)
545                 self.failUnlessEqual(cdc["duplicate_announcement"], 0)
546                 self.failUnlessEqual(cdc["update"], 0)
547                 self.failUnlessEqual(cdc["new_announcement"],
548                                      NUM_STORAGE)
549                 anns = received_announcements[c]
550                 self.failUnlessEqual(len(anns), NUM_STORAGE)
551
552                 nodeid0 = tubs[clients[0]].tubID
553                 ann = anns[nodeid0]
554                 nick = ann["nickname"]
555                 self.failUnlessEqual(type(nick), unicode)
556                 self.failUnlessEqual(nick, NICKNAME % "0")
557             if server_version == V1:
558                 for c in publishing_clients:
559                     cdc = c._debug_counts
560                     expected = 1 # storage
561                     if c is clients[2]:
562                         expected += 1 # boring
563                     if c is not clients[0]:
564                         # the v2 client tries to call publish_v2, which fails
565                         # because the server is v1. It then re-sends
566                         # everything it has so far, plus a stub_client record
567                         expected = 2*expected + 1
568                     if c is clients[0]:
569                         # we always tell v1 client to send stub_client
570                         expected += 1
571                     self.failUnlessEqual(cdc["outbound_message"], expected)
572             else:
573                 for c in publishing_clients:
574                     cdc = c._debug_counts
575                     expected = 1
576                     if c in [clients[0], # stub_client
577                              clients[2], # boring
578                              ]:
579                         expected = 2
580                     self.failUnlessEqual(cdc["outbound_message"], expected)
581             # now check the web status, make sure it renders without error
582             ir = introweb.IntroducerRoot(self.parent)
583             self.parent.nodeid = "NODEID"
584             text = ir.renderSynchronously().decode("utf-8")
585             self.failUnlessIn(NICKNAME % "0", text) # the v1 client
586             self.failUnlessIn(NICKNAME % "1", text) # a v2 client
587             for i in range(NUM_STORAGE):
588                 self.failUnlessIn(printable_serverids[i], text,
589                                   (i,printable_serverids[i],text))
590                 # make sure there isn't a double-base32ed string too
591                 self.failIfIn(idlib.nodeid_b2a(printable_serverids[i]), text,
592                               (i,printable_serverids[i],text))
593             log.msg("_check1 done")
594         d.addCallback(_check1)
595
596         # force an introducer reconnect, by shutting down the Tub it's using
597         # and starting a new Tub (with the old introducer). Everybody should
598         # reconnect and republish, but the introducer should ignore the
599         # republishes as duplicates. However, because the server doesn't know
600         # what each client does and does not know, it will send them a copy
601         # of the current announcement table anyway.
602
603         d.addCallback(lambda _ign: log.msg("shutting down introducer's Tub"))
604         d.addCallback(lambda _ign: self.central_tub.disownServiceParent())
605
606         def _wait_for_introducer_loss(ign):
607             def _introducer_lost():
608                 for c in clients:
609                     if c.connected_to_introducer():
610                         return False
611                 return True
612             return self.poll(_introducer_lost)
613         d.addCallback(_wait_for_introducer_loss)
614
615         def _restart_introducer_tub(_ign):
616             log.msg("restarting introducer's Tub")
617             # reset counters
618             for i in range(NUM_CLIENTS):
619                 c = subscribing_clients[i]
620                 for k in c._debug_counts:
621                     c._debug_counts[k] = 0
622             for k in self.the_introducer._debug_counts:
623                 self.the_introducer._debug_counts[k] = 0
624             expected_announcements[i] += 1 # new 'storage' for everyone
625             self.create_tub(self.central_portnum)
626             newfurl = self.central_tub.registerReference(self.the_introducer,
627                                                          furlFile=iff)
628             assert newfurl == self.introducer_furl
629         d.addCallback(_restart_introducer_tub)
630
631         d.addCallback(_wait_for_connected)
632         d.addCallback(_wait_for_expected_announcements)
633         d.addCallback(_wait_until_idle)
634         d.addCallback(lambda _ign: log.msg(" reconnected"))
635
636         # TODO: publish something while the introducer is offline, then
637         # confirm it gets delivered when the connection is reestablished
638         def _check2(res):
639             log.msg("doing _check2")
640             # assert that the introducer sent out new messages, one per
641             # subscriber
642             dc = self.the_introducer._debug_counts
643             self.failUnlessEqual(dc["outbound_announcements"],
644                                  NUM_STORAGE*NUM_CLIENTS)
645             self.failUnless(dc["outbound_message"] > 0)
646             self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS)
647             for c in subscribing_clients:
648                 cdc = c._debug_counts
649                 self.failUnlessEqual(cdc["inbound_message"], 1)
650                 self.failUnlessEqual(cdc["inbound_announcement"], NUM_STORAGE)
651                 self.failUnlessEqual(cdc["new_announcement"], 0)
652                 self.failUnlessEqual(cdc["wrong_service"], 0)
653                 self.failUnlessEqual(cdc["duplicate_announcement"], NUM_STORAGE)
654         d.addCallback(_check2)
655
656         # Then force an introducer restart, by shutting down the Tub,
657         # destroying the old introducer, and starting a new Tub+Introducer.
658         # Everybody should reconnect and republish, and the (new) introducer
659         # will distribute the new announcements, but the clients should
660         # ignore the republishes as duplicates.
661
662         d.addCallback(lambda _ign: log.msg("shutting down introducer"))
663         d.addCallback(lambda _ign: self.central_tub.disownServiceParent())
664         d.addCallback(_wait_for_introducer_loss)
665         d.addCallback(lambda _ign: log.msg("introducer lost"))
666
        def _restart_introducer(_ign):
            # Create a new Tub (create_tub is handed self.central_portnum),
            # zero the subscribing clients' debug counters, then build a
            # brand-new introducer object (v1 or current, depending on
            # server_version), store it in self.the_introducer and register
            # it with the new Tub.
            log.msg("restarting introducer")
            self.create_tub(self.central_portnum)
            # reset counters
            for i in range(NUM_CLIENTS):
                c = subscribing_clients[i]
                for k in c._debug_counts:
                    c._debug_counts[k] = 0
            # NOTE(review): this statement is outside the 'for i' loop above,
            # so it only bumps the entry for the final value of i, although
            # the trailing comment says "for everyone" -- confirm whether it
            # was meant to be inside the loop.
            expected_announcements[i] += 1 # new 'storage' for everyone
            if server_version == V1:
                introducer = old.IntroducerService_v1()
            else:
                introducer = IntroducerService()
            self.the_introducer = introducer
            newfurl = self.central_tub.registerReference(self.the_introducer,
                                                         furlFile=iff)
            # the replacement introducer must come up under the FURL recorded
            # earlier in self.introducer_furl
            assert newfurl == self.introducer_furl
684         d.addCallback(_restart_introducer)
685
686         d.addCallback(_wait_for_connected)
687         d.addCallback(_wait_for_expected_announcements)
688         d.addCallback(_wait_until_idle)
689
690         def _check3(res):
691             log.msg("doing _check3")
692             dc = self.the_introducer._debug_counts
693             self.failUnlessEqual(dc["outbound_announcements"],
694                                  NUM_STORAGE*NUM_CLIENTS)
695             self.failUnless(dc["outbound_message"] > 0)
696             self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS)
697             for c in subscribing_clients:
698                 cdc = c._debug_counts
699                 self.failUnless(cdc["inbound_message"] > 0)
700                 self.failUnlessEqual(cdc["inbound_announcement"], NUM_STORAGE)
701                 self.failUnlessEqual(cdc["new_announcement"], 0)
702                 self.failUnlessEqual(cdc["wrong_service"], 0)
703                 self.failUnlessEqual(cdc["duplicate_announcement"], NUM_STORAGE)
704
705         d.addCallback(_check3)
706         return d
707
708
709     def test_system_v2_server(self):
710         self.basedir = "introducer/SystemTest/system_v2_server"
711         os.makedirs(self.basedir)
712         return self.do_system_test(V2)
713     test_system_v2_server.timeout = 480
714     # occasionally takes longer than 350s on "draco"
715
716     def test_system_v1_server(self):
717         self.basedir = "introducer/SystemTest/system_v1_server"
718         os.makedirs(self.basedir)
719         return self.do_system_test(V1)
720     test_system_v1_server.timeout = 480
721     # occasionally takes longer than 350s on "draco"
722
class FakeRemoteReference:
    # Stand-in for a remote reference that answers every query with a fixed,
    # canned value.

    def notifyOnDisconnect(self, *args, **kwargs):
        # accept and ignore whatever disconnect callback is supplied
        return None

    def getRemoteTubID(self):
        return "62ubehyunnyhzs7r6vdonnm2hpi52w6y"

    def getLocationHints(self):
        hints = [
            ("ipv4", "here.example.com", "1234"),
            ("ipv4", "there.example.com", "2345"),
        ]
        return hints

    def getPeer(self):
        return address.IPv4Address("TCP", "remote.example.com", 3456)
730
class ClientInfo(unittest.TestCase):
    # Checks the subscriber records that IntroducerService.get_subscribers()
    # reports after v2 and v1 style subscriptions.

    def test_client_v2(self):
        server = IntroducerService()
        versions = {"whizzy": "fizzy"}
        nickname = NICKNAME % u"v2"
        # no Tub and no introducer FURL are supplied to the client
        client_v2 = IntroducerClient(None, None, nickname,
                                     "my_version", "oldest", versions)
        #furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
        #ann_s = make_ann_t(client_v2, furl1, None)
        #introducer.remote_publish_v2(ann_s, Referenceable())
        server.remote_subscribe_v2(FakeRemoteReference(), "storage",
                                   client_v2._my_subscriber_info)
        subscribers = server.get_subscribers()
        self.failUnlessEqual(len(subscribers), 1)
        record = subscribers[0]
        self.failUnlessEqual(record.service_name, "storage")
        self.failUnlessEqual(record.app_versions, versions)
        self.failUnlessEqual(record.nickname, nickname)
        self.failUnlessEqual(record.version, "my_version")

    def test_client_v1(self):
        server = IntroducerService()
        server.remote_subscribe(FakeRemoteReference(), "storage")
        # the v1 subscribe interface had no subscriber_info: that was usually
        # sent in a separate stub_client pseudo-announcement
        subscribers = server.get_subscribers()
        self.failUnlessEqual(len(subscribers), 1)
        record = subscribers[0]
        self.failUnlessEqual(record.nickname, u"?") # not known yet
        self.failUnlessEqual(record.service_name, "storage")

        # now submit the stub_client announcement
        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
        nickname = NICKNAME % u"v1"
        stub_ann = (furl1, "stub_client", "RIStubClient",
                    nickname.encode("utf-8"), "my_version", "oldest")
        server.remote_publish(stub_ann)
        # the server should correlate the two
        subscribers = server.get_subscribers()
        self.failUnlessEqual(len(subscribers), 1)
        record = subscribers[0]
        self.failUnlessEqual(record.service_name, "storage")
        # v1 announcements do not contain app-versions
        self.failUnlessEqual(record.app_versions, {})
        self.failUnlessEqual(record.nickname, nickname)
        self.failUnlessEqual(record.version, "my_version")

        # a subscription that arrives after the stub_client announcement
        # should be correlated too
        server.remote_subscribe(FakeRemoteReference(), "thing2")

        subscribers = server.get_subscribers()
        self.failUnlessEqual(len(subscribers), 2)
        record = [s for s in subscribers if s.service_name == "thing2"][0]
        # v1 announcements do not contain app-versions
        self.failUnlessEqual(record.app_versions, {})
        self.failUnlessEqual(record.nickname, nickname)
        self.failUnlessEqual(record.version, "my_version")
791
class Announcements(unittest.TestCase):
    # Checks the announcement records that
    # IntroducerService.get_announcements() reports after unsigned-v2,
    # signed-v2 and v1 publishes.

    def test_client_v2_unsigned(self):
        server = IntroducerService()
        versions = {"whizzy": "fizzy"}
        # no Tub and no introducer FURL are supplied to the client
        client_v2 = IntroducerClient(None, None, u"nick-v2",
                                     "my_version", "oldest", versions)
        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
        tubid = "62ubehyunnyhzs7r6vdonnm2hpi52w6y"
        # None in the key position: no signing key is passed to make_ann_t
        unsigned_ann = make_ann_t(client_v2, furl1, None)
        canary = Referenceable()
        server.remote_publish_v2(unsigned_ann, canary)
        published = server.get_announcements()
        self.failUnlessEqual(len(published), 1)
        record = published[0]
        self.failUnlessIdentical(record.canary, canary)
        self.failUnlessEqual(record.index, ("storage", None, tubid))
        self.failUnlessEqual(record.announcement["app-versions"], versions)
        self.failUnlessEqual(record.nickname, u"nick-v2")
        self.failUnlessEqual(record.service_name, "storage")
        self.failUnlessEqual(record.version, "my_version")
        self.failUnlessEqual(record.announcement["anonymous-storage-FURL"],
                             furl1)

    def test_client_v2_signed(self):
        server = IntroducerService()
        versions = {"whizzy": "fizzy"}
        # no Tub and no introducer FURL are supplied to the client
        client_v2 = IntroducerClient(None, None, u"nick-v2",
                                     "my_version", "oldest", versions)
        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
        sk_s, vk_s = keyutil.make_keypair()
        signing_key, _unused = keyutil.parse_privkey(sk_s)
        # the expected index holds the verifying key minus its "pub-" prefix
        pubkey_s = keyutil.remove_prefix(vk_s, "pub-")
        signed_ann = make_ann_t(client_v2, furl1, signing_key)
        canary = Referenceable()
        server.remote_publish_v2(signed_ann, canary)
        published = server.get_announcements()
        self.failUnlessEqual(len(published), 1)
        record = published[0]
        self.failUnlessIdentical(record.canary, canary)
        self.failUnlessEqual(record.index, ("storage", pubkey_s, None))
        self.failUnlessEqual(record.announcement["app-versions"], versions)
        self.failUnlessEqual(record.nickname, u"nick-v2")
        self.failUnlessEqual(record.service_name, "storage")
        self.failUnlessEqual(record.version, "my_version")
        self.failUnlessEqual(record.announcement["anonymous-storage-FURL"],
                             furl1)

    def test_client_v1(self):
        server = IntroducerService()

        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
        tubid = "62ubehyunnyhzs7r6vdonnm2hpi52w6y"
        nickname_utf8 = u"nick-v1".encode("utf-8")
        v1_ann = (furl1, "storage", "RIStorage",
                  nickname_utf8, "my_version", "oldest")
        server.remote_publish(v1_ann)

        published = server.get_announcements()
        self.failUnlessEqual(len(published), 1)
        record = published[0]
        self.failUnlessEqual(record.index, ("storage", None, tubid))
        # remote_publish() is not given a canary
        self.failUnlessEqual(record.canary, None)
        self.failUnlessEqual(record.announcement["app-versions"], {})
        self.failUnlessEqual(record.nickname, u"nick-v1".encode("utf-8"))
        self.failUnlessEqual(record.service_name, "storage")
        self.failUnlessEqual(record.version, "my_version")
        self.failUnlessEqual(record.announcement["anonymous-storage-FURL"],
                             furl1)
855
856
class TooNewServer(IntroducerService):
    # An IntroducerService whose VERSION table lists only a "v999" introducer
    # protocol entry plus an application-version string.
    VERSION = {
        "http://allmydata.org/tahoe/protocols/introducer/v999": {},
        "application-version": "greetings from the crazy future",
    }
862
class NonV1Server(SystemTestMixin, unittest.TestCase):
    # if the 1.3.0 client connects to a server that doesn't provide the 'v1'
    # protocol, it is supposed to provide a useful error instead of a weird
    # exception.

    def test_failure(self):
        self.basedir = "introducer/NonV1Server/failure"
        os.makedirs(self.basedir)
        self.create_tub()
        # register an introducer that only advertises the v999 protocol
        future_server = TooNewServer()
        future_server.setServiceParent(self.parent)
        self.introducer_furl = self.central_tub.registerReference(future_server)

        # the client gets a Tub of its own, listening on a kernel-chosen port
        client_tub = Tub()
        client_tub.setOption("expose-remote-exception-types", False)
        client_tub.setServiceParent(self.parent)
        listener = client_tub.listenOn("tcp:0")
        client_tub.setLocation("localhost:%d" % listener.getPortnum())

        client = IntroducerClient(client_tub, self.introducer_furl,
                                  u"nickname-client", "version", "oldest", {})
        received = {}
        def _record(key_s, ann):
            received[key_s] = ann
        client.subscribe_to("storage", _record)

        client.setServiceParent(self.parent)

        # now we wait for it to connect and notice the bad version

        def _settled():
            # done once the client has recorded an error or has a publisher
            if client._introducer_error:
                return True
            return bool(client._publisher)
        d = self.poll(_settled)
        def _verify(res):
            self.failUnless(client._introducer_error)
            self.failUnless(
                client._introducer_error.check(InsufficientVersionError),
                client._introducer_error)
        d.addCallback(_verify)
        return d
903
class DecodeFurl(unittest.TestCase):
    # Sanity check on the base32 decoding of the tubid embedded in a FURL.

    def test_decode(self):
        # make sure we have a working base64.b32decode. The one in
        # python2.4.[01] was broken.
        furl = 'pb://t5g7egomnnktbpydbuijt6zgtmw4oqi5@127.0.0.1:51857/hfzv36i'
        m = re.match(r'pb://(\w+)@', furl)
        # Use a test assertion instead of a bare 'assert': the latter is
        # stripped under 'python -O', which would turn a non-matching regex
        # into an AttributeError on m.group() below.
        self.failUnless(m, furl)
        # b32decode wants the upper-case alphabet; the tubid is lower-case
        nodeid = b32decode(m.group(1).upper())
        self.failUnlessEqual(nodeid, "\x9fM\xf2\x19\xcckU0\xbf\x03\r\x10\x99\xfb&\x9b-\xc7A\x1d")
913
class Signatures(unittest.TestCase):
    # Round-trips an announcement through sign_to_foolscap() and
    # unsign_from_foolscap(), then feeds the latter tampered tuples.

    def test_sign(self):
        announcement = {"key1": "value1"}
        sk_s, vk_s = keyutil.make_keypair()
        signing_key, _unused = keyutil.parse_privkey(sk_s)
        signed = sign_to_foolscap(announcement, signing_key)
        msg, sig, key = signed
        self.failUnlessEqual(type(msg), type("".encode("utf-8"))) # bytes
        self.failUnlessEqual(simplejson.loads(msg.decode("utf-8")),
                             announcement)
        self.failUnless(sig.startswith("v0-"))
        self.failUnless(key.startswith("v0-"))
        recovered_ann, recovered_key = unsign_from_foolscap(signed)
        self.failUnlessEqual(recovered_ann, announcement)
        self.failUnlessEqual("pub-"+recovered_key, vk_s)

        # bad signature
        tampered_ann = {"key1": "value2"}
        tampered_msg = simplejson.dumps(tampered_ann).encode("utf-8")
        self.failUnlessRaises(keyutil.BadSignatureError,
                              unsign_from_foolscap, (tampered_msg, sig, key))
        # sneaky bad signature should be ignored
        unsigned_tuple = (tampered_msg, None, key)
        recovered_ann, recovered_key = unsign_from_foolscap(unsigned_tuple)
        self.failUnlessEqual(recovered_key, None)
        self.failUnlessEqual(recovered_ann, tampered_ann)

        # unrecognized signatures
        self.failUnlessRaises(UnknownKeyError,
                              unsign_from_foolscap,
                              (tampered_msg, "v999-sig", key))
        self.failUnlessRaises(UnknownKeyError,
                              unsign_from_foolscap,
                              (tampered_msg, sig, "v999-key"))
944
945
# TODO: add tests of StorageFarmBroker. If it receives a duplicate
# announcement, or one with the same FURL but different miscellaneous
# fields, it should leave the Reconnector in place; if it receives one with
# the same nodeid but a different FURL, it should tear down the Reconnector
# and make a new one. This behavior used to live in the IntroducerClient,
# and thus used to be tested by test_introducer.
951
952 # copying more tests from old branch:
953
954 #  then also add Upgrade test