]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_introducer.py
new introducer: signed extensible dictionary-based messages! refs #466
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_introducer.py
1
2 import os, re
3 from base64 import b32decode
4 import simplejson
5
6 from twisted.trial import unittest
7 from twisted.internet import defer
8 from twisted.python import log
9
10 from foolscap.api import Tub, Referenceable, fireEventually, flushEventualQueue
11 from twisted.application import service
12 from allmydata.interfaces import InsufficientVersionError
13 from allmydata.introducer.client import IntroducerClient, \
14      WrapV2ClientInV1Interface
15 from allmydata.introducer.server import IntroducerService
16 from allmydata.introducer.common import get_tubid_string_from_ann, \
17      get_tubid_string, sign_to_foolscap, unsign_from_foolscap, \
18      UnknownKeyError
19 from allmydata.introducer import old
20 # test compatibility with old introducer .tac files
21 from allmydata.introducer import IntroducerNode
22 from allmydata.util import pollmixin, keyutil
23 import allmydata.test.common_util as testutil
24
class LoggingMultiService(service.MultiService):
    """A MultiService that also exposes a log() method."""

    def log(self, msg, **kwargs):
        # hand the message and any keyword arguments straight to twisted's log
        log.msg(msg, **kwargs)
28
class Node(testutil.SignalMixin, unittest.TestCase):
    """Checks that an IntroducerNode can be created, started and stopped."""

    def test_loadable(self):
        """Create an IntroducerNode in a new basedir, start it, stop it."""
        node_dir = "introducer.IntroducerNode.test_loadable"
        os.mkdir(node_dir)
        node = IntroducerNode(node_dir)

        def _start(_res):
            return node.startService()

        def _tub_ready(_res):
            return node.when_tub_ready()

        def _stop(_res):
            return node.stopService()

        d = fireEventually(None)
        # run the steps in order, finishing by flushing the eventual queue
        for step in (_start, _tub_ready, _stop, flushEventualQueue):
            d.addCallback(step)
        return d
40
class ServiceMixin:
    """Gives each test a started LoggingMultiService as ``self.parent``."""

    def setUp(self):
        parent = LoggingMultiService()
        self.parent = parent
        parent.startService()

    def tearDown(self):
        log.msg("TestIntroducer.tearDown")

        def _stop_parent(_res):
            return self.parent.stopService()

        d = defer.succeed(None)
        d.addCallback(_stop_parent)
        # flush the eventual-send queue once the services have been stopped
        d.addCallback(flushEventualQueue)
        return d
51
class Introducer(ServiceMixin, unittest.TestCase, pollmixin.PollMixin):
    """Tests that call IntroducerService methods directly (no network)."""

    def test_create(self):
        client = IntroducerClient(None, "introducer.furl", u"my_nickname",
                                  "my_version", "oldest_version", {})
        self.failUnless(isinstance(client, IntroducerClient))

    def test_listen(self):
        server = IntroducerService()
        server.setServiceParent(self.parent)

    def test_duplicate_publish(self):
        server = IntroducerService()

        def _check_counts(num_announcements):
            # nothing subscribes in this test, so subscribers stay at zero
            self.failUnlessEqual(len(server.get_announcements()),
                                 num_announcements)
            self.failUnlessEqual(len(server.get_subscribers()), 0)

        _check_counts(0)
        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36106,127.0.0.1:36106/gydnpigj2ja2qr2srq4ikjwnl7xfgbra"
        furl2 = "pb://ttwwooyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36111,127.0.0.1:36106/ttwwoogj2ja2qr2srq4ikjwnl7xfgbra"
        ann1 = (furl1, "storage", "RIStorage", "nick1", "ver23", "ver0")
        ann1b = (furl1, "storage", "RIStorage", "nick1", "ver24", "ver0")
        ann2 = (furl2, "storage", "RIStorage", "nick2", "ver30", "ver0")
        # ann1b reuses furl1, so it is expected to leave the count at 2
        for v1_ann, expected in ((ann1, 1), (ann2, 2), (ann1b, 2)):
            server.remote_publish(v1_ann)
            _check_counts(expected)

    def test_id_collision(self):
        # test replacement case where tubid equals a keyid (one should
        # not replace the other)
        server = IntroducerService()
        client = IntroducerClient(None,
                                  "introducer.furl", u"my_nickname",
                                  "my_version", "oldest_version", {})
        sk_s, vk_s = keyutil.make_keypair()
        sk, _ignored = keyutil.parse_privkey(sk_s)
        keyid = keyutil.remove_prefix(vk_s, "pub-v0-")

        # first: a signed (v2) announcement, indexed by its key
        furl1 = "pb://onug64tu@127.0.0.1:123/short" # base32("short")
        signed_ann = client.create_announcement("storage", make_ann(furl1), sk)
        server.remote_publish_v2(signed_ann, Referenceable())
        announcements = server.get_announcements()
        self.failUnlessEqual(len(announcements), 1)
        signed_index = ("storage", "v0-"+keyid, None)
        self.failUnless(signed_index in announcements)
        (_a, _b, ann1_out, _c) = announcements[signed_index]
        self.failUnlessEqual(ann1_out["anonymous-storage-FURL"], furl1)

        # second: an unsigned (v1) announcement whose tubid equals that keyid
        furl2 = "pb://%s@127.0.0.1:36106/swissnum" % keyid
        v1_ann = (furl2, "storage", "RIStorage", "nick1", "ver23", "ver0")
        server.remote_publish(v1_ann)
        self.failUnlessEqual(len(announcements), 2)
        tubid_index = ("storage", None, keyid)
        self.failUnless(tubid_index in announcements)
        (_a, _b, ann2_out, _c) = announcements[tubid_index]
        self.failUnlessEqual(ann2_out["anonymous-storage-FURL"], furl2)
110
111
def make_ann(furl):
    """Build a minimal storage announcement dict for the given FURL."""
    # the permutation seed is taken from the tubid portion of the FURL
    return {
        "anonymous-storage-FURL": furl,
        "permutation-seed-base32": get_tubid_string(furl),
    }
116
def make_ann_t(ic, furl, privkey):
    """Have client ``ic`` create a "storage" announcement for ``furl``.

    ``privkey`` is passed through to ic.create_announcement().
    """
    announcement = make_ann(furl)
    return ic.create_announcement("storage", announcement, privkey)
119
class Client(unittest.TestCase):
    """Tests of IntroducerClient's handling of new, duplicate and
    replacement announcements, with no introducer connection."""

    def test_duplicate_receive_v1(self):
        client = IntroducerClient(None,
                                  "introducer.furl", u"my_nickname",
                                  "my_version", "oldest_version", {})
        received = []
        client.subscribe_to("storage",
                            lambda key_s, ann: received.append(ann))
        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/gydnpigj2ja2qr2srq4ikjwnl7xfgbra"
        ann1 = (furl1, "storage", "RIStorage", "nick1", "ver23", "ver0")
        ann1b = (furl1, "storage", "RIStorage", "nick1", "ver24", "ver0")
        v1_facade = WrapV2ClientInV1Interface(client)

        def _check_counts(inbound, new, update, duplicate):
            counts = client._debug_counts
            self.failUnlessEqual(counts["inbound_announcement"], inbound)
            self.failUnlessEqual(counts["new_announcement"], new)
            self.failUnlessEqual(counts["update"], update)
            self.failUnlessEqual(counts["duplicate_announcement"], duplicate)

        def _after_first(_ign):
            self.failUnlessEqual(len(received), 1)
            self.failUnlessEqual(received[0]["nickname"], u"nick1")
            self.failUnlessEqual(received[0]["my-version"], "ver23")
            _check_counts(1, 1, 0, 0)
            # now send a duplicate announcement: this should not notify clients
            v1_facade.remote_announce([ann1])
            return fireEventually()

        def _after_duplicate(_ign):
            self.failUnlessEqual(len(received), 1)
            _check_counts(2, 1, 0, 1)
            # and a replacement announcement: same FURL, new other stuff.
            # Clients should be notified.
            v1_facade.remote_announce([ann1b])
            return fireEventually()

        def _after_replacement(_ign):
            self.failUnlessEqual(len(received), 2)
            _check_counts(3, 1, 1, 1)
            # test that the other stuff changed
            self.failUnlessEqual(received[-1]["nickname"], u"nick1")
            self.failUnlessEqual(received[-1]["my-version"], "ver24")

        v1_facade.remote_announce([ann1])
        d = fireEventually()
        d.addCallback(_after_first)
        d.addCallback(_after_duplicate)
        d.addCallback(_after_replacement)
        return d

    def test_duplicate_receive_v2(self):
        ic1 = IntroducerClient(None,
                               "introducer.furl", u"my_nickname",
                               "ver23", "oldest_version", {})
        # we use a second client just to create a different-looking
        # announcement
        ic2 = IntroducerClient(None,
                               "introducer.furl", u"my_nickname",
                               "ver24", "oldest_version", {})
        first_sub = []
        def _received(key_s, ann):
            first_sub.append((key_s, ann))
        ic1.subscribe_to("storage", _received)
        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/gydnp"
        furl1a = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:7777/gydnp"
        furl2 = "pb://ttwwooyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/ttwwoo"

        privkey_s, pubkey_vs = keyutil.make_keypair()
        privkey, _ignored = keyutil.parse_privkey(privkey_s)
        pubkey_s = keyutil.remove_prefix(pubkey_vs, "pub-")

        # ann1: ic1, furl1
        # ann1a: ic1, furl1a (same SturdyRef, different connection hints)
        # ann1b: ic2, furl1
        # ann2: ic2, furl2

        self.ann1 = make_ann_t(ic1, furl1, privkey)
        self.ann1a = make_ann_t(ic1, furl1a, privkey)
        self.ann1b = make_ann_t(ic2, furl1, privkey)
        self.ann2 = make_ann_t(ic2, furl2, privkey)

        def _expect_latest(records, count, furl, version):
            # 'records' must hold 'count' entries, the newest of which is
            # signed by our key and carries the given FURL and version
            self.failUnlessEqual(len(records), count)
            key_s, ann = records[-1]
            self.failUnlessEqual(key_s, pubkey_s)
            self.failUnlessEqual(ann["anonymous-storage-FURL"], furl)
            self.failUnlessEqual(ann["my-version"], version)

        def _announce_then_turn(ann_t):
            # deliver ann_t to ic1, then wait one eventual-send turn
            d.addCallback(lambda _ign: ic1.remote_announce_v2([ann_t]))
            d.addCallback(fireEventually)

        ic1.remote_announce_v2([self.ann1]) # queues eventual-send
        d = fireEventually()
        d.addCallback(lambda _ign:
                      _expect_latest(first_sub, 1, furl1, "ver23"))

        # now send a duplicate announcement. This should not fire the
        # subscriber
        _announce_then_turn(self.ann1)
        d.addCallback(lambda _ign:
                      self.failUnlessEqual(len(first_sub), 1))

        # and a replacement announcement: same FURL, new other stuff. The
        # subscriber *should* be fired.
        _announce_then_turn(self.ann1b)
        d.addCallback(lambda _ign:
                      _expect_latest(first_sub, 2, furl1, "ver24"))

        # and a replacement announcement with a different FURL (it uses
        # different connection hints)
        _announce_then_turn(self.ann1a)
        d.addCallback(lambda _ign:
                      _expect_latest(first_sub, 3, furl1a, "ver23"))

        # now add a new subscription, which should be called with the
        # backlog. The introducer only records one announcement per index, so
        # the backlog will only have the latest message.
        second_sub = []
        def _received2(key_s, ann):
            second_sub.append((key_s, ann))
        d.addCallback(lambda _ign: ic1.subscribe_to("storage", _received2))
        d.addCallback(fireEventually)
        d.addCallback(lambda _ign:
                      _expect_latest(second_sub, 1, furl1a, "ver23"))
        return d

    def test_id_collision(self):
        # test replacement case where tubid equals a keyid (one should
        # not replace the other)
        client = IntroducerClient(None,
                                  "introducer.furl", u"my_nickname",
                                  "my_version", "oldest_version", {})
        received = []
        client.subscribe_to("storage",
                            lambda key_s, ann: received.append(ann))
        sk_s, vk_s = keyutil.make_keypair()
        sk, _ignored = keyutil.parse_privkey(sk_s)
        keyid = keyutil.remove_prefix(vk_s, "pub-v0-")
        furl1 = "pb://onug64tu@127.0.0.1:123/short" # base32("short")
        furl2 = "pb://%s@127.0.0.1:36106/swissnum" % keyid
        signed_ann = client.create_announcement("storage", make_ann(furl1), sk)

        def _first_processed(_ign):
            # first announcement has been processed
            self.failUnlessEqual(len(received), 1)
            self.failUnlessEqual(received[0]["anonymous-storage-FURL"],
                                 furl1)
            # now submit a second one, with a tubid that happens to look just
            # like the pubkey-based serverid we just processed. They should
            # not overlap.
            v1_ann = (furl2, "storage", "RIStorage", "nick1", "ver23", "ver0")
            v1_facade = WrapV2ClientInV1Interface(client)
            v1_facade.remote_announce([v1_ann])
            return fireEventually()

        def _second_processed(_ign):
            # if they overlapped, the second announcement would be ignored
            self.failUnlessEqual(len(received), 2)
            self.failUnlessEqual(received[1]["anonymous-storage-FURL"],
                                 furl2)

        client.remote_announce_v2([signed_ann])
        d = fireEventually()
        d.addCallback(_first_processed)
        d.addCallback(_second_processed)
        return d
297
298
class SystemTestMixin(ServiceMixin, pollmixin.PollMixin):
    """Adds create_tub(), which sets up the listening central Tub."""

    def create_tub(self, portnum=0):
        """Create ``self.central_tub`` listening on ``portnum``.

        With the default portnum of 0 the port actually obtained is whatever
        the listener reports; it is stored in ``self.central_portnum``.
        The Tub's certificate file lives in ``self.basedir``.
        """
        pem_path = os.path.join(self.basedir, "tub.pem")
        central = Tub(certFile=pem_path)
        self.central_tub = central
        #central.setOption("logLocalFailures", True)
        #central.setOption("logRemoteFailures", True)
        central.setOption("expose-remote-exception-types", False)
        central.setServiceParent(self.parent)
        listener = central.listenOn("tcp:%d" % portnum)
        self.central_portnum = listener.getPortnum()
        if portnum != 0:
            # an explicitly requested port must be the one we got
            assert self.central_portnum == portnum
        central.setLocation("localhost:%d" % self.central_portnum)
313
class Queue(SystemTestMixin, unittest.TestCase):
    """Checks that a message published while the introducer is offline is
    delivered once it comes back."""

    def test_queue_until_connected(self):
        self.basedir = "introducer/QueueUntilConnected/queued"
        os.makedirs(self.basedir)
        self.create_tub()
        server = IntroducerService()
        server.setServiceParent(self.parent)
        furl_file = os.path.join(self.basedir, "introducer.furl")
        ifurl = self.central_tub.registerReference(server,
                                                   furlFile=furl_file)
        client_tub = Tub()
        client_tub.setServiceParent(self.parent)
        client = IntroducerClient(client_tub, ifurl,
                                  u"nickname", "version", "oldest", {})
        furl1 = "pb://onug64tu@127.0.0.1:123/short" # base32("short")
        sk_s, vk_s = keyutil.make_keypair()
        sk, _ignored = keyutil.parse_privkey(sk_s)

        def _offline(_ign):
            # now that the introducer server is offline, create a client and
            # publish some messages
            client.setServiceParent(self.parent) # this starts the reconnector
            client.publish("storage", make_ann(furl1), sk)

            server.setServiceParent(self.parent) # restart the server
            # now wait for the messages to be delivered
            return self.poll(lambda: bool(server.get_announcements()))

        def _done(_ign):
            first = list(server.get_announcements().values())[0]
            (_a, _b, ann1_out, _c) = first
            self.failUnlessEqual(ann1_out["anonymous-storage-FURL"], furl1)

        # now let the ack get back
        def _wait_until_idle(_ign):
            def _idle():
                # idle means neither side has anything outstanding
                return not (client._debug_outstanding
                            or server._debug_outstanding)
            return self.poll(_idle)

        d = server.disownServiceParent()
        d.addCallback(_offline)
        d.addCallback(_done)
        d.addCallback(_wait_until_idle)
        return d
361
362
# labels selecting which introducer server version a system test exercises
V1 = "v1"
V2 = "v2"
364 class SystemTest(SystemTestMixin, unittest.TestCase):
365
366     def do_system_test(self, server_version):
367         self.create_tub()
368         if server_version == V1:
369             introducer = old.IntroducerService_v1()
370         else:
371             introducer = IntroducerService()
372         introducer.setServiceParent(self.parent)
373         iff = os.path.join(self.basedir, "introducer.furl")
374         tub = self.central_tub
375         ifurl = self.central_tub.registerReference(introducer, furlFile=iff)
376         self.introducer_furl = ifurl
377
378         # we have 5 clients who publish themselves as storage servers, and a
379         # sixth which does which not. All 6 clients subscriber to hear about
380         # storage. When the connections are fully established, all six nodes
381         # should have 5 connections each.
382         NUM_STORAGE = 5
383         NUM_CLIENTS = 6
384
385         clients = []
386         tubs = {}
387         received_announcements = {}
388         subscribing_clients = []
389         publishing_clients = []
390         privkeys = {}
391         expected_announcements = [0 for c in range(NUM_CLIENTS)]
392
393         for i in range(NUM_CLIENTS):
394             tub = Tub()
395             #tub.setOption("logLocalFailures", True)
396             #tub.setOption("logRemoteFailures", True)
397             tub.setOption("expose-remote-exception-types", False)
398             tub.setServiceParent(self.parent)
399             l = tub.listenOn("tcp:0")
400             portnum = l.getPortnum()
401             tub.setLocation("localhost:%d" % portnum)
402
403             log.msg("creating client %d: %s" % (i, tub.getShortTubID()))
404             if i == 0:
405                 c = old.IntroducerClient_v1(tub, self.introducer_furl,
406                                             u"nickname-%d" % i,
407                                             "version", "oldest")
408             else:
409                 c = IntroducerClient(tub, self.introducer_furl,
410                                      u"nickname-%d" % i,
411                                      "version", "oldest",
412                                      {"component": "component-v1"})
413             received_announcements[c] = {}
414             def got(key_s_or_tubid, ann, announcements, i):
415                 if i == 0:
416                     index = get_tubid_string_from_ann(ann)
417                 else:
418                     index = key_s_or_tubid or get_tubid_string_from_ann(ann)
419                 announcements[index] = ann
420             c.subscribe_to("storage", got, received_announcements[c], i)
421             subscribing_clients.append(c)
422             expected_announcements[i] += 1 # all expect a 'storage' announcement
423
424             node_furl = tub.registerReference(Referenceable())
425             if i < NUM_STORAGE:
426                 if i == 0:
427                     c.publish(node_furl, "storage", "ri_name")
428                 elif i == 1:
429                     # sign the announcement
430                     privkey_s, pubkey_s = keyutil.make_keypair()
431                     privkey, _ignored = keyutil.parse_privkey(privkey_s)
432                     privkeys[c] = privkey
433                     c.publish("storage", make_ann(node_furl), privkey)
434                 else:
435                     c.publish("storage", make_ann(node_furl))
436                 publishing_clients.append(c)
437             else:
438                 # the last one does not publish anything
439                 pass
440
441             if i == 0:
442                 # users of the V1 client were required to publish a
443                 # 'stub_client' record (somewhat after they published the
444                 # 'storage' record), so the introducer could see their
445                 # version. Match that behavior.
446                 c.publish(node_furl, "stub_client", "stub_ri_name")
447
448             if i == 2:
449                 # also publish something that nobody cares about
450                 boring_furl = tub.registerReference(Referenceable())
451                 c.publish("boring", make_ann(boring_furl))
452
453             c.setServiceParent(self.parent)
454             clients.append(c)
455             tubs[c] = tub
456
457
458         def _wait_for_connected(ign):
459             def _connected():
460                 for c in clients:
461                     if not c.connected_to_introducer():
462                         return False
463                 return True
464             return self.poll(_connected)
465
466         # we watch the clients to determine when the system has settled down.
467         # Then we can look inside the server to assert things about its
468         # state.
469
470         def _wait_for_expected_announcements(ign):
471             def _got_expected_announcements():
472                 for i,c in enumerate(subscribing_clients):
473                     if len(received_announcements[c]) < expected_announcements[i]:
474                         return False
475                 return True
476             return self.poll(_got_expected_announcements)
477
478         # before shutting down any Tub, we'd like to know that there are no
479         # messages outstanding
480
481         def _wait_until_idle(ign):
482             def _idle():
483                 for c in subscribing_clients + publishing_clients:
484                     if c._debug_outstanding:
485                         return False
486                 if introducer._debug_outstanding:
487                     return False
488                 return True
489             return self.poll(_idle)
490
491         d = defer.succeed(None)
492         d.addCallback(_wait_for_connected)
493         d.addCallback(_wait_for_expected_announcements)
494         d.addCallback(_wait_until_idle)
495
496         def _check1(res):
497             log.msg("doing _check1")
498             dc = introducer._debug_counts
499             if server_version == V1:
500                 # each storage server publishes a record, and (after its
501                 # 'subscribe' has been ACKed) also publishes a "stub_client".
502                 # The non-storage client (which subscribes) also publishes a
503                 # stub_client. There is also one "boring" service. The number
504                 # of messages is higher, because the stub_clients aren't
505                 # published until after we get the 'subscribe' ack (since we
506                 # don't realize that we're dealing with a v1 server [which
507                 # needs stub_clients] until then), and the act of publishing
508                 # the stub_client causes us to re-send all previous
509                 # announcements.
510                 self.failUnlessEqual(dc["inbound_message"] - dc["inbound_duplicate"],
511                                      NUM_STORAGE + NUM_CLIENTS + 1)
512             else:
513                 # each storage server publishes a record. There is also one
514                 # "stub_client" and one "boring"
515                 self.failUnlessEqual(dc["inbound_message"], NUM_STORAGE+2)
516                 self.failUnlessEqual(dc["inbound_duplicate"], 0)
517             self.failUnlessEqual(dc["inbound_update"], 0)
518             self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS)
519             # the number of outbound messages is tricky.. I think it depends
520             # upon a race between the publish and the subscribe messages.
521             self.failUnless(dc["outbound_message"] > 0)
522             # each client subscribes to "storage", and each server publishes
523             self.failUnlessEqual(dc["outbound_announcements"],
524                                  NUM_STORAGE*NUM_CLIENTS)
525
526             for c in subscribing_clients:
527                 cdc = c._debug_counts
528                 self.failUnless(cdc["inbound_message"])
529                 self.failUnlessEqual(cdc["inbound_announcement"],
530                                      NUM_STORAGE)
531                 self.failUnlessEqual(cdc["wrong_service"], 0)
532                 self.failUnlessEqual(cdc["duplicate_announcement"], 0)
533                 self.failUnlessEqual(cdc["update"], 0)
534                 self.failUnlessEqual(cdc["new_announcement"],
535                                      NUM_STORAGE)
536                 anns = received_announcements[c]
537                 self.failUnlessEqual(len(anns), NUM_STORAGE)
538
539                 nodeid0 = tubs[clients[0]].tubID
540                 ann = anns[nodeid0]
541                 nick = ann["nickname"]
542                 self.failUnlessEqual(type(nick), unicode)
543                 self.failUnlessEqual(nick, u"nickname-0")
544             if server_version == V1:
545                 for c in publishing_clients:
546                     cdc = c._debug_counts
547                     expected = 1 # storage
548                     if c is clients[2]:
549                         expected += 1 # boring
550                     if c is not clients[0]:
551                         # the v2 client tries to call publish_v2, which fails
552                         # because the server is v1. It then re-sends
553                         # everything it has so far, plus a stub_client record
554                         expected = 2*expected + 1
555                     if c is clients[0]:
556                         # we always tell v1 client to send stub_client
557                         expected += 1
558                     self.failUnlessEqual(cdc["outbound_message"], expected)
559             else:
560                 for c in publishing_clients:
561                     cdc = c._debug_counts
562                     expected = 1
563                     if c in [clients[0], # stub_client
564                              clients[2], # boring
565                              ]:
566                         expected = 2
567                     self.failUnlessEqual(cdc["outbound_message"], expected)
568             log.msg("_check1 done")
569         d.addCallback(_check1)
570
571         # force an introducer reconnect, by shutting down the Tub it's using
572         # and starting a new Tub (with the old introducer). Everybody should
573         # reconnect and republish, but the introducer should ignore the
574         # republishes as duplicates. However, because the server doesn't know
575         # what each client does and does not know, it will send them a copy
576         # of the current announcement table anyway.
577
578         d.addCallback(lambda _ign: log.msg("shutting down introducer's Tub"))
579         d.addCallback(lambda _ign: self.central_tub.disownServiceParent())
580
581         def _wait_for_introducer_loss(ign):
582             def _introducer_lost():
583                 for c in clients:
584                     if c.connected_to_introducer():
585                         return False
586                 return True
587             return self.poll(_introducer_lost)
588         d.addCallback(_wait_for_introducer_loss)
589
590         def _restart_introducer_tub(_ign):
591             log.msg("restarting introducer's Tub")
592             # reset counters
593             for i in range(NUM_CLIENTS):
594                 c = subscribing_clients[i]
595                 for k in c._debug_counts:
596                     c._debug_counts[k] = 0
597             for k in introducer._debug_counts:
598                 introducer._debug_counts[k] = 0
599             expected_announcements[i] += 1 # new 'storage' for everyone
600             self.create_tub(self.central_portnum)
601             newfurl = self.central_tub.registerReference(introducer,
602                                                          furlFile=iff)
603             assert newfurl == self.introducer_furl
604         d.addCallback(_restart_introducer_tub)
605
606         d.addCallback(_wait_for_connected)
607         d.addCallback(_wait_for_expected_announcements)
608         d.addCallback(_wait_until_idle)
609         d.addCallback(lambda _ign: log.msg(" reconnected"))
610
611         # TODO: publish something while the introducer is offline, then
612         # confirm it gets delivered when the connection is reestablished
613         def _check2(res):
614             log.msg("doing _check2")
615             # assert that the introducer sent out new messages, one per
616             # subscriber
617             dc = introducer._debug_counts
618             self.failUnlessEqual(dc["outbound_announcements"],
619                                  NUM_STORAGE*NUM_CLIENTS)
620             self.failUnless(dc["outbound_message"] > 0)
621             self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS)
622             for c in subscribing_clients:
623                 cdc = c._debug_counts
624                 self.failUnlessEqual(cdc["inbound_message"], 1)
625                 self.failUnlessEqual(cdc["inbound_announcement"], NUM_STORAGE)
626                 self.failUnlessEqual(cdc["new_announcement"], 0)
627                 self.failUnlessEqual(cdc["wrong_service"], 0)
628                 self.failUnlessEqual(cdc["duplicate_announcement"], NUM_STORAGE)
629         d.addCallback(_check2)
630
631         # Then force an introducer restart, by shutting down the Tub,
632         # destroying the old introducer, and starting a new Tub+Introducer.
633         # Everybody should reconnect and republish, and the (new) introducer
634         # will distribute the new announcements, but the clients should
635         # ignore the republishes as duplicates.
636
637         d.addCallback(lambda _ign: log.msg("shutting down introducer"))
638         d.addCallback(lambda _ign: self.central_tub.disownServiceParent())
639         d.addCallback(_wait_for_introducer_loss)
640         d.addCallback(lambda _ign: log.msg("introducer lost"))
641
642         def _restart_introducer(_ign):
643             log.msg("restarting introducer")
644             self.create_tub(self.central_portnum)
645             # reset counters
646             for i in range(NUM_CLIENTS):
647                 c = subscribing_clients[i]
648                 for k in c._debug_counts:
649                     c._debug_counts[k] = 0
650             expected_announcements[i] += 1 # new 'storage' for everyone
651             if server_version == V1:
652                 introducer = old.IntroducerService_v1()
653             else:
654                 introducer = IntroducerService()
655             newfurl = self.central_tub.registerReference(introducer,
656                                                          furlFile=iff)
657             assert newfurl == self.introducer_furl
658         d.addCallback(_restart_introducer)
659
660         d.addCallback(_wait_for_connected)
661         d.addCallback(_wait_for_expected_announcements)
662         d.addCallback(_wait_until_idle)
663
664         def _check3(res):
665             log.msg("doing _check3")
666             dc = introducer._debug_counts
667             self.failUnlessEqual(dc["outbound_announcements"],
668                                  NUM_STORAGE*NUM_CLIENTS)
669             self.failUnless(dc["outbound_message"] > 0)
670             self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS)
671             for c in subscribing_clients:
672                 cdc = c._debug_counts
673                 self.failUnless(cdc["inbound_message"] > 0)
674                 self.failUnlessEqual(cdc["inbound_announcement"], NUM_STORAGE)
675                 self.failUnlessEqual(cdc["new_announcement"], 0)
676                 self.failUnlessEqual(cdc["wrong_service"], 0)
677                 self.failUnlessEqual(cdc["duplicate_announcement"], NUM_STORAGE)
678
679         d.addCallback(_check3)
680         return d
681
682
683     def test_system_v2_server(self):
684         self.basedir = "introducer/SystemTest/system_v2_server"
685         os.makedirs(self.basedir)
686         return self.do_system_test(V2)
687     test_system_v2_server.timeout = 480
688     # occasionally takes longer than 350s on "draco"
689
690     def test_system_v1_server(self):
691         self.basedir = "introducer/SystemTest/system_v1_server"
692         os.makedirs(self.basedir)
693         return self.do_system_test(V1)
694     test_system_v1_server.timeout = 480
695     # occasionally takes longer than 350s on "draco"
696
class FakeRemoteReference:
    """Minimal stand-in passed to the introducer as a subscriber reference.

    Only two methods are provided: notifyOnDisconnect() accepts any
    arguments and does nothing, and getRemoteTubID() returns a fixed tubid
    string.
    """

    def notifyOnDisconnect(self, *args, **kwargs):
        # Accept and discard whatever the caller tries to register.
        return None

    def getRemoteTubID(self):
        # The same tubid string appears in the FURLs used by the tests in
        # this file.
        tubid = "62ubehyunnyhzs7r6vdonnm2hpi52w6y"
        return tubid
700
class ClientInfo(unittest.TestCase):
    # Checks what IntroducerService.get_subscribers() reports after v2 and
    # v1 clients subscribe. Each entry is unpacked below as
    # (service_name, when, subscriber_info, rref).

    def _check_subscriber_info(self, info, nickname, app_versions):
        # Assertions shared by every populated subscriber-info dict in these
        # tests; only the nickname and app-versions differ between callers.
        self.failUnlessEqual(info["version"], 0)
        self.failUnlessEqual(info["oldest-supported"], "oldest")
        self.failUnlessEqual(info["app-versions"], app_versions)
        self.failUnlessEqual(info["nickname"], nickname)
        self.failUnlessEqual(info["my-version"], "my_version")

    def test_client_v2(self):
        # A v2 client hands its subscriber-info to the server as part of the
        # subscribe call, so it must be visible immediately.
        server = IntroducerService()
        versions = {"whizzy": "fizzy"}
        # The client is never connected: tub and introducer_furl are None.
        client = IntroducerClient(None, None, u"nick-v2",
                                  "my_version", "oldest", versions)
        fake_rref = FakeRemoteReference()
        server.remote_subscribe_v2(fake_rref, "storage",
                                   client._my_subscriber_info)

        subscribers = server.get_subscribers()
        self.failUnlessEqual(len(subscribers), 1)
        service_name, _when, info, rref = subscribers[0]
        self.failUnlessIdentical(rref, fake_rref)
        self.failUnlessEqual(service_name, "storage")
        self._check_subscriber_info(info, u"nick-v2", versions)

    def test_client_v1(self):
        server = IntroducerService()
        first_rref = FakeRemoteReference()
        server.remote_subscribe(first_rref, "storage")

        # The v1 subscribe interface had no subscriber_info: that was
        # usually sent in a separate stub_client pseudo-announcement.
        subscribers = server.get_subscribers()
        self.failUnlessEqual(len(subscribers), 1)
        service_name, _when, info, rref = subscribers[0]
        # rref will be a WrapV1SubscriberInV2Interface around the real
        # subscriber, hence the .original lookup.
        self.failUnlessIdentical(rref.original, first_rref)
        self.failUnlessEqual(info, None) # not known yet
        self.failUnlessEqual(service_name, "storage")

        # Now submit the stub_client announcement; the server should
        # correlate it with the existing subscription.
        stub_furl = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
        stub_ann = (stub_furl, "stub_client", "RIStubClient",
                    u"nick-v1".encode("utf-8"), "my_version", "oldest")
        server.remote_publish(stub_ann)

        subscribers = server.get_subscribers()
        self.failUnlessEqual(len(subscribers), 1)
        service_name, _when, info, rref = subscribers[0]
        self.failUnlessIdentical(rref.original, first_rref)
        self.failUnlessEqual(service_name, "storage")
        # v1 announcements do not contain app-versions, so expect {}.
        self._check_subscriber_info(info, u"nick-v1", {})

        # A subscription that arrives after the stub_client announcement
        # should be correlated too.
        second_rref = FakeRemoteReference()
        server.remote_subscribe(second_rref, "thing2")

        by_service = {}
        for service_name, _when, info, rref in server.get_subscribers():
            by_service[service_name] = (info, rref)
        self.failUnlessEqual(len(by_service), 2)
        info, rref = by_service["thing2"]
        self.failUnlessIdentical(rref.original, second_rref)
        # v1 announcements do not contain app-versions, so expect {}.
        self._check_subscriber_info(info, u"nick-v1", {})
775
class Announcements(unittest.TestCase):
    # Checks what IntroducerService.get_announcements() holds after a single
    # announcement is published through the v2 (unsigned and signed) and v1
    # publish methods. Each stored entry is unpacked below as
    # index -> (ann_s, canary, ann, when).

    FURL = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
    TUBID = "62ubehyunnyhzs7r6vdonnm2hpi52w6y"

    def _check_storage_announcement(self, ann, nickname, app_versions):
        # Assertions shared by all three tests; only the nickname and
        # app-versions differ between callers.
        self.failUnlessEqual(ann["app-versions"], app_versions)
        self.failUnlessEqual(ann["nickname"], nickname)
        self.failUnlessEqual(ann["service-name"], "storage")
        self.failUnlessEqual(ann["my-version"], "my_version")
        self.failUnlessEqual(ann["anonymous-storage-FURL"], self.FURL)

    def test_client_v2_unsigned(self):
        server = IntroducerService()
        versions = {"whizzy": "fizzy"}
        # The client is never connected: tub and introducer_furl are None.
        client = IntroducerClient(None, None, u"nick-v2",
                                  "my_version", "oldest", versions)
        # No signing key is supplied, so the index is expected to carry the
        # tubid rather than a public key.
        published = make_ann_t(client, self.FURL, None)
        sent_canary = Referenceable()
        server.remote_publish_v2(published, sent_canary)

        stored = server.get_announcements()
        self.failUnlessEqual(len(stored), 1)
        index, entry = list(stored.items())[0]
        (_ann_s, canary, ann, _when) = entry
        self.failUnlessIdentical(canary, sent_canary)
        self.failUnlessEqual(index, ("storage", None, self.TUBID))
        self._check_storage_announcement(ann, u"nick-v2", versions)

    def test_client_v2_signed(self):
        server = IntroducerService()
        versions = {"whizzy": "fizzy"}
        # The client is never connected: tub and introducer_furl are None.
        client = IntroducerClient(None, None, u"nick-v2",
                                  "my_version", "oldest", versions)
        signing_s, verifying_s = keyutil.make_keypair()
        signing_key, _unused = keyutil.parse_privkey(signing_s)
        # The index is expected to carry the verifying key string with its
        # "pub-" prefix removed, and no tubid.
        pubkey_s = keyutil.remove_prefix(verifying_s, "pub-")
        published = make_ann_t(client, self.FURL, signing_key)
        sent_canary = Referenceable()
        server.remote_publish_v2(published, sent_canary)

        stored = server.get_announcements()
        self.failUnlessEqual(len(stored), 1)
        index, entry = list(stored.items())[0]
        (_ann_s, canary, ann, _when) = entry
        self.failUnlessIdentical(canary, sent_canary)
        self.failUnlessEqual(index, ("storage", pubkey_s, None))
        self._check_storage_announcement(ann, u"nick-v2", versions)

    def test_client_v1(self):
        server = IntroducerService()
        nickname_bytes = u"nick-v1".encode("utf-8")
        v1_ann = (self.FURL, "storage", "RIStorage",
                  nickname_bytes, "my_version", "oldest")
        server.remote_publish(v1_ann)

        stored = server.get_announcements()
        self.failUnlessEqual(len(stored), 1)
        index, entry = list(stored.items())[0]
        (_ann_s, canary, ann, _when) = entry
        # The v1 publish call takes no canary argument.
        self.failUnlessEqual(canary, None)
        self.failUnlessEqual(index, ("storage", None, self.TUBID))
        # The nickname is compared against the encoded form, and
        # app-versions against an empty dict.
        self._check_storage_announcement(ann, nickname_bytes, {})
842
843
class TooNewServer(IntroducerService):
    # An IntroducerService whose advertised VERSION lists only a "v999"
    # introducer protocol key, for use by NonV1Server below.
    VERSION = {
        "http://allmydata.org/tahoe/protocols/introducer/v999": {},
        "application-version": "greetings from the crazy future",
    }
849
class NonV1Server(SystemTestMixin, unittest.TestCase):
    # If the 1.3.0 client connects to a server that doesn't provide the 'v1'
    # protocol, it is supposed to provide a useful error instead of a weird
    # exception.

    def test_failure(self):
        self.basedir = "introducer/NonV1Server/failure"
        os.makedirs(self.basedir)

        # Server side: register a TooNewServer on the central tub.
        self.create_tub()
        server = TooNewServer()
        server.setServiceParent(self.parent)
        self.introducer_furl = self.central_tub.registerReference(server)

        # Client side: a separate Tub listening on a kernel-assigned port.
        client_tub = Tub()
        client_tub.setOption("expose-remote-exception-types", False)
        client_tub.setServiceParent(self.parent)
        listener = client_tub.listenOn("tcp:0")
        client_tub.setLocation("localhost:%d" % listener.getPortnum())

        client = IntroducerClient(client_tub, self.introducer_furl,
                                  u"nickname-client", "version", "oldest",
                                  {})
        received = {}
        def _record(key_s, ann):
            received[key_s] = ann
        client.subscribe_to("storage", _record)

        client.setServiceParent(self.parent)

        # Now wait for it to connect and notice the bad version. Polling
        # stops on either outcome, so an unexpected success reaches the
        # assertions below instead of polling forever.
        def _settled():
            if client._introducer_error:
                return True
            return bool(client._publisher)
        d = self.poll(_settled)

        def _verify(res):
            self.failUnless(client._introducer_error)
            is_version_error = client._introducer_error.check(
                InsufficientVersionError)
            self.failUnless(is_version_error, client._introducer_error)
        d.addCallback(_verify)
        return d
890
class DecodeFurl(unittest.TestCase):
    def test_decode(self):
        # Make sure we have a working base64.b32decode. The one in
        # python2.4.[01] was broken.
        furl = 'pb://t5g7egomnnktbpydbuijt6zgtmw4oqi5@127.0.0.1:51857/hfzv36i'
        expected = "\x9fM\xf2\x19\xcckU0\xbf\x03\r\x10\x99\xfb&\x9b-\xc7A\x1d"

        # The tubid is the run of word characters between "pb://" and "@".
        match = re.match(r'pb://(\w+)@', furl)
        assert match
        tubid_b32 = match.group(1)
        # The FURL is lowercase, so it is uppercased before decoding.
        nodeid = b32decode(tubid_b32.upper())
        self.failUnlessEqual(nodeid, expected)
900
class Signatures(unittest.TestCase):
    def test_sign(self):
        # Round-trip an announcement through sign_to_foolscap() and
        # unsign_from_foolscap(), then check how tampered and unrecognized
        # inputs are handled.
        original = {"key1": "value1"}
        signing_s, verifying_s = keyutil.make_keypair()
        signing_key, _unused = keyutil.parse_privkey(signing_s)

        signed = sign_to_foolscap(original, signing_key)
        (msg, sig, key) = signed
        bytes_type = type("".encode("utf-8"))
        self.failUnlessEqual(type(msg), bytes_type)
        decoded = simplejson.loads(msg.decode("utf-8"))
        self.failUnlessEqual(decoded, original)
        self.failUnless(sig.startswith("v0-"))
        self.failUnless(key.startswith("v0-"))

        (recovered, recovered_key) = unsign_from_foolscap(signed)
        self.failUnlessEqual(recovered, original)
        self.failUnlessEqual("pub-" + recovered_key, verifying_s)

        # Bad signature: a different message under the genuine sig/key.
        tampered = {"key1": "value2"}
        tampered_msg = simplejson.dumps(tampered).encode("utf-8")
        self.failUnlessRaises(keyutil.BadSignatureError,
                              unsign_from_foolscap,
                              (tampered_msg, sig, key))

        # Sneaky bad signature should be ignored: with the signature set to
        # None the message comes back with no key, even though a key was
        # supplied.
        (recovered, recovered_key) = unsign_from_foolscap(
            (tampered_msg, None, key))
        self.failUnlessEqual(recovered_key, None)
        self.failUnlessEqual(recovered, tampered)

        # Unrecognized signatures: an unknown version prefix on either the
        # signature or the key must raise UnknownKeyError.
        unrecognized = [
            (tampered_msg, "v999-sig", key),
            (tampered_msg, sig, "v999-key"),
        ]
        for ann_t in unrecognized:
            self.failUnlessRaises(UnknownKeyError,
                                  unsign_from_foolscap, ann_t)
931
932
# TODO: add tests of StorageFarmBroker. If it receives duplicate
# announcements, or same-FURL-different-misc announcements, it should leave
# the Reconnector in place; but if it receives same-nodeid-different-FURL, it
# should tear down the Reconnector and make a new one. This behavior used to
# live in the IntroducerClient, and thus used to be tested by test_introducer.

# TODO: copy more tests from the old branch.

# TODO: then also add an Upgrade test.