1 from base64 import b32decode
5 from twisted.trial import unittest
6 from twisted.internet import defer
7 from twisted.python import log
9 from foolscap import Tub, Referenceable
10 from foolscap.eventual import fireEventually, flushEventualQueue
11 from twisted.application import service
12 from allmydata.introducer.client import IntroducerClient
13 from allmydata.introducer.server import IntroducerService
14 # test compatibility with old introducer .tac files
15 from allmydata.introducer import IntroducerNode
16 from allmydata.util import testutil, idlib
18 class FakeNode(Referenceable):
21 class LoggingMultiService(service.MultiService):
22 def log(self, msg, **kw):
25 class TestIntroducerNode(testutil.SignalMixin, unittest.TestCase):
26 def test_loadable(self):
27 basedir = "introducer.IntroducerNode.test_loadable"
29 q = IntroducerNode(basedir)
30 d = fireEventually(None)
31 d.addCallback(lambda res: q.startService())
32 d.addCallback(lambda res: q.when_tub_ready())
33 d.addCallback(lambda res: q.stopService())
34 d.addCallback(flushEventualQueue)
37 class TestIntroducer(unittest.TestCase, testutil.PollMixin):
39 self.parent = LoggingMultiService()
40 self.parent.startService()
42 log.msg("TestIntroducer.tearDown")
43 d = defer.succeed(None)
44 d.addCallback(lambda res: self.parent.stopService())
45 d.addCallback(flushEventualQueue)
def test_create(self):
    # Smoke test: building an IntroducerClient with no Tub (None) and
    # placeholder string arguments must complete without raising. Nothing
    # is asserted about the resulting object, so it is not kept.
    furl = "introducer.furl"
    nickname = "my_nickname"
    my_version = "my_version"
    oldest_version = "oldest_version"
    IntroducerClient(None, furl, nickname, my_version, oldest_version)
def test_listen(self):
    # Smoke test: a freshly constructed IntroducerService can be attached
    # to self.parent without raising. No further assertions are made.
    IntroducerService().setServiceParent(self.parent)
def test_duplicate(self):
    # Publishing a second announcement that carries an already-seen FURL
    # must not increase the number of announcements the introducer holds.
    # Publishing never touches the subscriber list, which stays empty.
    introducer = IntroducerService()

    def expect_counts(n_announcements, n_subscribers):
        # Announcements are checked before subscribers.
        self.failUnlessEqual(len(introducer.get_announcements()),
                             n_announcements)
        self.failUnlessEqual(len(introducer.get_subscribers()),
                             n_subscribers)

    # A new service starts out with nothing registered.
    expect_counts(0, 0)

    furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36106,127.0.0.1:36106/gydnpigj2ja2qr2srq4ikjwnl7xfgbra"
    furl2 = "pb://ttwwooyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36111,127.0.0.1:36106/ttwwoogj2ja2qr2srq4ikjwnl7xfgbra"

    # ann1 and ann1b share furl1 and differ only in their fifth field
    # ("ver23" vs "ver24"); ann2 uses a different FURL and nickname.
    ann1 = (furl1, "storage", "RIStorage", "nick1", "ver23", "ver0")
    ann1b = (furl1, "storage", "RIStorage", "nick1", "ver24", "ver0")
    ann2 = (furl2, "storage", "RIStorage", "nick2", "ver30", "ver0")

    # (announcement to publish, announcement count expected afterwards)
    steps = [
        (ann1, 1),   # first announcement
        (ann2, 2),   # a different FURL adds a second entry
        (ann1b, 2),  # same FURL as ann1: the count must stay at 2
    ]
    for announcement, expected_total in steps:
        introducer.remote_publish(announcement)
        expect_counts(expected_total, 0)
77 def test_system(self):
79 self.central_tub = tub = Tub()
80 #tub.setOption("logLocalFailures", True)
81 #tub.setOption("logRemoteFailures", True)
82 tub.setServiceParent(self.parent)
83 l = tub.listenOn("tcp:0")
84 portnum = l.getPortnum()
85 tub.setLocation("localhost:%d" % portnum)
87 i = IntroducerService()
88 i.setServiceParent(self.parent)
89 introducer_furl = tub.registerReference(i)
91 # we have 5 clients who publish themselves, and an extra one which
92 # does not. When the connections are fully established, all six nodes
93 # should have 5 connections each.
97 for i in range(NUMCLIENTS+1):
99 #tub.setOption("logLocalFailures", True)
100 #tub.setOption("logRemoteFailures", True)
101 tub.setServiceParent(self.parent)
102 l = tub.listenOn("tcp:0")
103 portnum = l.getPortnum()
104 tub.setLocation("localhost:%d" % portnum)
107 log.msg("creating client %d: %s" % (i, tub.getShortTubID()))
108 c = IntroducerClient(tub, introducer_furl,
109 "nickname-%d" % i, "version", "oldest")
111 node_furl = tub.registerReference(n)
112 c.publish(node_furl, "storage", "ri_name")
113 # the last one does not publish anything
115 c.subscribe_to("storage")
117 c.setServiceParent(self.parent)
121 def _wait_for_all_connections():
123 if len(c.get_all_connections()) < NUMCLIENTS:
126 d = self.poll(_wait_for_all_connections)
129 log.msg("doing _check1")
131 self.failUnless(c.connected_to_introducer())
132 self.failUnlessEqual(len(c.get_all_connections()), NUMCLIENTS)
133 self.failUnlessEqual(len(c.get_all_peerids()), NUMCLIENTS)
134 self.failUnlessEqual(len(c.get_all_connections_for("storage")),
136 d.addCallback(_check1)
138 origin_c = clients[0]
139 def _disconnect_somebody_else(res):
140 # now disconnect somebody's connection to someone else
141 current_counter = origin_c.counter
142 victim_nodeid = b32decode(tubs[clients[1]].tubID.upper())
143 log.msg(" disconnecting %s->%s" %
144 (tubs[origin_c].tubID,
145 idlib.shortnodeid_b2a(victim_nodeid)))
146 origin_c.debug_disconnect_from_peerid(victim_nodeid)
147 log.msg(" did disconnect")
149 # then wait until something changes, which ought to be them
152 return current_counter != origin_c.counter
153 return self.poll(_compare)
155 d.addCallback(_disconnect_somebody_else)
157 # and wait for them to reconnect
158 d.addCallback(lambda res: self.poll(_wait_for_all_connections))
160 log.msg("doing _check2")
162 self.failUnlessEqual(len(c.get_all_connections()), NUMCLIENTS)
163 d.addCallback(_check2)
165 def _disconnect_yourself(res):
166 # now disconnect somebody's connection to themselves.
167 current_counter = origin_c.counter
168 victim_nodeid = b32decode(tubs[clients[0]].tubID.upper())
169 log.msg(" disconnecting %s->%s" %
170 (tubs[origin_c].tubID,
171 idlib.shortnodeid_b2a(victim_nodeid)))
172 origin_c.debug_disconnect_from_peerid(victim_nodeid)
173 log.msg(" did disconnect from self")
176 return current_counter != origin_c.counter
177 return self.poll(_compare)
178 d.addCallback(_disconnect_yourself)
180 d.addCallback(lambda res: self.poll(_wait_for_all_connections))
182 log.msg("doing _check3")
184 self.failUnlessEqual(len(c.get_all_connections_for("storage")),
186 d.addCallback(_check3)
187 def _shutdown_introducer(res):
188 # now shut down the introducer. We do this by shutting down the
189 # tub it's using. Nobody's connections (to each other) should go
190 # down. All clients should notice the loss, and no other errors
192 log.msg("shutting down the introducer")
193 return self.central_tub.disownServiceParent()
194 d.addCallback(_shutdown_introducer)
195 def _wait_for_introducer_loss():
197 if c.connected_to_introducer():
200 d.addCallback(lambda res: self.poll(_wait_for_introducer_loss))
203 log.msg("doing _check4")
205 self.failUnlessEqual(len(c.get_all_connections_for("storage")),
207 self.failIf(c.connected_to_introducer())
208 d.addCallback(_check4)