1 from base64 import b32decode
5 from twisted.trial import unittest
6 from twisted.internet import defer
7 from twisted.python import log
9 from foolscap import Tub, Referenceable
10 from foolscap.eventual import fireEventually, flushEventualQueue
11 from twisted.application import service
12 from allmydata.introducer.client import IntroducerClient
13 from allmydata.introducer.server import IntroducerService
14 # test compatibility with old introducer .tac files
15 from allmydata.introducer import IntroducerNode
16 from allmydata.introducer import old
17 from allmydata.util import idlib, pollmixin
18 import common_util as testutil
# Minimal Referenceable stub: instances are registered with a client Tub and
# their FURLs published through the introducer as fake "storage" nodes.
# NOTE(review): the class body is elided in this listing — presumably just
# `pass`; confirm against the full source.
20 class FakeNode(Referenceable):
# MultiService subclass that also exposes a log() method, so child services
# can call self.parent.log(...) during the tests.
23 class LoggingMultiService(service.MultiService):
# NOTE(review): the body of log() is elided in this listing; it presumably
# forwards msg/kw to twisted.python.log — confirm against the full source.
24 def log(self, msg, **kw):
27 class Node(testutil.SignalMixin, unittest.TestCase):
# Verify that an IntroducerNode (the entry point used by old introducer
# .tac files — see the module-level comment) can be created, started,
# brought to a tub-ready state, and cleanly stopped.
28 def test_loadable(self):
29 basedir = "introducer.IntroducerNode.test_loadable"
# NOTE(review): a line is elided here (original line 30) — presumably it
# creates `basedir` (e.g. os.mkdir) before the node is constructed; confirm
# against the full source.
31 q = IntroducerNode(basedir)
# Drive the node through its lifecycle on the eventual-send queue, then
# drain the queue so nothing fires after the test completes.
32 d = fireEventually(None)
33 d.addCallback(lambda res: q.startService())
34 d.addCallback(lambda res: q.when_tub_ready())
35 d.addCallback(lambda res: q.stopService())
36 d.addCallback(flushEventualQueue)
# NOTE(review): the trailing `return d` (original line 37) is elided from
# this listing.
# NOTE(review): the enclosing `class ServiceMixin` header and the
# `def setUp(self):` / `def tearDown(self):` lines are elided from this
# listing; the statements below are the bodies of those two fixtures —
# confirm against the full source.
# setUp body: create and start a LoggingMultiService that parents all the
# Tubs/services each test creates, so tearDown can stop them in one place.
41 self.parent = LoggingMultiService()
42 self.parent.startService()
# tearDown body: stop the parent service, then drain the eventual-send
# queue so no stray callbacks fire after the test ends.
44 log.msg("TestIntroducer.tearDown")
45 d = defer.succeed(None)
46 d.addCallback(lambda res: self.parent.stopService())
47 d.addCallback(flushEventualQueue)
class Introducer(ServiceMixin, unittest.TestCase, pollmixin.PollMixin):
    """Unit tests for IntroducerClient / IntroducerService in isolation
    (no network traffic; announcements are delivered by direct calls)."""

    def test_create(self):
        """An IntroducerClient can be instantiated without a live Tub."""
        client = IntroducerClient(None, "introducer.furl", "my_nickname",
                                  "my_version", "oldest_version")

    def test_listen(self):
        """An IntroducerService can be attached to the test's parent
        service without error."""
        introducer = IntroducerService()
        introducer.setServiceParent(self.parent)

    def test_duplicate(self):
        """Publishing a changed announcement for an already-known FURL
        replaces the existing entry rather than adding a second one."""
        introducer = IntroducerService()
        self.failUnlessEqual(len(introducer.get_announcements()), 0)
        self.failUnlessEqual(len(introducer.get_subscribers()), 0)
        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36106,127.0.0.1:36106/gydnpigj2ja2qr2srq4ikjwnl7xfgbra"
        furl2 = "pb://ttwwooyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36111,127.0.0.1:36106/ttwwoogj2ja2qr2srq4ikjwnl7xfgbra"
        ann1 = (furl1, "storage", "RIStorage", "nick1", "ver23", "ver0")
        ann1b = (furl1, "storage", "RIStorage", "nick1", "ver24", "ver0")
        ann2 = (furl2, "storage", "RIStorage", "nick2", "ver30", "ver0")
        # ann1b shares furl1 with ann1, so the third publish must not grow
        # the announcement table past 2. Subscribers stay empty throughout.
        for announcement, expected_count in ((ann1, 1), (ann2, 2), (ann1b, 2)):
            introducer.remote_publish(announcement)
            self.failUnlessEqual(len(introducer.get_announcements()),
                                 expected_count)
            self.failUnlessEqual(len(introducer.get_subscribers()), 0)
78 class SystemTestMixin(ServiceMixin, pollmixin.PollMixin):
# NOTE(review): the `def setUp(self):` line (original line ~81) is elided
# from this listing; the statements below form its body — confirm against
# the full source.
# setUp body: chain up to ServiceMixin, then build the "central" Tub the
# introducer service will be registered on.
82 ServiceMixin.setUp(self)
83 self.central_tub = tub = Tub()
84 #tub.setOption("logLocalFailures", True)
85 #tub.setOption("logRemoteFailures", True)
86 tub.setServiceParent(self.parent)
# Listen on an OS-assigned ephemeral port, then advertise whatever port
# was actually allocated as the tub's location.
87 l = tub.listenOn("tcp:0")
88 portnum = l.getPortnum()
89 tub.setLocation("localhost:%d" % portnum)
91 class SystemTest(SystemTestMixin, unittest.TestCase):
def test_system(self):
    """Run the full publish/subscribe system test against the current
    IntroducerService implementation."""
    introducer = IntroducerService()
    introducer.setServiceParent(self.parent)
    # Clients reach the introducer via this FURL on the central tub.
    self.introducer_furl = self.central_tub.registerReference(introducer)
    return self.do_system_test()
def test_system_oldserver(self):
    """Run the same system test against the V1 introducer server from
    allmydata.introducer.old, exercising compatibility with it."""
    introducer = old.IntroducerService_V1()
    introducer.setServiceParent(self.parent)
    # Clients reach the old-style introducer via this FURL on the central tub.
    self.introducer_furl = self.central_tub.registerReference(introducer)
    return self.do_system_test()
105 def do_system_test(self):
108 # we have 5 clients who publish themselves, and an extra one which
109 # does not. When the connections are fully established, all six nodes
110 # should have 5 connections each.
114 for i in range(NUMCLIENTS+1):
116 #tub.setOption("logLocalFailures", True)
117 #tub.setOption("logRemoteFailures", True)
118 tub.setServiceParent(self.parent)
119 l = tub.listenOn("tcp:0")
120 portnum = l.getPortnum()
121 tub.setLocation("localhost:%d" % portnum)
124 log.msg("creating client %d: %s" % (i, tub.getShortTubID()))
125 client_class = IntroducerClient
127 client_class = old.IntroducerClient_V1
128 c = client_class(tub, self.introducer_furl,
129 "nickname-%d" % i, "version", "oldest")
131 node_furl = tub.registerReference(n)
132 c.publish(node_furl, "storage", "ri_name")
133 # the last one does not publish anything
135 c.subscribe_to("storage")
137 c.setServiceParent(self.parent)
141 def _wait_for_all_connections():
143 if len(c.get_all_connections()) < NUMCLIENTS:
146 d = self.poll(_wait_for_all_connections)
149 log.msg("doing _check1")
151 self.failUnless(c.connected_to_introducer())
152 self.failUnlessEqual(len(c.get_all_connections()), NUMCLIENTS)
153 self.failUnlessEqual(len(c.get_all_peerids()), NUMCLIENTS)
154 self.failUnlessEqual(len(c.get_all_connections_for("storage")),
156 nodeid0 = b32decode(tubs[clients[0]].tubID.upper())
157 self.failUnlessEqual(c.get_nickname_for_peerid(nodeid0),
159 d.addCallback(_check1)
161 origin_c = clients[0]
162 def _disconnect_somebody_else(res):
163 # now disconnect somebody's connection to someone else
164 current_counter = origin_c.counter
165 victim_nodeid = b32decode(tubs[clients[1]].tubID.upper())
166 log.msg(" disconnecting %s->%s" %
167 (tubs[origin_c].tubID,
168 idlib.shortnodeid_b2a(victim_nodeid)))
169 origin_c.debug_disconnect_from_peerid(victim_nodeid)
170 log.msg(" did disconnect")
172 # then wait until something changes, which ought to be them
175 return current_counter != origin_c.counter
176 return self.poll(_compare)
178 d.addCallback(_disconnect_somebody_else)
180 # and wait for them to reconnect
181 d.addCallback(lambda res: self.poll(_wait_for_all_connections))
183 log.msg("doing _check2")
185 self.failUnlessEqual(len(c.get_all_connections()), NUMCLIENTS)
186 d.addCallback(_check2)
188 def _disconnect_yourself(res):
189 # now disconnect somebody's connection to themselves.
190 current_counter = origin_c.counter
191 victim_nodeid = b32decode(tubs[clients[0]].tubID.upper())
192 log.msg(" disconnecting %s->%s" %
193 (tubs[origin_c].tubID,
194 idlib.shortnodeid_b2a(victim_nodeid)))
195 origin_c.debug_disconnect_from_peerid(victim_nodeid)
196 log.msg(" did disconnect from self")
199 return current_counter != origin_c.counter
200 return self.poll(_compare)
201 d.addCallback(_disconnect_yourself)
203 d.addCallback(lambda res: self.poll(_wait_for_all_connections))
205 log.msg("doing _check3")
207 self.failUnlessEqual(len(c.get_all_connections_for("storage")),
209 d.addCallback(_check3)
210 def _shutdown_introducer(res):
211 # now shut down the introducer. We do this by shutting down the
212 # tub it's using. Nobody's connections (to each other) should go
213 # down. All clients should notice the loss, and no other errors
215 log.msg("shutting down the introducer")
216 return self.central_tub.disownServiceParent()
217 d.addCallback(_shutdown_introducer)
218 def _wait_for_introducer_loss():
220 if c.connected_to_introducer():
223 d.addCallback(lambda res: self.poll(_wait_for_introducer_loss))
226 log.msg("doing _check4")
228 self.failUnlessEqual(len(c.get_all_connections_for("storage")),
230 self.failIf(c.connected_to_introducer())
231 d.addCallback(_check4)