]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_introducer.py
841771efa3b9d901531ef08d41f94a00f5d79a94
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_introducer.py
1
2 import os, re
3 from base64 import b32decode
4 import simplejson
5
6 from twisted.trial import unittest
7 from twisted.internet import defer
8 from twisted.python import log
9
10 from foolscap.api import Tub, Referenceable, fireEventually, flushEventualQueue
11 from twisted.application import service
12 from allmydata.interfaces import InsufficientVersionError
13 from allmydata.introducer.client import IntroducerClient, \
14      WrapV2ClientInV1Interface
15 from allmydata.introducer.server import IntroducerService
16 from allmydata.introducer.common import get_tubid_string_from_ann, \
17      get_tubid_string, sign_to_foolscap, unsign_from_foolscap, \
18      UnknownKeyError
19 from allmydata.introducer import old
20 # test compatibility with old introducer .tac files
21 from allmydata.introducer import IntroducerNode
22 from allmydata.util import pollmixin, keyutil
23 import allmydata.test.common_util as testutil
24
class LoggingMultiService(service.MultiService):
    """A MultiService that additionally exposes a log() method.

    log() forwards straight to twisted.python.log.msg.
    """

    def log(self, msg, **kwargs):
        """Pass msg and any keyword arguments through to log.msg()."""
        log.msg(msg, **kwargs)
28
class Node(testutil.SignalMixin, unittest.TestCase):
    """Smoke test: an IntroducerNode can be created, started and stopped."""

    def test_loadable(self):
        """Start an IntroducerNode, wait for its Tub, then shut it down.

        Returns a Deferred that fires once the eventual-send queue has been
        flushed after the node was stopped.
        """
        node_dir = "introducer.IntroducerNode.test_loadable"
        os.mkdir(node_dir)
        node = IntroducerNode(node_dir)

        def _start(_res):
            return node.startService()

        def _await_tub(_res):
            return node.when_tub_ready()

        def _stop(_res):
            return node.stopService()

        d = fireEventually(None)
        # run the lifecycle steps strictly one after another
        for step in (_start, _await_tub, _stop, flushEventualQueue):
            d.addCallback(step)
        return d
40
class ServiceMixin:
    """Test mixin that provides a running parent service as self.parent."""

    def setUp(self):
        """Create self.parent (a LoggingMultiService) and start it."""
        parent = LoggingMultiService()
        self.parent = parent
        parent.startService()

    def tearDown(self):
        """Stop self.parent, then flush foolscap's eventual-send queue.

        Returns the Deferred covering both steps.
        """
        log.msg("TestIntroducer.tearDown")

        def _stop_parent(_res):
            return self.parent.stopService()

        d = defer.succeed(None)
        d.addCallback(_stop_parent)
        d.addCallback(flushEventualQueue)
        return d
51
class Introducer(ServiceMixin, unittest.TestCase, pollmixin.PollMixin):
    """Direct (no network) tests of IntroducerService and IntroducerClient."""

    def test_create(self):
        """An IntroducerClient can be constructed without a Tub."""
        client = IntroducerClient(None, "introducer.furl", u"my_nickname",
                                  "my_version", "oldest_version", {})
        self.failUnless(isinstance(client, IntroducerClient))

    def test_listen(self):
        """An IntroducerService can be attached to the parent service."""
        server = IntroducerService()
        server.setServiceParent(self.parent)

    def test_duplicate_publish(self):
        """Re-publishing for an already-known FURL must not add an entry."""
        server = IntroducerService()

        def _expect_announcements(count):
            # nothing in this test ever subscribes, so the subscriber
            # table is expected to stay empty throughout
            self.failUnlessEqual(len(server.get_announcements()), count)
            self.failUnlessEqual(len(server.get_subscribers()), 0)

        _expect_announcements(0)
        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36106,127.0.0.1:36106/gydnpigj2ja2qr2srq4ikjwnl7xfgbra"
        furl2 = "pb://ttwwooyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36111,127.0.0.1:36106/ttwwoogj2ja2qr2srq4ikjwnl7xfgbra"
        ann1 = (furl1, "storage", "RIStorage", "nick1", "ver23", "ver0")
        ann1b = (furl1, "storage", "RIStorage", "nick1", "ver24", "ver0")
        ann2 = (furl2, "storage", "RIStorage", "nick2", "ver30", "ver0")
        # ann1b shares ann1's FURL, so publishing it last must leave the
        # table at two entries
        for ann, expected_count in ((ann1, 1), (ann2, 2), (ann1b, 2)):
            server.remote_publish(ann)
            _expect_announcements(expected_count)

    def test_id_collision(self):
        """A tubid that equals a keyid must not replace the keyed entry."""
        # test replacement case where tubid equals a keyid (one should
        # not replace the other)
        server = IntroducerService()
        client = IntroducerClient(None,
                                  "introducer.furl", u"my_nickname",
                                  "my_version", "oldest_version", {})
        sk_s, vk_s = keyutil.make_keypair()
        sk, _ignored = keyutil.parse_privkey(sk_s)
        keyid = keyutil.remove_prefix(vk_s, "pub-v0-")

        # first: a signed (v2) announcement, indexed by its key
        furl1 = "pb://onug64tu@127.0.0.1:123/short" # base32("short")
        signed_ann = client.create_announcement("storage", make_ann(furl1), sk)
        server.remote_publish_v2(signed_ann, Referenceable())
        announcements = server.get_announcements()
        self.failUnlessEqual(len(announcements), 1)
        keyed_index = ("storage", "v0-"+keyid, None)
        self.failUnless(keyed_index in announcements)
        (_a, _b, ann1_out, _c) = announcements[keyed_index]
        self.failUnlessEqual(ann1_out["anonymous-storage-FURL"], furl1)

        # second: an unsigned (v1) announcement whose tubid is that keyid
        furl2 = "pb://%s@127.0.0.1:36106/swissnum" % keyid
        ann2 = (furl2, "storage", "RIStorage", "nick1", "ver23", "ver0")
        server.remote_publish(ann2)
        # NOTE: this re-checks the mapping fetched above rather than
        # calling get_announcements() again
        self.failUnlessEqual(len(announcements), 2)
        tubid_index = ("storage", None, keyid)
        self.failUnless(tubid_index in announcements)
        (_a, _b, ann2_out, _c) = announcements[tubid_index]
        self.failUnlessEqual(ann2_out["anonymous-storage-FURL"], furl2)
110
111
def make_ann(furl):
    """Return a storage announcement dict for the server at furl.

    The permutation seed is the tubid string extracted from furl.
    """
    return {
        "anonymous-storage-FURL": furl,
        "permutation-seed-base32": get_tubid_string(furl),
    }
116
def make_ann_t(ic, furl, privkey):
    """Have client ic create a "storage" announcement for furl.

    privkey is handed to ic.create_announcement() unchanged; the value
    returned by that call is returned here.
    """
    announcement = make_ann(furl)
    return ic.create_announcement("storage", announcement, privkey)
119
class Client(unittest.TestCase):
    """Tests of how an IntroducerClient treats incoming announcements."""

    def test_duplicate_receive_v1(self):
        """V1 announcements: a new one, a duplicate, then a replacement."""
        ic = IntroducerClient(None,
                              "introducer.furl", u"my_nickname",
                              "my_version", "oldest_version", {})
        announcements = []

        def _record(key_s, ann):
            announcements.append(ann)
        ic.subscribe_to("storage", _record)

        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/gydnpigj2ja2qr2srq4ikjwnl7xfgbra"
        ann1 = (furl1, "storage", "RIStorage", "nick1", "ver23", "ver0")
        ann1b = (furl1, "storage", "RIStorage", "nick1", "ver24", "ver0")
        ca = WrapV2ClientInV1Interface(ic)

        def _check_counts(inbound, new, updated, duplicate):
            counts = ic._debug_counts
            self.failUnlessEqual(counts["inbound_announcement"], inbound)
            self.failUnlessEqual(counts["new_announcement"], new)
            self.failUnlessEqual(counts["update"], updated)
            self.failUnlessEqual(counts["duplicate_announcement"], duplicate)

        def _deliver(ann):
            # hand the announcement over, then give the eventual-send
            # queue a turn before the next check runs
            ca.remote_announce([ann])
            return fireEventually()

        def _after_first(_ign):
            self.failUnlessEqual(len(announcements), 1)
            first = announcements[0]
            self.failUnlessEqual(first["nickname"], u"nick1")
            self.failUnlessEqual(first["my-version"], "ver23")
            _check_counts(1, 1, 0, 0)
            # now send a duplicate announcement: this should not notify
            # clients
            return _deliver(ann1)

        def _after_duplicate(_ign):
            self.failUnlessEqual(len(announcements), 1)
            _check_counts(2, 1, 0, 1)
            # and a replacement announcement: same FURL, new other stuff.
            # Clients should be notified.
            return _deliver(ann1b)

        def _after_replacement(_ign):
            self.failUnlessEqual(len(announcements), 2)
            _check_counts(3, 1, 1, 1)
            # test that the other stuff changed
            latest = announcements[-1]
            self.failUnlessEqual(latest["nickname"], u"nick1")
            self.failUnlessEqual(latest["my-version"], "ver24")

        d = _deliver(ann1)
        d.addCallback(_after_first)
        d.addCallback(_after_duplicate)
        d.addCallback(_after_replacement)
        return d

    def test_duplicate_receive_v2(self):
        """Signed announcements: duplicate, replacements, late subscriber."""
        ic1 = IntroducerClient(None,
                               "introducer.furl", u"my_nickname",
                               "ver23", "oldest_version", {})
        # we use a second client just to create a different-looking
        # announcement
        ic2 = IntroducerClient(None,
                               "introducer.furl", u"my_nickname",
                               "ver24","oldest_version",{})
        announcements = []

        def _received(key_s, ann):
            announcements.append( (key_s, ann) )
        ic1.subscribe_to("storage", _received)

        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/gydnp"
        furl1a = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:7777/gydnp"
        furl2 = "pb://ttwwooyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/ttwwoo"

        privkey_s, pubkey_vs = keyutil.make_keypair()
        privkey, _ignored = keyutil.parse_privkey(privkey_s)
        pubkey_s = keyutil.remove_prefix(pubkey_vs, "pub-")

        # ann1: ic1, furl1
        # ann1a: ic1, furl1a (same SturdyRef, different connection hints)
        # ann1b: ic2, furl1
        # ann2: ic2, furl2
        self.ann1 = make_ann_t(ic1, furl1, privkey)
        self.ann1a = make_ann_t(ic1, furl1a, privkey)
        self.ann1b = make_ann_t(ic2, furl1, privkey)
        self.ann2 = make_ann_t(ic2, furl2, privkey)

        def _check_latest(received, count, furl, version):
            self.failUnlessEqual(len(received), count)
            key_s, ann = received[-1]
            self.failUnlessEqual(key_s, pubkey_s)
            self.failUnlessEqual(ann["anonymous-storage-FURL"], furl)
            self.failUnlessEqual(ann["my-version"], version)

        def _announce_ann1(_ign):
            return ic1.remote_announce_v2([self.ann1])

        def _announce_ann1b(_ign):
            return ic1.remote_announce_v2([self.ann1b])

        def _announce_ann1a(_ign):
            return ic1.remote_announce_v2([self.ann1a])

        ic1.remote_announce_v2([self.ann1]) # queues eventual-send
        d = fireEventually()
        d.addCallback(lambda _ign: _check_latest(announcements, 1,
                                                 furl1, "ver23"))

        # now send a duplicate announcement. This should not fire the
        # subscriber
        d.addCallback(_announce_ann1)
        d.addCallback(fireEventually)
        d.addCallback(lambda _ign:
                      self.failUnlessEqual(len(announcements), 1))

        # and a replacement announcement: same FURL, new other stuff. The
        # subscriber *should* be fired.
        d.addCallback(_announce_ann1b)
        d.addCallback(fireEventually)
        d.addCallback(lambda _ign: _check_latest(announcements, 2,
                                                 furl1, "ver24"))

        # and a replacement announcement with a different FURL (it uses
        # different connection hints)
        d.addCallback(_announce_ann1a)
        d.addCallback(fireEventually)
        d.addCallback(lambda _ign: _check_latest(announcements, 3,
                                                 furl1a, "ver23"))

        # now add a new subscription, which should be called with the
        # backlog. The introducer only records one announcement per index, so
        # the backlog will only have the latest message.
        announcements2 = []

        def _received2(key_s, ann):
            announcements2.append( (key_s, ann) )
        d.addCallback(lambda _ign: ic1.subscribe_to("storage", _received2))
        d.addCallback(fireEventually)
        d.addCallback(lambda _ign: _check_latest(announcements2, 1,
                                                 furl1a, "ver23"))
        return d

    def test_id_collision(self):
        """A v1 tubid equal to a known keyid must still be delivered."""
        # test replacement case where tubid equals a keyid (one should
        # not replace the other)
        ic = IntroducerClient(None,
                              "introducer.furl", u"my_nickname",
                              "my_version", "oldest_version", {})
        announcements = []

        def _record(key_s, ann):
            announcements.append(ann)
        ic.subscribe_to("storage", _record)

        sk_s, vk_s = keyutil.make_keypair()
        sk, _ignored = keyutil.parse_privkey(sk_s)
        keyid = keyutil.remove_prefix(vk_s, "pub-v0-")
        furl1 = "pb://onug64tu@127.0.0.1:123/short" # base32("short")
        furl2 = "pb://%s@127.0.0.1:36106/swissnum" % keyid
        signed_ann = ic.create_announcement("storage", make_ann(furl1), sk)

        def _first_processed(_ign):
            # first announcement has been processed
            self.failUnlessEqual(len(announcements), 1)
            self.failUnlessEqual(announcements[0]["anonymous-storage-FURL"],
                                 furl1)
            # now submit a second one, with a tubid that happens to look just
            # like the pubkey-based serverid we just processed. They should
            # not overlap.
            ann2 = (furl2, "storage", "RIStorage", "nick1", "ver23", "ver0")
            v1_facade = WrapV2ClientInV1Interface(ic)
            v1_facade.remote_announce([ann2])
            return fireEventually()

        def _second_processed(_ign):
            # if they overlapped, the second announcement would be ignored
            self.failUnlessEqual(len(announcements), 2)
            self.failUnlessEqual(announcements[1]["anonymous-storage-FURL"],
                                 furl2)

        ic.remote_announce_v2([signed_ann])
        d = fireEventually()
        d.addCallback(_first_processed)
        d.addCallback(_second_processed)
        return d
297
298
class SystemTestMixin(ServiceMixin, pollmixin.PollMixin):
    """Mixin for tests that need a listening central Tub.

    Users must set self.basedir before calling create_tub().
    """

    def create_tub(self, portnum=0):
        """Create self.central_tub and make it listen on portnum.

        The Tub's certificate is kept in "tub.pem" under self.basedir. The
        port actually in use is stored as self.central_portnum and used for
        the Tub's advertised "localhost" location. A portnum of 0 is passed
        to listenOn() as "tcp:0"; for any other value the method asserts
        that the reported port matches the request.
        """
        central = Tub(certFile=os.path.join(self.basedir, "tub.pem"))
        self.central_tub = central
        # enable these for extra foolscap logging while debugging:
        #central.setOption("logLocalFailures", True)
        #central.setOption("logRemoteFailures", True)
        central.setOption("expose-remote-exception-types", False)
        central.setServiceParent(self.parent)
        listener = central.listenOn("tcp:%d" % portnum)
        actual_port = listener.getPortnum()
        self.central_portnum = actual_port
        assert portnum == 0 or actual_port == portnum
        central.setLocation("localhost:%d" % actual_port)
313
class Queue(SystemTestMixin, unittest.TestCase):
    """Check that a publish made while the server is detached still arrives."""

    def test_queue_until_connected(self):
        """Publish with the introducer detached, reattach it, expect delivery.

        Returns a Deferred that fires once the announcement has been seen by
        the server and neither side reports outstanding messages.
        """
        self.basedir = "introducer/QueueUntilConnected/queued"
        os.makedirs(self.basedir)
        self.create_tub()
        server = IntroducerService()
        server.setServiceParent(self.parent)
        furl_file = os.path.join(self.basedir, "introducer.furl")
        server_furl = self.central_tub.registerReference(server,
                                                         furlFile=furl_file)
        client_tub = Tub()
        client_tub.setServiceParent(self.parent)
        client = IntroducerClient(client_tub, server_furl,
                                  u"nickname", "version", "oldest", {})
        furl1 = "pb://onug64tu@127.0.0.1:123/short" # base32("short")
        sk_s, vk_s = keyutil.make_keypair()
        sk, _ignored = keyutil.parse_privkey(sk_s)

        def _publish_while_offline(_ign):
            # the introducer server is detached at this point: start the
            # client and publish a message
            client.setServiceParent(self.parent) # this starts the reconnector
            client.publish("storage", make_ann(furl1), sk)

            server.setServiceParent(self.parent) # restart the server
            # now wait for the message to be delivered
            return self.poll(lambda: bool(server.get_announcements()))

        def _check_delivered(_ign):
            stored = list(server.get_announcements().values())[0]
            (_a, _b, ann1_out, _c) = stored
            self.failUnlessEqual(ann1_out["anonymous-storage-FURL"], furl1)

        def _is_idle():
            return not (client._debug_outstanding
                        or server._debug_outstanding)

        d = server.disownServiceParent()
        d.addCallback(_publish_while_offline)
        d.addCallback(_check_delivered)
        # now let the ack get back
        d.addCallback(lambda _ign: self.poll(_is_idle))
        return d
361
362
# server flavours understood by SystemTest.do_system_test()
V1 = "v1"
V2 = "v2"
364 class SystemTest(SystemTestMixin, unittest.TestCase):
365
366     def do_system_test(self, server_version):
367         self.create_tub()
368         if server_version == V1:
369             introducer = old.IntroducerService_v1()
370         else:
371             introducer = IntroducerService()
372         introducer.setServiceParent(self.parent)
373         iff = os.path.join(self.basedir, "introducer.furl")
374         tub = self.central_tub
375         ifurl = self.central_tub.registerReference(introducer, furlFile=iff)
376         self.introducer_furl = ifurl
377
378         # we have 5 clients who publish themselves as storage servers, and a
379         # sixth which does which not. All 6 clients subscriber to hear about
380         # storage. When the connections are fully established, all six nodes
381         # should have 5 connections each.
382         NUM_STORAGE = 5
383         NUM_CLIENTS = 6
384
385         clients = []
386         tubs = {}
387         received_announcements = {}
388         subscribing_clients = []
389         publishing_clients = []
390         self.the_introducer = introducer
391         privkeys = {}
392         expected_announcements = [0 for c in range(NUM_CLIENTS)]
393
394         for i in range(NUM_CLIENTS):
395             tub = Tub()
396             #tub.setOption("logLocalFailures", True)
397             #tub.setOption("logRemoteFailures", True)
398             tub.setOption("expose-remote-exception-types", False)
399             tub.setServiceParent(self.parent)
400             l = tub.listenOn("tcp:0")
401             portnum = l.getPortnum()
402             tub.setLocation("localhost:%d" % portnum)
403
404             log.msg("creating client %d: %s" % (i, tub.getShortTubID()))
405             if i == 0:
406                 c = old.IntroducerClient_v1(tub, self.introducer_furl,
407                                             u"nickname-%d" % i,
408                                             "version", "oldest")
409             else:
410                 c = IntroducerClient(tub, self.introducer_furl,
411                                      u"nickname-%d" % i,
412                                      "version", "oldest",
413                                      {"component": "component-v1"})
414             received_announcements[c] = {}
415             def got(key_s_or_tubid, ann, announcements, i):
416                 if i == 0:
417                     index = get_tubid_string_from_ann(ann)
418                 else:
419                     index = key_s_or_tubid or get_tubid_string_from_ann(ann)
420                 announcements[index] = ann
421             c.subscribe_to("storage", got, received_announcements[c], i)
422             subscribing_clients.append(c)
423             expected_announcements[i] += 1 # all expect a 'storage' announcement
424
425             node_furl = tub.registerReference(Referenceable())
426             if i < NUM_STORAGE:
427                 if i == 0:
428                     c.publish(node_furl, "storage", "ri_name")
429                 elif i == 1:
430                     # sign the announcement
431                     privkey_s, pubkey_s = keyutil.make_keypair()
432                     privkey, _ignored = keyutil.parse_privkey(privkey_s)
433                     privkeys[c] = privkey
434                     c.publish("storage", make_ann(node_furl), privkey)
435                 else:
436                     c.publish("storage", make_ann(node_furl))
437                 publishing_clients.append(c)
438             else:
439                 # the last one does not publish anything
440                 pass
441
442             if i == 0:
443                 # users of the V1 client were required to publish a
444                 # 'stub_client' record (somewhat after they published the
445                 # 'storage' record), so the introducer could see their
446                 # version. Match that behavior.
447                 c.publish(node_furl, "stub_client", "stub_ri_name")
448
449             if i == 2:
450                 # also publish something that nobody cares about
451                 boring_furl = tub.registerReference(Referenceable())
452                 c.publish("boring", make_ann(boring_furl))
453
454             c.setServiceParent(self.parent)
455             clients.append(c)
456             tubs[c] = tub
457
458
459         def _wait_for_connected(ign):
460             def _connected():
461                 for c in clients:
462                     if not c.connected_to_introducer():
463                         return False
464                 return True
465             return self.poll(_connected)
466
467         # we watch the clients to determine when the system has settled down.
468         # Then we can look inside the server to assert things about its
469         # state.
470
471         def _wait_for_expected_announcements(ign):
472             def _got_expected_announcements():
473                 for i,c in enumerate(subscribing_clients):
474                     if len(received_announcements[c]) < expected_announcements[i]:
475                         return False
476                 return True
477             return self.poll(_got_expected_announcements)
478
479         # before shutting down any Tub, we'd like to know that there are no
480         # messages outstanding
481
482         def _wait_until_idle(ign):
483             def _idle():
484                 for c in subscribing_clients + publishing_clients:
485                     if c._debug_outstanding:
486                         return False
487                 if self.the_introducer._debug_outstanding:
488                     return False
489                 return True
490             return self.poll(_idle)
491
492         d = defer.succeed(None)
493         d.addCallback(_wait_for_connected)
494         d.addCallback(_wait_for_expected_announcements)
495         d.addCallback(_wait_until_idle)
496
497         def _check1(res):
498             log.msg("doing _check1")
499             dc = self.the_introducer._debug_counts
500             if server_version == V1:
501                 # each storage server publishes a record, and (after its
502                 # 'subscribe' has been ACKed) also publishes a "stub_client".
503                 # The non-storage client (which subscribes) also publishes a
504                 # stub_client. There is also one "boring" service. The number
505                 # of messages is higher, because the stub_clients aren't
506                 # published until after we get the 'subscribe' ack (since we
507                 # don't realize that we're dealing with a v1 server [which
508                 # needs stub_clients] until then), and the act of publishing
509                 # the stub_client causes us to re-send all previous
510                 # announcements.
511                 self.failUnlessEqual(dc["inbound_message"] - dc["inbound_duplicate"],
512                                      NUM_STORAGE + NUM_CLIENTS + 1)
513             else:
514                 # each storage server publishes a record. There is also one
515                 # "stub_client" and one "boring"
516                 self.failUnlessEqual(dc["inbound_message"], NUM_STORAGE+2)
517                 self.failUnlessEqual(dc["inbound_duplicate"], 0)
518             self.failUnlessEqual(dc["inbound_update"], 0)
519             self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS)
520             # the number of outbound messages is tricky.. I think it depends
521             # upon a race between the publish and the subscribe messages.
522             self.failUnless(dc["outbound_message"] > 0)
523             # each client subscribes to "storage", and each server publishes
524             self.failUnlessEqual(dc["outbound_announcements"],
525                                  NUM_STORAGE*NUM_CLIENTS)
526
527             for c in subscribing_clients:
528                 cdc = c._debug_counts
529                 self.failUnless(cdc["inbound_message"])
530                 self.failUnlessEqual(cdc["inbound_announcement"],
531                                      NUM_STORAGE)
532                 self.failUnlessEqual(cdc["wrong_service"], 0)
533                 self.failUnlessEqual(cdc["duplicate_announcement"], 0)
534                 self.failUnlessEqual(cdc["update"], 0)
535                 self.failUnlessEqual(cdc["new_announcement"],
536                                      NUM_STORAGE)
537                 anns = received_announcements[c]
538                 self.failUnlessEqual(len(anns), NUM_STORAGE)
539
540                 nodeid0 = tubs[clients[0]].tubID
541                 ann = anns[nodeid0]
542                 nick = ann["nickname"]
543                 self.failUnlessEqual(type(nick), unicode)
544                 self.failUnlessEqual(nick, u"nickname-0")
545             if server_version == V1:
546                 for c in publishing_clients:
547                     cdc = c._debug_counts
548                     expected = 1 # storage
549                     if c is clients[2]:
550                         expected += 1 # boring
551                     if c is not clients[0]:
552                         # the v2 client tries to call publish_v2, which fails
553                         # because the server is v1. It then re-sends
554                         # everything it has so far, plus a stub_client record
555                         expected = 2*expected + 1
556                     if c is clients[0]:
557                         # we always tell v1 client to send stub_client
558                         expected += 1
559                     self.failUnlessEqual(cdc["outbound_message"], expected)
560             else:
561                 for c in publishing_clients:
562                     cdc = c._debug_counts
563                     expected = 1
564                     if c in [clients[0], # stub_client
565                              clients[2], # boring
566                              ]:
567                         expected = 2
568                     self.failUnlessEqual(cdc["outbound_message"], expected)
569             log.msg("_check1 done")
570         d.addCallback(_check1)
571
572         # force an introducer reconnect, by shutting down the Tub it's using
573         # and starting a new Tub (with the old introducer). Everybody should
574         # reconnect and republish, but the introducer should ignore the
575         # republishes as duplicates. However, because the server doesn't know
576         # what each client does and does not know, it will send them a copy
577         # of the current announcement table anyway.
578
579         d.addCallback(lambda _ign: log.msg("shutting down introducer's Tub"))
580         d.addCallback(lambda _ign: self.central_tub.disownServiceParent())
581
582         def _wait_for_introducer_loss(ign):
583             def _introducer_lost():
584                 for c in clients:
585                     if c.connected_to_introducer():
586                         return False
587                 return True
588             return self.poll(_introducer_lost)
589         d.addCallback(_wait_for_introducer_loss)
590
591         def _restart_introducer_tub(_ign):
592             log.msg("restarting introducer's Tub")
593             # reset counters
594             for i in range(NUM_CLIENTS):
595                 c = subscribing_clients[i]
596                 for k in c._debug_counts:
597                     c._debug_counts[k] = 0
598             for k in self.the_introducer._debug_counts:
599                 self.the_introducer._debug_counts[k] = 0
600             expected_announcements[i] += 1 # new 'storage' for everyone
601             self.create_tub(self.central_portnum)
602             newfurl = self.central_tub.registerReference(self.the_introducer,
603                                                          furlFile=iff)
604             assert newfurl == self.introducer_furl
605         d.addCallback(_restart_introducer_tub)
606
607         d.addCallback(_wait_for_connected)
608         d.addCallback(_wait_for_expected_announcements)
609         d.addCallback(_wait_until_idle)
610         d.addCallback(lambda _ign: log.msg(" reconnected"))
611
612         # TODO: publish something while the introducer is offline, then
613         # confirm it gets delivered when the connection is reestablished
614         def _check2(res):
615             log.msg("doing _check2")
616             # assert that the introducer sent out new messages, one per
617             # subscriber
618             dc = self.the_introducer._debug_counts
619             self.failUnlessEqual(dc["outbound_announcements"],
620                                  NUM_STORAGE*NUM_CLIENTS)
621             self.failUnless(dc["outbound_message"] > 0)
622             self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS)
623             for c in subscribing_clients:
624                 cdc = c._debug_counts
625                 self.failUnlessEqual(cdc["inbound_message"], 1)
626                 self.failUnlessEqual(cdc["inbound_announcement"], NUM_STORAGE)
627                 self.failUnlessEqual(cdc["new_announcement"], 0)
628                 self.failUnlessEqual(cdc["wrong_service"], 0)
629                 self.failUnlessEqual(cdc["duplicate_announcement"], NUM_STORAGE)
630         d.addCallback(_check2)
631
632         # Then force an introducer restart, by shutting down the Tub,
633         # destroying the old introducer, and starting a new Tub+Introducer.
634         # Everybody should reconnect and republish, and the (new) introducer
635         # will distribute the new announcements, but the clients should
636         # ignore the republishes as duplicates.
637
638         d.addCallback(lambda _ign: log.msg("shutting down introducer"))
639         d.addCallback(lambda _ign: self.central_tub.disownServiceParent())
640         d.addCallback(_wait_for_introducer_loss)
641         d.addCallback(lambda _ign: log.msg("introducer lost"))
642
643         def _restart_introducer(_ign):
644             log.msg("restarting introducer")
645             self.create_tub(self.central_portnum)
646             # reset counters
647             for i in range(NUM_CLIENTS):
648                 c = subscribing_clients[i]
649                 for k in c._debug_counts:
650                     c._debug_counts[k] = 0
651             expected_announcements[i] += 1 # new 'storage' for everyone
652             if server_version == V1:
653                 introducer = old.IntroducerService_v1()
654             else:
655                 introducer = IntroducerService()
656             self.the_introducer = introducer
657             newfurl = self.central_tub.registerReference(self.the_introducer,
658                                                          furlFile=iff)
659             assert newfurl == self.introducer_furl
660         d.addCallback(_restart_introducer)
661
662         d.addCallback(_wait_for_connected)
663         d.addCallback(_wait_for_expected_announcements)
664         d.addCallback(_wait_until_idle)
665
666         def _check3(res):
667             log.msg("doing _check3")
668             dc = self.the_introducer._debug_counts
669             self.failUnlessEqual(dc["outbound_announcements"],
670                                  NUM_STORAGE*NUM_CLIENTS)
671             self.failUnless(dc["outbound_message"] > 0)
672             self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS)
673             for c in subscribing_clients:
674                 cdc = c._debug_counts
675                 self.failUnless(cdc["inbound_message"] > 0)
676                 self.failUnlessEqual(cdc["inbound_announcement"], NUM_STORAGE)
677                 self.failUnlessEqual(cdc["new_announcement"], 0)
678                 self.failUnlessEqual(cdc["wrong_service"], 0)
679                 self.failUnlessEqual(cdc["duplicate_announcement"], NUM_STORAGE)
680
681         d.addCallback(_check3)
682         return d
683
684
685     def test_system_v2_server(self):
686         self.basedir = "introducer/SystemTest/system_v2_server"
687         os.makedirs(self.basedir)
688         return self.do_system_test(V2)
689     test_system_v2_server.timeout = 480
690     # occasionally takes longer than 350s on "draco"
691
692     def test_system_v1_server(self):
693         self.basedir = "introducer/SystemTest/system_v1_server"
694         os.makedirs(self.basedir)
695         return self.do_system_test(V1)
696     test_system_v1_server.timeout = 480
697     # occasionally takes longer than 350s on "draco"
698
class FakeRemoteReference:
    # Stub object handed to the introducer as a "subscriber" by the tests
    # below. It accepts disconnect-notification registrations and drops
    # them, and always reports the same fixed tubid string.

    def notifyOnDisconnect(self, *args, **kwargs):
        # any arguments are accepted and ignored
        return None

    def getRemoteTubID(self):
        tubid = "62ubehyunnyhzs7r6vdonnm2hpi52w6y"
        return tubid
702
class ClientInfo(unittest.TestCase):
    # Checks the subscriber records returned by
    # IntroducerService.get_subscribers() for subscriptions made through
    # the v2 and the v1 subscribe calls.

    def _check_subscriber_info(self, info, nickname, app_versions):
        # Assertions shared by every subscriber-info dict inspected below.
        self.failUnlessEqual(info["version"], 0)
        self.failUnlessEqual(info["oldest-supported"], "oldest")
        self.failUnlessEqual(info["app-versions"], app_versions)
        self.failUnlessEqual(info["nickname"], nickname)
        self.failUnlessEqual(info["my-version"], "my_version")

    def test_client_v2(self):
        introducer = IntroducerService()
        tub = None
        introducer_furl = None
        app_versions = {"whizzy": "fizzy"}
        client_v2 = IntroducerClient(tub, introducer_furl, u"nick-v2",
                                     "my_version", "oldest", app_versions)
        # disabled: publishing an announcement before subscribing
        #furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
        #ann_s = make_ann_t(client_v2, furl1, None)
        #introducer.remote_publish_v2(ann_s, Referenceable())
        subscriber = FakeRemoteReference()
        introducer.remote_subscribe_v2(subscriber, "storage",
                                       client_v2._my_subscriber_info)

        records = introducer.get_subscribers()
        self.failUnlessEqual(len(records), 1)
        service_name, _when, info, rref = records[0]
        self.failUnlessIdentical(rref, subscriber)
        self.failUnlessEqual(service_name, "storage")
        self._check_subscriber_info(info, u"nick-v2", app_versions)

    def test_client_v1(self):
        introducer = IntroducerService()
        subscriber = FakeRemoteReference()
        introducer.remote_subscribe(subscriber, "storage")
        # the v1 subscribe interface had no subscriber_info: that was usually
        # sent in a separate stub_client pseudo-announcement
        records = introducer.get_subscribers()
        self.failUnlessEqual(len(records), 1)
        service_name, _when, info, rref = records[0]
        # rref will be a WrapV1SubscriberInV2Interface around the real
        # subscriber
        self.failUnlessIdentical(rref.original, subscriber)
        self.failUnlessEqual(info, None) # not known yet
        self.failUnlessEqual(service_name, "storage")

        # now submit the stub_client announcement
        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
        nickname_bytes = u"nick-v1".encode("utf-8")
        stub_ann = (furl1, "stub_client", "RIStubClient",
                    nickname_bytes, "my_version", "oldest")
        introducer.remote_publish(stub_ann)
        # the server should correlate the two
        records = introducer.get_subscribers()
        self.failUnlessEqual(len(records), 1)
        service_name, _when, info, rref = records[0]
        self.failUnlessIdentical(rref.original, subscriber)
        self.failUnlessEqual(service_name, "storage")
        # v1 announcements do not contain app-versions, hence the empty dict
        self._check_subscriber_info(info, u"nick-v1", {})

        # a subscription that arrives after the stub_client announcement
        # should be correlated too
        subscriber2 = FakeRemoteReference()
        introducer.remote_subscribe(subscriber2, "thing2")

        by_service = {}
        for service_name, _when, info, rref in introducer.get_subscribers():
            by_service[service_name] = (info, rref)
        self.failUnlessEqual(len(by_service), 2)
        info, rref = by_service["thing2"]
        self.failUnlessIdentical(rref.original, subscriber2)
        # v1 announcements do not contain app-versions, hence the empty dict
        self._check_subscriber_info(info, u"nick-v1", {})
777
class Announcements(unittest.TestCase):
    # Checks what IntroducerService.get_announcements() holds after a
    # single announcement is published through the v2 (unsigned and
    # signed) and the v1 publish calls.

    def _only_announcement(self, introducer):
        # Require exactly one stored announcement and return the parts of
        # it that the tests inspect: (index, canary, ann).
        stored = introducer.get_announcements()
        self.failUnlessEqual(len(stored), 1)
        index, record = stored.items()[0]
        _ann_s, canary, ann, _when = record
        return index, canary, ann

    def _check_storage_ann(self, ann, nickname, app_versions, furl):
        # Assertions shared by all three tests on the announcement dict.
        self.failUnlessEqual(ann["app-versions"], app_versions)
        self.failUnlessEqual(ann["nickname"], nickname)
        self.failUnlessEqual(ann["service-name"], "storage")
        self.failUnlessEqual(ann["my-version"], "my_version")
        self.failUnlessEqual(ann["anonymous-storage-FURL"], furl)

    def test_client_v2_unsigned(self):
        introducer = IntroducerService()
        tub = None
        introducer_furl = None
        app_versions = {"whizzy": "fizzy"}
        client_v2 = IntroducerClient(tub, introducer_furl, u"nick-v2",
                                     "my_version", "oldest", app_versions)
        tubid = "62ubehyunnyhzs7r6vdonnm2hpi52w6y"
        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
        # no signing key is passed, so the announcement is unsigned
        published = make_ann_t(client_v2, furl1, None)
        canary0 = Referenceable()
        introducer.remote_publish_v2(published, canary0)

        index, canary, ann = self._only_announcement(introducer)
        self.failUnlessIdentical(canary, canary0)
        self.failUnlessEqual(index, ("storage", None, tubid))
        self._check_storage_ann(ann, u"nick-v2", app_versions, furl1)

    def test_client_v2_signed(self):
        introducer = IntroducerService()
        tub = None
        introducer_furl = None
        app_versions = {"whizzy": "fizzy"}
        client_v2 = IntroducerClient(tub, introducer_furl, u"nick-v2",
                                     "my_version", "oldest", app_versions)
        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
        privkey_s, pubkey_s = keyutil.make_keypair()
        privkey, _ignored = keyutil.parse_privkey(privkey_s)
        # the expected index carries the verifying key without its "pub-"
        # prefix
        pks = keyutil.remove_prefix(pubkey_s, "pub-")
        published = make_ann_t(client_v2, furl1, privkey)
        canary0 = Referenceable()
        introducer.remote_publish_v2(published, canary0)

        index, canary, ann = self._only_announcement(introducer)
        self.failUnlessIdentical(canary, canary0)
        self.failUnlessEqual(index, ("storage", pks, None))
        self._check_storage_ann(ann, u"nick-v2", app_versions, furl1)

    def test_client_v1(self):
        introducer = IntroducerService()

        tubid = "62ubehyunnyhzs7r6vdonnm2hpi52w6y"
        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
        nickname_bytes = u"nick-v1".encode("utf-8")
        v1_ann = (furl1, "storage", "RIStorage",
                  nickname_bytes, "my_version", "oldest")
        introducer.remote_publish(v1_ann)

        index, canary, ann = self._only_announcement(introducer)
        # the v1 publish call above was given no canary
        self.failUnlessEqual(canary, None)
        self.failUnlessEqual(index, ("storage", None, tubid))
        self._check_storage_ann(ann, u"nick-v1".encode("utf-8"), {}, furl1)
844
845
class TooNewServer(IntroducerService):
    # IntroducerService variant whose VERSION dict advertises only a
    # "v999" introducer protocol; used by NonV1Server below.
    VERSION = {
        "http://allmydata.org/tahoe/protocols/introducer/v999": {},
        "application-version": "greetings from the crazy future",
    }
851
class NonV1Server(SystemTestMixin, unittest.TestCase):
    # if the 1.3.0 client connects to a server that doesn't provide the 'v1'
    # protocol, it is supposed to provide a useful error instead of a weird
    # exception.

    def test_failure(self):
        self.basedir = "introducer/NonV1Server/failure"
        os.makedirs(self.basedir)
        self.create_tub()

        # publish the too-new introducer on the central tub
        future_introducer = TooNewServer()
        future_introducer.setServiceParent(self.parent)
        self.introducer_furl = self.central_tub.registerReference(
            future_introducer)

        # a separate tub for the client, listening on a kernel-chosen port
        client_tub = Tub()
        client_tub.setOption("expose-remote-exception-types", False)
        client_tub.setServiceParent(self.parent)
        listener = client_tub.listenOn("tcp:0")
        client_tub.setLocation("localhost:%d" % listener.getPortnum())

        client = IntroducerClient(client_tub, self.introducer_furl,
                                  u"nickname-client", "version", "oldest", {})
        announcements = {}
        def _remember(key_s, ann):
            announcements[key_s] = ann
        client.subscribe_to("storage", _remember)

        client.setServiceParent(self.parent)

        # now we wait for it to connect and notice the bad version

        def _settled():
            # stop polling once the client has recorded either an error or
            # a publisher
            if client._introducer_error:
                return True
            return bool(client._publisher)

        def _verify(res):
            err = client._introducer_error
            self.failUnless(err)
            self.failUnless(err.check(InsufficientVersionError), err)

        d = self.poll(_settled)
        d.addCallback(_verify)
        return d
892
class DecodeFurl(unittest.TestCase):
    def test_decode(self):
        # make sure we have a working base64.b32decode. The one in
        # python2.4.[01] was broken.
        furl = 'pb://t5g7egomnnktbpydbuijt6zgtmw4oqi5@127.0.0.1:51857/hfzv36i'
        expected_nodeid = "\x9fM\xf2\x19\xcckU0\xbf\x03\r\x10\x99\xfb&\x9b-\xc7A\x1d"

        # the tubid is the word between "pb://" and "@"
        m = re.match(r'pb://(\w+)@', furl)
        assert m
        # the tubid is lower-case in the furl, so upper-case it before
        # decoding
        tubid_b32 = m.group(1).upper()
        self.failUnlessEqual(b32decode(tubid_b32), expected_nodeid)
902
class Signatures(unittest.TestCase):
    def test_sign(self):
        # Round-trip an announcement through sign_to_foolscap /
        # unsign_from_foolscap, then check how tampered, unsigned and
        # unrecognized-version inputs are treated.
        announcement = {"key1": "value1"}
        privkey_s, pubkey_s = keyutil.make_keypair()
        privkey, _ignored = keyutil.parse_privkey(privkey_s)
        signed = sign_to_foolscap(announcement, privkey)
        msg, sig, key = signed

        bytes_type = type("".encode("utf-8"))
        self.failUnlessEqual(type(msg), bytes_type) # bytes
        self.failUnlessEqual(simplejson.loads(msg.decode("utf-8")),
                             announcement)
        for field in (sig, key):
            self.failUnless(field.startswith("v0-"))

        recovered_ann, recovered_key = unsign_from_foolscap(signed)
        self.failUnlessEqual(recovered_ann, announcement)
        self.failUnlessEqual("pub-"+recovered_key, pubkey_s)

        # bad signature
        tampered_ann = {"key1": "value2"}
        tampered_msg = simplejson.dumps(tampered_ann).encode("utf-8")
        self.failUnlessRaises(keyutil.BadSignatureError,
                              unsign_from_foolscap, (tampered_msg, sig, key))
        # sneaky bad signature should be ignored
        unsigned_ann, unsigned_key = unsign_from_foolscap(
            (tampered_msg, None, key))
        self.failUnlessEqual(unsigned_key, None)
        self.failUnlessEqual(unsigned_ann, tampered_ann)

        # unrecognized signatures
        unrecognized = [
            (tampered_msg, "v999-sig", key),
            (tampered_msg, sig, "v999-key"),
        ]
        for ann_t in unrecognized:
            self.failUnlessRaises(UnknownKeyError,
                                  unsign_from_foolscap, ann_t)
933
934
# TODO: add tests of StorageFarmBroker. If it receives a duplicate
# announcement, or a same-FURL-different-misc announcement, it should leave
# the Reconnector in place; but if it receives a same-nodeid-different-FURL
# announcement, it should tear down the Reconnector and make a new one. This
# behavior used to live in the IntroducerClient, and thus used to be tested
# by test_introducer.
940
941 # copying more tests from old branch:
942
943 #  then also add Upgrade test