]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_introducer.py
1c61ad71a2eda8e93ad3ae3b49574bb30c2a0a99
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_introducer.py
1
2 import os, re
3 from base64 import b32decode
4 import simplejson
5
6 from twisted.trial import unittest
7 from twisted.internet import defer, address
8 from twisted.python import log
9
10 from foolscap.api import Tub, Referenceable, fireEventually, flushEventualQueue
11 from twisted.application import service
12 from allmydata.interfaces import InsufficientVersionError
13 from allmydata.introducer.client import IntroducerClient, \
14      WrapV2ClientInV1Interface
15 from allmydata.introducer.server import IntroducerService
16 from allmydata.introducer.common import get_tubid_string_from_ann, \
17      get_tubid_string, sign_to_foolscap, unsign_from_foolscap, \
18      UnknownKeyError
19 from allmydata.introducer import old
20 # test compatibility with old introducer .tac files
21 from allmydata.introducer import IntroducerNode
22 from allmydata.web import introweb
23 from allmydata.util import pollmixin, keyutil
24 import allmydata.test.common_util as testutil
25
class LoggingMultiService(service.MultiService):
    """A MultiService that also offers a log() method.

    The tests use an instance of this as the parent service.
    """

    def log(self, msg, **kwargs):
        """Pass `msg` and all keyword arguments on to twisted's log.msg."""
        # 'log' here is the module-level twisted.python.log, not this method
        log.msg(msg, **kwargs)
29
class Node(testutil.SignalMixin, unittest.TestCase):
    """Smoke test for IntroducerNode: build it, start it, stop it."""

    def test_loadable(self):
        """Create an IntroducerNode in a new basedir and run its lifecycle.

        Returns a Deferred that fires after the node has been started, its
        tub has become ready, the node has been stopped again, and the
        foolscap eventual-send queue has been flushed.
        """
        basedir = "introducer.IntroducerNode.test_loadable"
        os.mkdir(basedir)
        node = IntroducerNode(basedir)
        d = fireEventually(None)
        # each step ignores the previous result and runs strictly in order
        lifecycle = (node.startService, node.when_tub_ready, node.stopService)
        for step in lifecycle:
            d.addCallback(lambda res, step=step: step())
        d.addCallback(flushEventualQueue)
        return d
41
class ServiceMixin:
    """Test fixture that provides a running parent service as self.parent."""

    def setUp(self):
        """Create the LoggingMultiService parent and start it."""
        self.parent = LoggingMultiService()
        self.parent.startService()

    def tearDown(self):
        """Stop the parent service, then flush the eventual-send queue.

        Returns the Deferred that covers both steps.
        """
        log.msg("TestIntroducer.tearDown")

        def _stop_parent(res):
            return self.parent.stopService()

        d = defer.succeed(None)
        d.addCallback(_stop_parent)
        d.addCallback(flushEventualQueue)
        return d
52
class Introducer(ServiceMixin, unittest.TestCase, pollmixin.PollMixin):
    """Tests that call IntroducerService / IntroducerClient methods directly."""

    def _check_table_sizes(self, introducer, num_announcements):
        """Assert the announcement count and that there are no subscribers."""
        self.failUnlessEqual(len(introducer.get_announcements()),
                             num_announcements)
        self.failUnlessEqual(len(introducer.get_subscribers()), 0)

    def test_create(self):
        """An IntroducerClient can be constructed without a Tub."""
        client = IntroducerClient(None, "introducer.furl", u"my_nickname",
                                  "my_version", "oldest_version", {})
        self.failUnless(isinstance(client, IntroducerClient))

    def test_listen(self):
        """An IntroducerService can be attached to the running parent."""
        introducer = IntroducerService()
        introducer.setServiceParent(self.parent)

    def test_duplicate_publish(self):
        """Re-publishing under an already-seen FURL must not add an entry."""
        introducer = IntroducerService()
        self._check_table_sizes(introducer, 0)
        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36106,127.0.0.1:36106/gydnpigj2ja2qr2srq4ikjwnl7xfgbra"
        furl2 = "pb://ttwwooyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36111,127.0.0.1:36106/ttwwoogj2ja2qr2srq4ikjwnl7xfgbra"
        ann1 = (furl1, "storage", "RIStorage", "nick1", "ver23", "ver0")
        # same FURL as ann1, only the version string differs
        ann1b = (furl1, "storage", "RIStorage", "nick1", "ver24", "ver0")
        ann2 = (furl2, "storage", "RIStorage", "nick2", "ver30", "ver0")
        # (announcement to publish, expected table size afterwards)
        steps = [(ann1, 1), (ann2, 2), (ann1b, 2)]
        for ann, expected_size in steps:
            introducer.remote_publish(ann)
            self._check_table_sizes(introducer, expected_size)

    def test_id_collision(self):
        """A v1 tubid that equals a v2 keyid must not replace the v2 entry."""
        # test replacement case where tubid equals a keyid (one should
        # not replace the other)
        introducer = IntroducerService()
        client = IntroducerClient(None,
                                  "introducer.furl", u"my_nickname",
                                  "my_version", "oldest_version", {})
        privkey_s, pubkey_s = keyutil.make_keypair()
        privkey, _ignored = keyutil.parse_privkey(privkey_s)
        keyid = keyutil.remove_prefix(pubkey_s, "pub-v0-")

        # first: a signed (v2) announcement
        signed_furl = "pb://onug64tu@127.0.0.1:123/short" # base32("short")
        signed_ann_t = client.create_announcement("storage",
                                                  make_ann(signed_furl),
                                                  privkey)
        introducer.remote_publish_v2(signed_ann_t, Referenceable())
        published = introducer.get_announcements()
        self.failUnlessEqual(len(published), 1)
        signed_index = ("storage", "v0-"+keyid, None)
        self.failUnlessEqual(published[0].index, signed_index)
        signed_out = published[0].announcement
        self.failUnlessEqual(signed_out["anonymous-storage-FURL"],
                             signed_furl)

        # second: a v1 announcement whose tubid is the keyid used above
        colliding_furl = "pb://%s@127.0.0.1:36106/swissnum" % keyid
        colliding_ann = (colliding_furl, "storage", "RIStorage",
                         "nick1", "ver23", "ver0")
        introducer.remote_publish(colliding_ann)
        published = introducer.get_announcements()
        self.failUnlessEqual(len(published), 2)
        colliding_index = ("storage", None, keyid)
        matches = [ad for ad in published if ad.index == colliding_index]
        self.failUnlessEqual(len(matches), 1)
        colliding_out = matches[0].announcement
        self.failUnlessEqual(colliding_out["anonymous-storage-FURL"],
                             colliding_furl)
113
114
def make_ann(furl):
    """Return a minimal storage announcement dict for `furl`.

    The permutation seed is whatever get_tubid_string() yields for the FURL.
    """
    return {
        "anonymous-storage-FURL": furl,
        "permutation-seed-base32": get_tubid_string(furl),
    }
119
def make_ann_t(ic, furl, privkey):
    """Have client `ic` create a "storage" announcement for `furl`.

    `privkey` is handed through to ic.create_announcement() unchanged; the
    return value is whatever that method returns.
    """
    announcement = make_ann(furl)
    return ic.create_announcement("storage", announcement, privkey)
122
class Client(unittest.TestCase):
    """Tests of IntroducerClient's handling of incoming announcements."""

    def _new_client(self, my_version):
        """Build a Tub-less IntroducerClient announcing `my_version`."""
        return IntroducerClient(None,
                                "introducer.furl", u"my_nickname",
                                my_version, "oldest_version", {})

    def _check_counts(self, client, inbound, new, update, duplicate):
        """Assert four of the client's _debug_counts entries."""
        counts = client._debug_counts
        expected = [("inbound_announcement", inbound),
                    ("new_announcement", new),
                    ("update", update),
                    ("duplicate_announcement", duplicate)]
        for name, value in expected:
            self.failUnlessEqual(counts[name], value)

    def test_duplicate_receive_v1(self):
        """v1 announcements: duplicates are silent, replacements notify."""
        client = self._new_client("my_version")
        received = []
        def _record(key_s, ann):
            received.append(ann)
        client.subscribe_to("storage", _record)
        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/gydnpigj2ja2qr2srq4ikjwnl7xfgbra"
        ann1 = (furl1, "storage", "RIStorage", "nick1", "ver23", "ver0")
        ann1b = (furl1, "storage", "RIStorage", "nick1", "ver24", "ver0")
        v1_facade = WrapV2ClientInV1Interface(client)

        def _after_first(ign):
            self.failUnlessEqual(len(received), 1)
            self.failUnlessEqual(received[0]["nickname"], u"nick1")
            self.failUnlessEqual(received[0]["my-version"], "ver23")
            self._check_counts(client, inbound=1, new=1, update=0,
                               duplicate=0)
            # now send a duplicate announcement: this should not notify clients
            v1_facade.remote_announce([ann1])
            return fireEventually()

        def _after_duplicate(ign):
            self.failUnlessEqual(len(received), 1)
            self._check_counts(client, inbound=2, new=1, update=0,
                               duplicate=1)
            # and a replacement announcement: same FURL, new other stuff.
            # Clients should be notified.
            v1_facade.remote_announce([ann1b])
            return fireEventually()

        def _after_replacement(ign):
            self.failUnlessEqual(len(received), 2)
            self._check_counts(client, inbound=3, new=1, update=1,
                               duplicate=1)
            # test that the other stuff changed
            self.failUnlessEqual(received[-1]["nickname"], u"nick1")
            self.failUnlessEqual(received[-1]["my-version"], "ver24")

        v1_facade.remote_announce([ann1])
        d = fireEventually()
        d.addCallback(_after_first)
        d.addCallback(_after_duplicate)
        d.addCallback(_after_replacement)
        return d

    def test_duplicate_receive_v2(self):
        """v2 announcements: duplicates, replacements and backlog delivery."""
        client1 = self._new_client("ver23")
        # we use a second client just to create a different-looking
        # announcement
        client2 = self._new_client("ver24")
        first_log = []
        def _record_first(key_s, ann):
            first_log.append( (key_s, ann) )
        client1.subscribe_to("storage", _record_first)
        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/gydnp"
        furl1a = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:7777/gydnp"
        furl2 = "pb://ttwwooyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/ttwwoo"

        privkey_s, pubkey_vs = keyutil.make_keypair()
        privkey, _ignored = keyutil.parse_privkey(privkey_s)
        pubkey_s = keyutil.remove_prefix(pubkey_vs, "pub-")

        # ann1: client1, furl1
        # ann1a: client1, furl1a (same SturdyRef, different connection hints)
        # ann1b: client2, furl1
        # ann2: client2, furl2

        self.ann1 = make_ann_t(client1, furl1, privkey)
        self.ann1a = make_ann_t(client1, furl1a, privkey)
        self.ann1b = make_ann_t(client2, furl1, privkey)
        self.ann2 = make_ann_t(client2, furl2, privkey)

        def _expect_latest(seen, count, furl, version):
            # build a callback asserting the size of 'seen' and the contents
            # of its most recent (key_s, ann) entry
            def _check(ign):
                self.failUnlessEqual(len(seen), count)
                key_s, ann = seen[-1]
                self.failUnlessEqual(key_s, pubkey_s)
                self.failUnlessEqual(ann["anonymous-storage-FURL"], furl)
                self.failUnlessEqual(ann["my-version"], version)
            return _check

        def _deliver(ann_t):
            # build a callback that hands one announcement to client1
            return lambda ign: client1.remote_announce_v2([ann_t])

        client1.remote_announce_v2([self.ann1]) # queues eventual-send
        d = fireEventually()
        d.addCallback(_expect_latest(first_log, 1, furl1, "ver23"))

        # now send a duplicate announcement. This should not fire the
        # subscriber
        d.addCallback(_deliver(self.ann1))
        d.addCallback(fireEventually)
        def _still_one(ign):
            self.failUnlessEqual(len(first_log), 1)
        d.addCallback(_still_one)

        # and a replacement announcement: same FURL, new other stuff. The
        # subscriber *should* be fired.
        d.addCallback(_deliver(self.ann1b))
        d.addCallback(fireEventually)
        d.addCallback(_expect_latest(first_log, 2, furl1, "ver24"))

        # and a replacement announcement with a different FURL (it uses
        # different connection hints)
        d.addCallback(_deliver(self.ann1a))
        d.addCallback(fireEventually)
        d.addCallback(_expect_latest(first_log, 3, furl1a, "ver23"))

        # now add a new subscription, which should be called with the
        # backlog. The introducer only records one announcement per index, so
        # the backlog will only have the latest message.
        second_log = []
        def _record_second(key_s, ann):
            second_log.append( (key_s, ann) )
        d.addCallback(lambda ign: client1.subscribe_to("storage",
                                                       _record_second))
        d.addCallback(fireEventually)
        d.addCallback(_expect_latest(second_log, 1, furl1a, "ver23"))
        return d

    def test_id_collision(self):
        """A v1 tubid equal to a v2 keyid must still produce a notification."""
        # test replacement case where tubid equals a keyid (one should
        # not replace the other)
        client = self._new_client("my_version")
        received = []
        def _record(key_s, ann):
            received.append(ann)
        client.subscribe_to("storage", _record)
        privkey_s, pubkey_s = keyutil.make_keypair()
        privkey, _ignored = keyutil.parse_privkey(privkey_s)
        keyid = keyutil.remove_prefix(pubkey_s, "pub-v0-")
        signed_furl = "pb://onug64tu@127.0.0.1:123/short" # base32("short")
        colliding_furl = "pb://%s@127.0.0.1:36106/swissnum" % keyid
        signed_ann_t = client.create_announcement("storage",
                                                  make_ann(signed_furl),
                                                  privkey)

        def _after_signed(ign):
            # first announcement has been processed
            self.failUnlessEqual(len(received), 1)
            self.failUnlessEqual(received[0]["anonymous-storage-FURL"],
                                 signed_furl)
            # now submit a second one, with a tubid that happens to look just
            # like the pubkey-based serverid we just processed. They should
            # not overlap.
            colliding_ann = (colliding_furl, "storage", "RIStorage",
                             "nick1", "ver23", "ver0")
            v1_facade = WrapV2ClientInV1Interface(client)
            v1_facade.remote_announce([colliding_ann])
            return fireEventually()

        def _after_colliding(ign):
            # if they overlapped, the second announcement would be ignored
            self.failUnlessEqual(len(received), 2)
            self.failUnlessEqual(received[1]["anonymous-storage-FURL"],
                                 colliding_furl)

        client.remote_announce_v2([signed_ann_t])
        d = fireEventually()
        d.addCallback(_after_signed)
        d.addCallback(_after_colliding)
        return d
300
# Nickname template for the test clients; the "%s" slot is filled in with the
# client's index. The embedded \u00ED (LATIN SMALL LETTER I WITH ACUTE) makes
# every generated nickname non-ASCII.
NICKNAME = u"n\u00EDickname-%s"
302
class SystemTestMixin(ServiceMixin, pollmixin.PollMixin):
    """Fixture helpers for tests that need a listening 'central' Tub.

    Users must set self.basedir before calling create_tub().
    """

    def create_tub(self, portnum=0):
        """Create, start and publish the location of the central Tub.

        The Tub's certificate lives in <basedir>/tub.pem. The Tub is stored
        as self.central_tub and the port it actually listens on as
        self.central_portnum.

        :param portnum: TCP port to listen on; with the default of 0 no
            particular port is requested and none is asserted.
        """
        cert_path = os.path.join(self.basedir, "tub.pem")
        central = Tub(certFile=cert_path)
        self.central_tub = central
        #central.setOption("logLocalFailures", True)
        #central.setOption("logRemoteFailures", True)
        central.setOption("expose-remote-exception-types", False)
        central.setServiceParent(self.parent)
        listener = central.listenOn("tcp:%d" % portnum)
        self.central_portnum = listener.getPortnum()
        if portnum != 0:
            # an explicitly requested port must be the one we got
            assert self.central_portnum == portnum
        central.setLocation("localhost:%d" % self.central_portnum)
317
class Queue(SystemTestMixin, unittest.TestCase):
    """Checks that a publish made while the server is detached is delivered."""

    def test_queue_until_connected(self):
        """Publish with the introducer detached, re-attach it, await delivery.

        Returns a Deferred that fires once the announcement has shown up at
        the introducer and neither side reports outstanding messages.
        """
        self.basedir = "introducer/QueueUntilConnected/queued"
        os.makedirs(self.basedir)
        self.create_tub()
        server = IntroducerService()
        server.setServiceParent(self.parent)
        furl_file = os.path.join(self.basedir, "introducer.furl")
        introducer_furl = self.central_tub.registerReference(
            server, furlFile=furl_file)
        client_tub = Tub()
        client_tub.setServiceParent(self.parent)
        client = IntroducerClient(client_tub, introducer_furl,
                                  u"nickname", "version", "oldest", {})
        storage_furl = "pb://onug64tu@127.0.0.1:123/short" # base32("short")
        privkey_s, _pubkey_s = keyutil.make_keypair()
        privkey, _ignored = keyutil.parse_privkey(privkey_s)

        def _publish_while_offline(ign):
            # now that the introducer server is offline, attach the client
            # and publish some messages
            client.setServiceParent(self.parent) # this starts the reconnector
            client.publish("storage", make_ann(storage_furl), privkey)

            server.setServiceParent(self.parent) # restart the server
            # now wait for the messages to be delivered
            def _got_announcement():
                return bool(server.get_announcements())
            return self.poll(_got_announcement)

        def _check_delivered(ign):
            delivered = server.get_announcements()[0]
            furl = delivered.announcement["anonymous-storage-FURL"]
            self.failUnlessEqual(furl, storage_furl)

        # now let the ack get back
        def _wait_until_idle(ign):
            def _idle():
                return not (client._debug_outstanding
                            or server._debug_outstanding)
            return self.poll(_idle)

        d = server.disownServiceParent()
        d.addCallback(_publish_while_offline)
        d.addCallback(_check_delivered)
        d.addCallback(_wait_until_idle)
        return d
365
366
# Tags selecting which introducer server implementation a system test runs
# against.
V1 = "v1"
V2 = "v2"
368 class SystemTest(SystemTestMixin, unittest.TestCase):
369
370     def do_system_test(self, server_version):
371         self.create_tub()
372         if server_version == V1:
373             introducer = old.IntroducerService_v1()
374         else:
375             introducer = IntroducerService()
376         introducer.setServiceParent(self.parent)
377         iff = os.path.join(self.basedir, "introducer.furl")
378         tub = self.central_tub
379         ifurl = self.central_tub.registerReference(introducer, furlFile=iff)
380         self.introducer_furl = ifurl
381
382         # we have 5 clients who publish themselves as storage servers, and a
383         # sixth which does which not. All 6 clients subscriber to hear about
384         # storage. When the connections are fully established, all six nodes
385         # should have 5 connections each.
386         NUM_STORAGE = 5
387         NUM_CLIENTS = 6
388
389         clients = []
390         tubs = {}
391         received_announcements = {}
392         subscribing_clients = []
393         publishing_clients = []
394         self.the_introducer = introducer
395         privkeys = {}
396         expected_announcements = [0 for c in range(NUM_CLIENTS)]
397
398         for i in range(NUM_CLIENTS):
399             tub = Tub()
400             #tub.setOption("logLocalFailures", True)
401             #tub.setOption("logRemoteFailures", True)
402             tub.setOption("expose-remote-exception-types", False)
403             tub.setServiceParent(self.parent)
404             l = tub.listenOn("tcp:0")
405             portnum = l.getPortnum()
406             tub.setLocation("localhost:%d" % portnum)
407
408             log.msg("creating client %d: %s" % (i, tub.getShortTubID()))
409             if i == 0:
410                 c = old.IntroducerClient_v1(tub, self.introducer_furl,
411                                             NICKNAME % str(i),
412                                             "version", "oldest")
413             else:
414                 c = IntroducerClient(tub, self.introducer_furl,
415                                      NICKNAME % str(i),
416                                      "version", "oldest",
417                                      {"component": "component-v1"})
418             received_announcements[c] = {}
419             def got(key_s_or_tubid, ann, announcements, i):
420                 if i == 0:
421                     index = get_tubid_string_from_ann(ann)
422                 else:
423                     index = key_s_or_tubid or get_tubid_string_from_ann(ann)
424                 announcements[index] = ann
425             c.subscribe_to("storage", got, received_announcements[c], i)
426             subscribing_clients.append(c)
427             expected_announcements[i] += 1 # all expect a 'storage' announcement
428
429             node_furl = tub.registerReference(Referenceable())
430             if i < NUM_STORAGE:
431                 if i == 0:
432                     c.publish(node_furl, "storage", "ri_name")
433                 elif i == 1:
434                     # sign the announcement
435                     privkey_s, pubkey_s = keyutil.make_keypair()
436                     privkey, _ignored = keyutil.parse_privkey(privkey_s)
437                     privkeys[c] = privkey
438                     c.publish("storage", make_ann(node_furl), privkey)
439                 else:
440                     c.publish("storage", make_ann(node_furl))
441                 publishing_clients.append(c)
442             else:
443                 # the last one does not publish anything
444                 pass
445
446             if i == 0:
447                 # users of the V1 client were required to publish a
448                 # 'stub_client' record (somewhat after they published the
449                 # 'storage' record), so the introducer could see their
450                 # version. Match that behavior.
451                 c.publish(node_furl, "stub_client", "stub_ri_name")
452
453             if i == 2:
454                 # also publish something that nobody cares about
455                 boring_furl = tub.registerReference(Referenceable())
456                 c.publish("boring", make_ann(boring_furl))
457
458             c.setServiceParent(self.parent)
459             clients.append(c)
460             tubs[c] = tub
461
462
463         def _wait_for_connected(ign):
464             def _connected():
465                 for c in clients:
466                     if not c.connected_to_introducer():
467                         return False
468                 return True
469             return self.poll(_connected)
470
471         # we watch the clients to determine when the system has settled down.
472         # Then we can look inside the server to assert things about its
473         # state.
474
475         def _wait_for_expected_announcements(ign):
476             def _got_expected_announcements():
477                 for i,c in enumerate(subscribing_clients):
478                     if len(received_announcements[c]) < expected_announcements[i]:
479                         return False
480                 return True
481             return self.poll(_got_expected_announcements)
482
483         # before shutting down any Tub, we'd like to know that there are no
484         # messages outstanding
485
486         def _wait_until_idle(ign):
487             def _idle():
488                 for c in subscribing_clients + publishing_clients:
489                     if c._debug_outstanding:
490                         return False
491                 if self.the_introducer._debug_outstanding:
492                     return False
493                 return True
494             return self.poll(_idle)
495
496         d = defer.succeed(None)
497         d.addCallback(_wait_for_connected)
498         d.addCallback(_wait_for_expected_announcements)
499         d.addCallback(_wait_until_idle)
500
501         def _check1(res):
502             log.msg("doing _check1")
503             dc = self.the_introducer._debug_counts
504             if server_version == V1:
505                 # each storage server publishes a record, and (after its
506                 # 'subscribe' has been ACKed) also publishes a "stub_client".
507                 # The non-storage client (which subscribes) also publishes a
508                 # stub_client. There is also one "boring" service. The number
509                 # of messages is higher, because the stub_clients aren't
510                 # published until after we get the 'subscribe' ack (since we
511                 # don't realize that we're dealing with a v1 server [which
512                 # needs stub_clients] until then), and the act of publishing
513                 # the stub_client causes us to re-send all previous
514                 # announcements.
515                 self.failUnlessEqual(dc["inbound_message"] - dc["inbound_duplicate"],
516                                      NUM_STORAGE + NUM_CLIENTS + 1)
517             else:
518                 # each storage server publishes a record. There is also one
519                 # "stub_client" and one "boring"
520                 self.failUnlessEqual(dc["inbound_message"], NUM_STORAGE+2)
521                 self.failUnlessEqual(dc["inbound_duplicate"], 0)
522             self.failUnlessEqual(dc["inbound_update"], 0)
523             self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS)
524             # the number of outbound messages is tricky.. I think it depends
525             # upon a race between the publish and the subscribe messages.
526             self.failUnless(dc["outbound_message"] > 0)
527             # each client subscribes to "storage", and each server publishes
528             self.failUnlessEqual(dc["outbound_announcements"],
529                                  NUM_STORAGE*NUM_CLIENTS)
530
531             for c in subscribing_clients:
532                 cdc = c._debug_counts
533                 self.failUnless(cdc["inbound_message"])
534                 self.failUnlessEqual(cdc["inbound_announcement"],
535                                      NUM_STORAGE)
536                 self.failUnlessEqual(cdc["wrong_service"], 0)
537                 self.failUnlessEqual(cdc["duplicate_announcement"], 0)
538                 self.failUnlessEqual(cdc["update"], 0)
539                 self.failUnlessEqual(cdc["new_announcement"],
540                                      NUM_STORAGE)
541                 anns = received_announcements[c]
542                 self.failUnlessEqual(len(anns), NUM_STORAGE)
543
544                 nodeid0 = tubs[clients[0]].tubID
545                 ann = anns[nodeid0]
546                 nick = ann["nickname"]
547                 self.failUnlessEqual(type(nick), unicode)
548                 self.failUnlessEqual(nick, NICKNAME % "0")
549             if server_version == V1:
550                 for c in publishing_clients:
551                     cdc = c._debug_counts
552                     expected = 1 # storage
553                     if c is clients[2]:
554                         expected += 1 # boring
555                     if c is not clients[0]:
556                         # the v2 client tries to call publish_v2, which fails
557                         # because the server is v1. It then re-sends
558                         # everything it has so far, plus a stub_client record
559                         expected = 2*expected + 1
560                     if c is clients[0]:
561                         # we always tell v1 client to send stub_client
562                         expected += 1
563                     self.failUnlessEqual(cdc["outbound_message"], expected)
564             else:
565                 for c in publishing_clients:
566                     cdc = c._debug_counts
567                     expected = 1
568                     if c in [clients[0], # stub_client
569                              clients[2], # boring
570                              ]:
571                         expected = 2
572                     self.failUnlessEqual(cdc["outbound_message"], expected)
573             # now check the web status, make sure it renders without error
574             ir = introweb.IntroducerRoot(self.parent)
575             self.parent.nodeid = "NODEID"
576             text = ir.renderSynchronously().decode("utf-8")
577             self.failUnlessIn(NICKNAME % "0", text) # the v1 client
578             self.failUnlessIn(NICKNAME % "1", text) # a v2 client
579             log.msg("_check1 done")
580         d.addCallback(_check1)
581
582         # force an introducer reconnect, by shutting down the Tub it's using
583         # and starting a new Tub (with the old introducer). Everybody should
584         # reconnect and republish, but the introducer should ignore the
585         # republishes as duplicates. However, because the server doesn't know
586         # what each client does and does not know, it will send them a copy
587         # of the current announcement table anyway.
588
589         d.addCallback(lambda _ign: log.msg("shutting down introducer's Tub"))
590         d.addCallback(lambda _ign: self.central_tub.disownServiceParent())
591
592         def _wait_for_introducer_loss(ign):
593             def _introducer_lost():
594                 for c in clients:
595                     if c.connected_to_introducer():
596                         return False
597                 return True
598             return self.poll(_introducer_lost)
599         d.addCallback(_wait_for_introducer_loss)
600
601         def _restart_introducer_tub(_ign):
602             log.msg("restarting introducer's Tub")
603             # reset counters
604             for i in range(NUM_CLIENTS):
605                 c = subscribing_clients[i]
606                 for k in c._debug_counts:
607                     c._debug_counts[k] = 0
608             for k in self.the_introducer._debug_counts:
609                 self.the_introducer._debug_counts[k] = 0
610             expected_announcements[i] += 1 # new 'storage' for everyone
611             self.create_tub(self.central_portnum)
612             newfurl = self.central_tub.registerReference(self.the_introducer,
613                                                          furlFile=iff)
614             assert newfurl == self.introducer_furl
615         d.addCallback(_restart_introducer_tub)
616
617         d.addCallback(_wait_for_connected)
618         d.addCallback(_wait_for_expected_announcements)
619         d.addCallback(_wait_until_idle)
620         d.addCallback(lambda _ign: log.msg(" reconnected"))
621
622         # TODO: publish something while the introducer is offline, then
623         # confirm it gets delivered when the connection is reestablished
624         def _check2(res):
625             log.msg("doing _check2")
626             # assert that the introducer sent out new messages, one per
627             # subscriber
628             dc = self.the_introducer._debug_counts
629             self.failUnlessEqual(dc["outbound_announcements"],
630                                  NUM_STORAGE*NUM_CLIENTS)
631             self.failUnless(dc["outbound_message"] > 0)
632             self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS)
633             for c in subscribing_clients:
634                 cdc = c._debug_counts
635                 self.failUnlessEqual(cdc["inbound_message"], 1)
636                 self.failUnlessEqual(cdc["inbound_announcement"], NUM_STORAGE)
637                 self.failUnlessEqual(cdc["new_announcement"], 0)
638                 self.failUnlessEqual(cdc["wrong_service"], 0)
639                 self.failUnlessEqual(cdc["duplicate_announcement"], NUM_STORAGE)
640         d.addCallback(_check2)
641
642         # Then force an introducer restart, by shutting down the Tub,
643         # destroying the old introducer, and starting a new Tub+Introducer.
644         # Everybody should reconnect and republish, and the (new) introducer
645         # will distribute the new announcements, but the clients should
646         # ignore the republishes as duplicates.
647
648         d.addCallback(lambda _ign: log.msg("shutting down introducer"))
649         d.addCallback(lambda _ign: self.central_tub.disownServiceParent())
650         d.addCallback(_wait_for_introducer_loss)
651         d.addCallback(lambda _ign: log.msg("introducer lost"))
652
653         def _restart_introducer(_ign):
654             log.msg("restarting introducer")
655             self.create_tub(self.central_portnum)
656             # reset counters
657             for i in range(NUM_CLIENTS):
658                 c = subscribing_clients[i]
659                 for k in c._debug_counts:
660                     c._debug_counts[k] = 0
661             expected_announcements[i] += 1 # new 'storage' for everyone
662             if server_version == V1:
663                 introducer = old.IntroducerService_v1()
664             else:
665                 introducer = IntroducerService()
666             self.the_introducer = introducer
667             newfurl = self.central_tub.registerReference(self.the_introducer,
668                                                          furlFile=iff)
669             assert newfurl == self.introducer_furl
670         d.addCallback(_restart_introducer)
671
672         d.addCallback(_wait_for_connected)
673         d.addCallback(_wait_for_expected_announcements)
674         d.addCallback(_wait_until_idle)
675
676         def _check3(res):
677             log.msg("doing _check3")
678             dc = self.the_introducer._debug_counts
679             self.failUnlessEqual(dc["outbound_announcements"],
680                                  NUM_STORAGE*NUM_CLIENTS)
681             self.failUnless(dc["outbound_message"] > 0)
682             self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS)
683             for c in subscribing_clients:
684                 cdc = c._debug_counts
685                 self.failUnless(cdc["inbound_message"] > 0)
686                 self.failUnlessEqual(cdc["inbound_announcement"], NUM_STORAGE)
687                 self.failUnlessEqual(cdc["new_announcement"], 0)
688                 self.failUnlessEqual(cdc["wrong_service"], 0)
689                 self.failUnlessEqual(cdc["duplicate_announcement"], NUM_STORAGE)
690
691         d.addCallback(_check3)
692         return d
693
694
695     def test_system_v2_server(self):
696         self.basedir = "introducer/SystemTest/system_v2_server"
697         os.makedirs(self.basedir)
698         return self.do_system_test(V2)
699     test_system_v2_server.timeout = 480
700     # occasionally takes longer than 350s on "draco"
701
702     def test_system_v1_server(self):
703         self.basedir = "introducer/SystemTest/system_v1_server"
704         os.makedirs(self.basedir)
705         return self.do_system_test(V1)
706     test_system_v1_server.timeout = 480
707     # occasionally takes longer than 350s on "draco"
708
class FakeRemoteReference:
    # Stub whose methods all answer with fixed, canned values. The tests in
    # this file pass instances of it as the 'subscriber' argument of the
    # introducer's subscribe methods.

    def notifyOnDisconnect(self, *args, **kwargs):
        # accept and ignore whatever is being registered
        return None

    def getRemoteTubID(self):
        return "62ubehyunnyhzs7r6vdonnm2hpi52w6y"

    def getLocationHints(self):
        # a fresh list is built on every call
        hints = [
            ("ipv4", "here.example.com", "1234"),
            ("ipv4", "there.example.com", "2345"),
        ]
        return hints

    def getPeer(self):
        peer = address.IPv4Address("TCP", "remote.example.com", 3456)
        return peer
716
class ClientInfo(unittest.TestCase):
    # Checks the per-subscriber details (service name, nickname, version,
    # app-versions) that IntroducerService.get_subscribers() reports after
    # v2 and v1 subscriptions.

    def test_client_v2(self):
        introducer = IntroducerService()
        # neither a Tub nor an introducer FURL is handed to the client
        tub = None
        introducer_furl = None
        nickname = NICKNAME % u"v2"
        app_versions = {"whizzy": "fizzy"}
        client_v2 = IntroducerClient(tub, introducer_furl, nickname,
                                     "my_version", "oldest", app_versions)
        #furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
        #ann_s = make_ann_t(client_v2, furl1, None)
        #introducer.remote_publish_v2(ann_s, Referenceable())
        subscriber = FakeRemoteReference()
        subscriber_info = client_v2._my_subscriber_info
        introducer.remote_subscribe_v2(subscriber, "storage", subscriber_info)

        subscribers = introducer.get_subscribers()
        self.failUnlessEqual(len(subscribers), 1)
        entry = subscribers[0]
        self.failUnlessEqual(entry.service_name, "storage")
        self.failUnlessEqual(entry.app_versions, app_versions)
        self.failUnlessEqual(entry.nickname, nickname)
        self.failUnlessEqual(entry.version, "my_version")

    def test_client_v1(self):
        introducer = IntroducerService()
        first_subscriber = FakeRemoteReference()
        introducer.remote_subscribe(first_subscriber, "storage")
        # a v1 subscribe call carries no subscriber_info: that usually
        # arrived separately, in a stub_client pseudo-announcement
        subscribers = introducer.get_subscribers()
        self.failUnlessEqual(len(subscribers), 1)
        entry = subscribers[0]
        self.failUnlessEqual(entry.nickname, u"?") # not known yet
        self.failUnlessEqual(entry.service_name, "storage")

        # now submit the stub_client announcement
        v1_nickname = NICKNAME % u"v1"
        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
        stub_ann = (furl1, "stub_client", "RIStubClient",
                    v1_nickname.encode("utf-8"), "my_version", "oldest")
        introducer.remote_publish(stub_ann)
        # the server should correlate the two
        subscribers = introducer.get_subscribers()
        self.failUnlessEqual(len(subscribers), 1)
        entry = subscribers[0]
        self.failUnlessEqual(entry.service_name, "storage")
        # v1 announcements do not contain app-versions
        self.failUnlessEqual(entry.app_versions, {})
        self.failUnlessEqual(entry.nickname, v1_nickname)
        self.failUnlessEqual(entry.version, "my_version")

        # a subscription that arrives after the stub_client announcement
        # should be correlated too
        second_subscriber = FakeRemoteReference()
        introducer.remote_subscribe(second_subscriber, "thing2")

        subscribers = introducer.get_subscribers()
        self.failUnlessEqual(len(subscribers), 2)
        thing2_entries = [s for s in subscribers if s.service_name == "thing2"]
        entry = thing2_entries[0]
        # v1 announcements do not contain app-versions
        self.failUnlessEqual(entry.app_versions, {})
        self.failUnlessEqual(entry.nickname, v1_nickname)
        self.failUnlessEqual(entry.version, "my_version")
777
class Announcements(unittest.TestCase):
    # Publishes a single announcement to a fresh IntroducerService (v2
    # unsigned, v2 signed, and v1) and checks the record that
    # get_announcements() returns for it.

    def test_client_v2_unsigned(self):
        introducer = IntroducerService()
        tub = None
        introducer_furl = None
        app_versions = {"whizzy": "fizzy"}
        client_v2 = IntroducerClient(tub, introducer_furl, u"nick-v2",
                                     "my_version", "oldest", app_versions)
        tubid = "62ubehyunnyhzs7r6vdonnm2hpi52w6y"
        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
        # no signing key is passed for the unsigned case
        unsigned_ann = make_ann_t(client_v2, furl1, None)
        canary0 = Referenceable()
        introducer.remote_publish_v2(unsigned_ann, canary0)

        anns = introducer.get_announcements()
        self.failUnlessEqual(len(anns), 1)
        first = anns[0]
        self.failUnlessIdentical(first.canary, canary0)
        # the index holds the tubid, and None in the middle slot
        self.failUnlessEqual(first.index, ("storage", None, tubid))
        self.failUnlessEqual(first.announcement["app-versions"], app_versions)
        self.failUnlessEqual(first.nickname, u"nick-v2")
        self.failUnlessEqual(first.service_name, "storage")
        self.failUnlessEqual(first.version, "my_version")
        self.failUnlessEqual(first.announcement["anonymous-storage-FURL"],
                             furl1)

    def test_client_v2_signed(self):
        introducer = IntroducerService()
        tub = None
        introducer_furl = None
        app_versions = {"whizzy": "fizzy"}
        client_v2 = IntroducerClient(tub, introducer_furl, u"nick-v2",
                                     "my_version", "oldest", app_versions)
        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
        privkey_s, pubkey_s = keyutil.make_keypair()
        privkey, _unused = keyutil.parse_privkey(privkey_s)
        pks = keyutil.remove_prefix(pubkey_s, "pub-")
        signed_ann = make_ann_t(client_v2, furl1, privkey)
        canary0 = Referenceable()
        introducer.remote_publish_v2(signed_ann, canary0)

        anns = introducer.get_announcements()
        self.failUnlessEqual(len(anns), 1)
        first = anns[0]
        self.failUnlessIdentical(first.canary, canary0)
        # the index holds the un-prefixed pubkey string, and None in the
        # last slot
        self.failUnlessEqual(first.index, ("storage", pks, None))
        self.failUnlessEqual(first.announcement["app-versions"], app_versions)
        self.failUnlessEqual(first.nickname, u"nick-v2")
        self.failUnlessEqual(first.service_name, "storage")
        self.failUnlessEqual(first.version, "my_version")
        self.failUnlessEqual(first.announcement["anonymous-storage-FURL"],
                             furl1)

    def test_client_v1(self):
        introducer = IntroducerService()

        tubid = "62ubehyunnyhzs7r6vdonnm2hpi52w6y"
        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
        encoded_nickname = u"nick-v1".encode("utf-8")
        v1_ann = (furl1, "storage", "RIStorage",
                  encoded_nickname, "my_version", "oldest")
        introducer.remote_publish(v1_ann)

        anns = introducer.get_announcements()
        self.failUnlessEqual(len(anns), 1)
        first = anns[0]
        self.failUnlessEqual(first.index, ("storage", None, tubid))
        # remote_publish() is called without a canary here
        self.failUnlessEqual(first.canary, None)
        self.failUnlessEqual(first.announcement["app-versions"], {})
        self.failUnlessEqual(first.nickname, u"nick-v1".encode("utf-8"))
        self.failUnlessEqual(first.service_name, "storage")
        self.failUnlessEqual(first.version, "my_version")
        self.failUnlessEqual(first.announcement["anonymous-storage-FURL"],
                             furl1)
841
842
class TooNewServer(IntroducerService):
    # An IntroducerService whose VERSION dictionary lists only a 'v999'
    # introducer protocol entry (plus an application-version string).
    VERSION = {
        "http://allmydata.org/tahoe/protocols/introducer/v999": {},
        "application-version": "greetings from the crazy future",
    }
848
class NonV1Server(SystemTestMixin, unittest.TestCase):
    # if the 1.3.0 client connects to a server that doesn't provide the 'v1'
    # protocol, it is supposed to provide a useful error instead of a weird
    # exception.

    def test_failure(self):
        self.basedir = "introducer/NonV1Server/failure"
        os.makedirs(self.basedir)
        self.create_tub()
        future_server = TooNewServer()
        future_server.setServiceParent(self.parent)
        furl = self.central_tub.registerReference(future_server)
        self.introducer_furl = furl

        # a second Tub, for the client side of the connection
        client_tub = Tub()
        client_tub.setOption("expose-remote-exception-types", False)
        client_tub.setServiceParent(self.parent)
        listener = client_tub.listenOn("tcp:0")
        client_tub.setLocation("localhost:%d" % listener.getPortnum())

        client = IntroducerClient(client_tub, self.introducer_furl,
                                  u"nickname-client", "version", "oldest", {})
        received = {}
        def _remember(key_s, ann):
            received[key_s] = ann
        client.subscribe_to("storage", _remember)

        client.setServiceParent(self.parent)

        # now we wait for it to connect and notice the bad version

        def _settled():
            # polling stops once the client has either recorded an error or
            # obtained a publisher
            if client._introducer_error:
                return True
            return bool(client._publisher)
        polling = self.poll(_settled)
        def _verify(res):
            err = client._introducer_error
            self.failUnless(err)
            self.failUnless(err.check(InsufficientVersionError), err)
        polling.addCallback(_verify)
        return polling
889
class DecodeFurl(unittest.TestCase):
    # Decodes the base32 tubid embedded in a FURL and compares it with the
    # expected 20-byte node id.
    def test_decode(self):
        # make sure we have a working base64.b32decode. The one in
        # python2.4.[01] was broken.
        furl = 'pb://t5g7egomnnktbpydbuijt6zgtmw4oqi5@127.0.0.1:51857/hfzv36i'
        m = re.match(r'pb://(\w+)@', furl)
        # Use the TestCase assertion instead of a bare 'assert': a bare
        # assert is stripped under 'python -O', after which a non-match
        # would surface as an AttributeError on m.group() below.
        self.failUnless(m, furl)
        # b32decode is given the upper-cased form of the tubid
        nodeid = b32decode(m.group(1).upper())
        self.failUnlessEqual(nodeid, "\x9fM\xf2\x19\xcckU0\xbf\x03\r\x10\x99\xfb&\x9b-\xc7A\x1d")
899
class Signatures(unittest.TestCase):
    # Round-trips an announcement through sign_to_foolscap() and
    # unsign_from_foolscap(), then checks how a mismatched signature, a
    # missing signature, and unrecognized signature/key strings are handled.
    def test_sign(self):
        ann = {"key1": "value1"}
        privkey_s, pubkey_s = keyutil.make_keypair()
        privkey, _unused = keyutil.parse_privkey(privkey_s)
        signed = sign_to_foolscap(ann, privkey)
        msg, sig, key = signed
        bytes_type = type("".encode("utf-8"))
        self.failUnlessEqual(type(msg), bytes_type) # bytes
        decoded_ann = simplejson.loads(msg.decode("utf-8"))
        self.failUnlessEqual(decoded_ann, ann)
        self.failUnless(sig.startswith("v0-"))
        self.failUnless(key.startswith("v0-"))
        recovered_ann, recovered_key = unsign_from_foolscap(signed)
        self.failUnlessEqual(recovered_ann, ann)
        self.failUnlessEqual("pub-" + recovered_key, pubkey_s)

        # bad signature: the message differs from the one that was signed
        bad_ann = {"key1": "value2"}
        bad_msg = simplejson.dumps(bad_ann).encode("utf-8")
        mismatched = (bad_msg, sig, key)
        self.failUnlessRaises(keyutil.BadSignatureError,
                              unsign_from_foolscap, mismatched)
        # sneaky bad signature should be ignored
        unsigned = (bad_msg, None, key)
        recovered_ann, recovered_key = unsign_from_foolscap(unsigned)
        self.failUnlessEqual(recovered_key, None)
        self.failUnlessEqual(recovered_ann, bad_ann)

        # unrecognized signatures
        for odd_tuple in [(bad_msg, "v999-sig", key),
                          (bad_msg, sig, "v999-key")]:
            self.failUnlessRaises(UnknownKeyError,
                                  unsign_from_foolscap, odd_tuple)
930
931
# TODO: add tests of StorageFarmBroker. If it receives a duplicate
# announcement, or a same-FURL-different-misc announcement, it should leave
# the Reconnector in place; if it receives a same-nodeid-different-FURL
# announcement, it should tear down the Reconnector and make a new one. This
# behavior used to live in the IntroducerClient, and thus used to be tested
# by test_introducer.
937
938 # copying more tests from old branch:
939
940 #  then also add Upgrade test