import os
from base64 import b32decode

from twisted.trial import unittest
from twisted.internet import defer
from twisted.python import log
from twisted.application import service

from foolscap import Tub, Referenceable
from foolscap.eventual import fireEventually, flushEventualQueue

from allmydata.introducer.client import IntroducerClient
from allmydata.introducer.server import IntroducerService
# test compatibility with old introducer .tac files
from allmydata.introducer import IntroducerNode
from allmydata.introducer import old
from allmydata.util import testutil, idlib

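# FakeNode is the minimal Referenceable these tests need: it is only
# registered with a Tub to obtain a FURL to publish, and carries no
# behavior of its own.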
class FakeNode(Referenceable):
    pass

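# A MultiService with a log() method, so that child services which log
# through their service parent end up in the Twisted log.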
class LoggingMultiService(service.MultiService):
    def log(self, msg, **kw):
        log.msg(msg, **kw)

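# Check that an IntroducerNode can be created from a bare basedir, started,
# and stopped cleanly, which is what an old introducer .tac file relies on.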
class Node(testutil.SignalMixin, unittest.TestCase):
    def test_loadable(self):
        basedir = "introducer.IntroducerNode.test_loadable"
        os.mkdir(basedir)
        q = IntroducerNode(basedir)
        d = fireEventually(None)
        d.addCallback(lambda res: q.startService())
        d.addCallback(lambda res: q.when_tub_ready())
        d.addCallback(lambda res: q.stopService())
        d.addCallback(flushEventualQueue)
        return d

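# ServiceMixin gives each test a running MultiService parent, and tears it
# down afterwards, flushing foolscap's eventual-send queue.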
class ServiceMixin:
    def setUp(self):
        self.parent = LoggingMultiService()
        self.parent.startService()
    def tearDown(self):
        log.msg("TestIntroducer.tearDown")
        d = defer.succeed(None)
        d.addCallback(lambda res: self.parent.stopService())
        d.addCallback(flushEventualQueue)
        return d

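# Unit tests for IntroducerClient and IntroducerService in isolation,
# without any network connections.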
class Introducer(ServiceMixin, unittest.TestCase, testutil.PollMixin):

    def test_create(self):
        ic = IntroducerClient(None, "introducer.furl", "my_nickname",
                              "my_version", "oldest_version")

    def test_listen(self):
        i = IntroducerService()
        i.setServiceParent(self.parent)

    def test_duplicate(self):
        i = IntroducerService()
        self.failUnlessEqual(len(i.get_announcements()), 0)
        self.failUnlessEqual(len(i.get_subscribers()), 0)
        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36106,127.0.0.1:36106/gydnpigj2ja2qr2srq4ikjwnl7xfgbra"
        furl2 = "pb://ttwwooyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36111,127.0.0.1:36106/ttwwoogj2ja2qr2srq4ikjwnl7xfgbra"
        ann1 = (furl1, "storage", "RIStorage", "nick1", "ver23", "ver0")
        ann1b = (furl1, "storage", "RIStorage", "nick1", "ver24", "ver0")
        ann2 = (furl2, "storage", "RIStorage", "nick2", "ver30", "ver0")
        i.remote_publish(ann1)
        self.failUnlessEqual(len(i.get_announcements()), 1)
        self.failUnlessEqual(len(i.get_subscribers()), 0)
        i.remote_publish(ann2)
        self.failUnlessEqual(len(i.get_announcements()), 2)
        self.failUnlessEqual(len(i.get_subscribers()), 0)
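        # re-announcing the same FURL (ann1b differs from ann1 only in its
        # version) should replace the original announcement, not add a third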
        i.remote_publish(ann1b)
        self.failUnlessEqual(len(i.get_announcements()), 2)
        self.failUnlessEqual(len(i.get_subscribers()), 0)

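# SystemTestMixin adds a "central" Tub to host the introducer service that
# the test clients will connect to.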
class SystemTestMixin(ServiceMixin, testutil.PollMixin):

    def setUp(self):
        ServiceMixin.setUp(self)
        self.central_tub = tub = Tub()
        #tub.setOption("logLocalFailures", True)
        #tub.setOption("logRemoteFailures", True)
        tub.setServiceParent(self.parent)
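        # "tcp:0" asks the kernel for an ephemeral port; we then advertise
        # whichever port we were actually given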
        l = tub.listenOn("tcp:0")
        portnum = l.getPortnum()
        tub.setLocation("localhost:%d" % portnum)

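# End-to-end test: one introducer service plus a population of clients, run
# once against the current introducer and once against the old V1 server.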
class SystemTest(SystemTestMixin, unittest.TestCase):

    def test_system(self):
        i = IntroducerService()
        i.setServiceParent(self.parent)
        self.introducer_furl = self.central_tub.registerReference(i)
        return self.do_system_test()

    def test_system_oldserver(self):
        i = old.IntroducerService_V1()
        i.setServiceParent(self.parent)
        self.introducer_furl = self.central_tub.registerReference(i)
        return self.do_system_test()

    def do_system_test(self):
        NUMCLIENTS = 5
        # We have 5 clients that publish themselves, plus one extra client
        # that does not. Once the connections are fully established, each of
        # the six nodes should have 5 connections.

        clients = []
        tubs = {}
        for i in range(NUMCLIENTS+1):
            tub = Tub()
            #tub.setOption("logLocalFailures", True)
            #tub.setOption("logRemoteFailures", True)
            tub.setServiceParent(self.parent)
            l = tub.listenOn("tcp:0")
            portnum = l.getPortnum()
            tub.setLocation("localhost:%d" % portnum)

            n = FakeNode()
            log.msg("creating client %d: %s" % (i, tub.getShortTubID()))
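            # client 0 uses the old V1 client class, to exercise
            # compatibility between old clients and the new server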
            client_class = IntroducerClient
            if i == 0:
                client_class = old.IntroducerClient_V1
            c = client_class(tub, self.introducer_furl,
                             "nickname-%d" % i, "version", "oldest")
            if i < NUMCLIENTS:
                node_furl = tub.registerReference(n)
                c.publish(node_furl, "storage", "ri_name")
            # the last one does not publish anything

            c.subscribe_to("storage")

            c.setServiceParent(self.parent)
            clients.append(c)
            tubs[c] = tub

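        # poll until every client, including the publishers themselves, sees
        # NUMCLIENTS storage connections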
        def _wait_for_all_connections():
            for c in clients:
                if len(c.get_all_connections()) < NUMCLIENTS:
                    return False
            return True
        d = self.poll(_wait_for_all_connections)

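        # once the mesh is up, every client should be connected to the
        # introducer and should know about all five publishers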
        def _check1(res):
            log.msg("doing _check1")
            for c in clients:
                self.failUnless(c.connected_to_introducer())
                self.failUnlessEqual(len(c.get_all_connections()), NUMCLIENTS)
                self.failUnlessEqual(len(c.get_all_peerids()), NUMCLIENTS)
                self.failUnlessEqual(len(c.get_all_connections_for("storage")),
                                     NUMCLIENTS)
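                # a peerid is the binary form of a tub ID: foolscap renders
                # tubIDs in lowercase base32, hence the .upper() before
                # b32decode. get_nickname_for_peerid should map that peerid
                # back to the nickname client 0 announced.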
                nodeid0 = b32decode(tubs[clients[0]].tubID.upper())
                self.failUnlessEqual(c.get_nickname_for_peerid(nodeid0),
                                     "nickname-0")
        d.addCallback(_check1)

        origin_c = clients[0]
        def _disconnect_somebody_else(res):
            # now disconnect somebody's connection to someone else
            current_counter = origin_c.counter
            victim_nodeid = b32decode(tubs[clients[1]].tubID.upper())
            log.msg(" disconnecting %s->%s" %
                    (tubs[origin_c].tubID,
                     idlib.shortnodeid_b2a(victim_nodeid)))
            origin_c.debug_disconnect_from_peerid(victim_nodeid)
            log.msg(" did disconnect")

            # then wait until something changes, which ought to be them
            # noticing the loss
            def _compare():
                return current_counter != origin_c.counter
            return self.poll(_compare)

        d.addCallback(_disconnect_somebody_else)

        # and wait for them to reconnect
        d.addCallback(lambda res: self.poll(_wait_for_all_connections))
        def _check2(res):
            log.msg("doing _check2")
            for c in clients:
                self.failUnlessEqual(len(c.get_all_connections()), NUMCLIENTS)
        d.addCallback(_check2)

        def _disconnect_yourself(res):
            # now disconnect somebody's connection to themselves.
            current_counter = origin_c.counter
            victim_nodeid = b32decode(tubs[clients[0]].tubID.upper())
            log.msg(" disconnecting %s->%s" %
                    (tubs[origin_c].tubID,
                     idlib.shortnodeid_b2a(victim_nodeid)))
            origin_c.debug_disconnect_from_peerid(victim_nodeid)
            log.msg(" did disconnect from self")

            def _compare():
                return current_counter != origin_c.counter
            return self.poll(_compare)
        d.addCallback(_disconnect_yourself)

        d.addCallback(lambda res: self.poll(_wait_for_all_connections))
        def _check3(res):
            log.msg("doing _check3")
            for c in clients:
                self.failUnlessEqual(len(c.get_all_connections_for("storage")),
                                     NUMCLIENTS)
        d.addCallback(_check3)
        def _shutdown_introducer(res):
            # now shut down the introducer. We do this by shutting down the
            # tub it's using. Nobody's connections (to each other) should go
            # down. All clients should notice the loss, and no other errors
            # should occur.
            log.msg("shutting down the introducer")
            return self.central_tub.disownServiceParent()
        d.addCallback(_shutdown_introducer)
        def _wait_for_introducer_loss():
            for c in clients:
                if c.connected_to_introducer():
                    return False
            return True
        d.addCallback(lambda res: self.poll(_wait_for_introducer_loss))

        def _check4(res):
            log.msg("doing _check4")
            for c in clients:
                self.failUnlessEqual(len(c.get_all_connections_for("storage")),
                                     NUMCLIENTS)
                self.failIf(c.connected_to_introducer())
        d.addCallback(_check4)
        return d