import os

from base64 import b32decode

from twisted.trial import unittest
from twisted.internet import defer
from twisted.python import log
from twisted.application import service
from foolscap import Tub, Referenceable
from foolscap.eventual import fireEventually, flushEventualQueue

from allmydata.introducer.client import IntroducerClient
from allmydata.introducer.server import IntroducerService
# test compatibility with old introducer .tac files
from allmydata.introducer import IntroducerNode
from allmydata.introducer import old
from allmydata.util import testutil, idlib, pollmixin
19 class FakeNode(Referenceable):
class LoggingMultiService(service.MultiService):
    # MultiService used as the per-test parent; exposes a log() hook so
    # child services that call parent.log(...) do not blow up.
    def log(self, msg, **kw):
        """Accept a log message from a child service.

        NOTE(review): the method body is elided in this view -- presumably
        it forwards to twisted's log.msg; confirm in the full file.
        """
class Node(testutil.SignalMixin, unittest.TestCase):
    """Smoke test: an IntroducerNode can be created, started, and stopped."""

    def test_loadable(self):
        """Start an IntroducerNode in a fresh basedir, then shut it down.

        Fixes two defects in the visible code:
        - the basedir was never created before constructing the node;
        - the deferred chain was never returned, so trial would not wait
          for the start/stop cycle and the test passed vacuously.
        """
        basedir = "introducer.IntroducerNode.test_loadable"
        # the node expects its basedir to exist; guard so a leftover
        # directory from a previous run does not make mkdir raise
        if not os.path.isdir(basedir):
            os.mkdir(basedir)
        q = IntroducerNode(basedir)
        d = fireEventually(None)
        d.addCallback(lambda res: q.startService())
        d.addCallback(lambda res: q.when_tub_ready())
        d.addCallback(lambda res: q.stopService())
        d.addCallback(flushEventualQueue)
        # trial waits on the returned deferred before ending the test
        return d
        # (inside ServiceMixin.setUp -- the 'def' line is elided in this
        # view) create and start the parent MultiService that each test's
        # tubs/clients attach to.
        self.parent = LoggingMultiService()
        self.parent.startService()
        # (inside ServiceMixin.tearDown -- header also elided) stop the
        # parent service, then drain foolscap's eventual-send queue so
        # nothing fires after the test has finished.
        log.msg("TestIntroducer.tearDown")
        d = defer.succeed(None)
        d.addCallback(lambda res: self.parent.stopService())
        d.addCallback(flushEventualQueue)
class Introducer(ServiceMixin, unittest.TestCase, pollmixin.PollMixin):
    """Unit tests for the introducer client and service in isolation."""

    def test_create(self):
        """An IntroducerClient can be constructed without a live Tub."""
        client = IntroducerClient(None, "introducer.furl", "my_nickname",
                                  "my_version", "oldest_version")
        del client  # construction itself is the assertion

    def test_listen(self):
        """An IntroducerService can attach to a running parent service."""
        svc = IntroducerService()
        svc.setServiceParent(self.parent)

    def test_duplicate(self):
        """Re-announcing a known FURL replaces its entry instead of adding one."""
        svc = IntroducerService()
        self.failUnlessEqual(len(svc.get_announcements()), 0)
        self.failUnlessEqual(len(svc.get_subscribers()), 0)
        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36106,127.0.0.1:36106/gydnpigj2ja2qr2srq4ikjwnl7xfgbra"
        furl2 = "pb://ttwwooyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36111,127.0.0.1:36106/ttwwoogj2ja2qr2srq4ikjwnl7xfgbra"
        ann1 = (furl1, "storage", "RIStorage", "nick1", "ver23", "ver0")
        ann1b = (furl1, "storage", "RIStorage", "nick1", "ver24", "ver0")
        ann2 = (furl2, "storage", "RIStorage", "nick2", "ver30", "ver0")
        # publish three announcements in order; ann1b shares furl1 with
        # ann1, so the announcement table must stay at two entries, and no
        # publish ever creates a subscriber.
        for ann, expected_count in ((ann1, 1), (ann2, 2), (ann1b, 2)):
            svc.remote_publish(ann)
            self.failUnlessEqual(len(svc.get_announcements()), expected_count)
            self.failUnlessEqual(len(svc.get_subscribers()), 0)
class SystemTestMixin(ServiceMixin, pollmixin.PollMixin):
        # (inside setUp -- the 'def' line is elided in this view) build the
        # central Tub that the introducer under test will live in.
        ServiceMixin.setUp(self)
        # keep the tub on self so tests can register references on it and
        # later shut the introducer down by disowning this tub
        self.central_tub = tub = Tub()
        #tub.setOption("logLocalFailures", True)
        #tub.setOption("logRemoteFailures", True)
        tub.setServiceParent(self.parent)
        # listen on an OS-assigned port so parallel test runs never collide
        l = tub.listenOn("tcp:0")
        portnum = l.getPortnum()
        tub.setLocation("localhost:%d" % portnum)
class SystemTest(SystemTestMixin, unittest.TestCase):
    """End-to-end tests: a real introducer plus several subscribed clients."""

    def test_system(self):
        """Run the full publish/subscribe cycle against the current server."""
        introducer = IntroducerService()
        introducer.setServiceParent(self.parent)
        furl = self.central_tub.registerReference(introducer)
        self.introducer_furl = furl
        return self.do_system_test()
98 def test_system_oldserver(self):
99 i = old.IntroducerService_V1()
100 i.setServiceParent(self.parent)
101 self.introducer_furl = self.central_tub.registerReference(i)
102 return self.do_system_test()
    def do_system_test(self):
        # NOTE(review): many lines of this method are not visible in this
        # chunk (client/tub construction, several enclosing 'def'/'for'/'if'
        # headers, continuation lines, and the final return of the deferred).
        # Comments below describe only the code that is visible; elided
        # structure is called out where it is apparent.
        # we have 5 clients who publish themselves, and an extra one does
        # which not. When the connections are fully established, all six nodes
        # should have 5 connections each.
        for i in range(NUMCLIENTS+1):
            # each client gets its own Tub on an OS-assigned port
            #tub.setOption("logLocalFailures", True)
            #tub.setOption("logRemoteFailures", True)
            tub.setServiceParent(self.parent)
            l = tub.listenOn("tcp:0")
            portnum = l.getPortnum()
            tub.setLocation("localhost:%d" % portnum)
            log.msg("creating client %d: %s" % (i, tub.getShortTubID()))
            # presumably one client is switched to the old V1 client class to
            # exercise backwards compatibility -- the guarding 'if' is elided
            client_class = IntroducerClient
                client_class = old.IntroducerClient_V1
            c = client_class(tub, self.introducer_furl,
                             "nickname-%d" % i, "version", "oldest")
            node_furl = tub.registerReference(n)
            c.publish(node_furl, "storage", "ri_name")
            # the last one does not publish anything
            c.subscribe_to("storage")
            c.setServiceParent(self.parent)
        # poll predicate (enclosing loop elided): done once every client
        # reports at least NUMCLIENTS connections
        def _wait_for_all_connections():
            if len(c.get_all_connections()) < NUMCLIENTS:
        d = self.poll(_wait_for_all_connections)
        # _check1 ('def' line elided): every client is connected to the
        # introducer and sees all NUMCLIENTS storage peers
            log.msg("doing _check1")
                self.failUnless(c.connected_to_introducer())
                self.failUnlessEqual(len(c.get_all_connections()), NUMCLIENTS)
                self.failUnlessEqual(len(c.get_all_peerids()), NUMCLIENTS)
                self.failUnlessEqual(len(c.get_all_connections_for("storage")),
            # nicknames survive the announcement round-trip
            nodeid0 = b32decode(tubs[clients[0]].tubID.upper())
            self.failUnlessEqual(c.get_nickname_for_peerid(nodeid0),
        d.addCallback(_check1)
        origin_c = clients[0]
        def _disconnect_somebody_else(res):
            # now disconnect somebody's connection to someone else
            current_counter = origin_c.counter
            victim_nodeid = b32decode(tubs[clients[1]].tubID.upper())
            log.msg(" disconnecting %s->%s" %
                    (tubs[origin_c].tubID,
                     idlib.shortnodeid_b2a(victim_nodeid)))
            origin_c.debug_disconnect_from_peerid(victim_nodeid)
            log.msg(" did disconnect")
            # then wait until something changes, which ought to be them
            # reconnecting ('def _compare' header elided below)
                return current_counter != origin_c.counter
            return self.poll(_compare)
        d.addCallback(_disconnect_somebody_else)
        # and wait for them to reconnect
        d.addCallback(lambda res: self.poll(_wait_for_all_connections))
        # _check2 ('def' line elided): connection counts fully recovered
            log.msg("doing _check2")
                self.failUnlessEqual(len(c.get_all_connections()), NUMCLIENTS)
        d.addCallback(_check2)
        def _disconnect_yourself(res):
            # now disconnect somebody's connection to themselves.
            current_counter = origin_c.counter
            victim_nodeid = b32decode(tubs[clients[0]].tubID.upper())
            log.msg(" disconnecting %s->%s" %
                    (tubs[origin_c].tubID,
                     idlib.shortnodeid_b2a(victim_nodeid)))
            origin_c.debug_disconnect_from_peerid(victim_nodeid)
            log.msg(" did disconnect from self")
                return current_counter != origin_c.counter
            return self.poll(_compare)
        d.addCallback(_disconnect_yourself)
        d.addCallback(lambda res: self.poll(_wait_for_all_connections))
        # _check3 ('def' line elided): storage connections recovered too
            log.msg("doing _check3")
                self.failUnlessEqual(len(c.get_all_connections_for("storage")),
        d.addCallback(_check3)
        def _shutdown_introducer(res):
            # now shut down the introducer. We do this by shutting down the
            # tub it's using. Nobody's connections (to each other) should go
            # down. All clients should notice the loss, and no other errors
            log.msg("shutting down the introducer")
            return self.central_tub.disownServiceParent()
        d.addCallback(_shutdown_introducer)
        # poll predicate: done once no client still claims an introducer
        # connection (enclosing loop elided)
        def _wait_for_introducer_loss():
                if c.connected_to_introducer():
        d.addCallback(lambda res: self.poll(_wait_for_introducer_loss))
        # _check4 ('def' line elided): peer connections survive the
        # introducer's death even though the introducer link is gone
            log.msg("doing _check4")
                self.failUnlessEqual(len(c.get_all_connections_for("storage")),
                self.failIf(c.connected_to_introducer())
        d.addCallback(_check4)