1 from base64 import b32decode
5 from twisted.trial import unittest
6 from twisted.internet import defer
7 from twisted.python import log
9 from foolscap import Tub, Referenceable
10 from foolscap.eventual import fireEventually, flushEventualQueue
11 from twisted.application import service
12 from allmydata.introducer import IntroducerClient, IntroducerService, IntroducerNode
13 from allmydata.util import testutil, idlib
# Test helper deriving from foolscap's Referenceable.
# NOTE(review): presumably the object each client registers with its Tub in
# test_system (the `n` passed to registerReference) -- confirm.
15 class FakeNode(Referenceable):
# MultiService subclass that adds a log(msg, **kw) method, so it can act as
# the parent service for objects that call parent.log().
# NOTE(review): presumably log() forwards to twisted.python.log -- confirm
# against the method body.
18 class LoggingMultiService(service.MultiService):
19 def log(self, msg, **kw):
# Exercises the start/stop lifecycle of an IntroducerNode.
22 class TestIntroducerNode(testutil.SignalMixin, unittest.TestCase):
# Build an IntroducerNode for a test-specific basedir and drive it through
# startService -> when_tub_ready -> stopService. The visible steps contain
# no assertions, so the test only fails if one of them raises or errbacks.
23 def test_loadable(self):
24 basedir = "introducer.IntroducerNode.test_loadable"
26 q = IntroducerNode(basedir)
# Each step is chained as a callback on d, so they run strictly in order.
27 d = fireEventually(None)
28 d.addCallback(lambda res: q.startService())
29 d.addCallback(lambda res: q.when_tub_ready())
30 d.addCallback(lambda res: q.stopService())
# flushEventualQueue is chained last, after the node has been stopped.
31 d.addCallback(flushEventualQueue)
34 class TestIntroducer(unittest.TestCase, testutil.PollMixin):
36 self.parent = LoggingMultiService()
37 self.parent.startService()
39 log.msg("TestIntroducer.tearDown")
40 d = defer.succeed(None)
41 d.addCallback(lambda res: self.parent.stopService())
42 d.addCallback(flushEventualQueue)
# Construct an IntroducerClient with None in the tub position and placeholder
# strings for the remaining arguments. Nothing is asserted and `ic` is never
# read afterwards, so the test only checks that construction does not raise.
46 def test_create(self):
47 ic = IntroducerClient(None, "introducer.furl", "my_nickname",
48 "my_version", "oldest_version")
# Create an IntroducerService and attach it to self.parent. No assertions:
# the test only checks that attaching the service does not raise.
50 def test_listen(self):
51 i = IntroducerService()
52 i.setServiceParent(self.parent)
# End-to-end test of introducer and clients. An IntroducerService is
# registered on a central Tub, IntroducerClients are created in a loop and
# attached to self.parent, and a Deferred chain then:
#   1. polls until connections are established and checks the counts,
#   2. disconnects origin_c from another client and waits for reconnection,
#   3. disconnects origin_c from its own node id and waits again,
#   4. shuts down the central Tub and checks that clients notice the loss.
54 def test_system(self):
56 self.central_tub = tub = Tub()
57 #tub.setOption("logLocalFailures", True)
58 #tub.setOption("logRemoteFailures", True)
59 tub.setServiceParent(self.parent)
# Listen on "tcp:0", then advertise whatever port the listener reports.
60 l = tub.listenOn("tcp:0")
61 portnum = l.getPortnum()
62 tub.setLocation("localhost:%d" % portnum)
64 i = IntroducerService()
65 i.setServiceParent(self.parent)
# The value returned here is handed to every IntroducerClient created below.
66 introducer_furl = tub.registerReference(i)
68 # we have 5 clients who publish themselves, and an extra one which
69 # does not. When the connections are fully established, all six nodes
70 # should have 5 connections each.
# NOTE: the loop variable reuses the name `i`, which held the
# IntroducerService above; the service is not referenced by that name again.
74 for i in range(NUMCLIENTS+1):
76 #tub.setOption("logLocalFailures", True)
77 #tub.setOption("logRemoteFailures", True)
# NOTE(review): presumably `tub` is a fresh Tub per client at this point
# (later code looks tubs up per client via tubs[...]) -- confirm.
78 tub.setServiceParent(self.parent)
79 l = tub.listenOn("tcp:0")
80 portnum = l.getPortnum()
81 tub.setLocation("localhost:%d" % portnum)
84 log.msg("creating client %d: %s" % (i, tub.getShortTubID()))
85 c = IntroducerClient(tub, introducer_furl,
86 "nickname-%d" % i, "version", "oldest")
88 node_furl = tub.registerReference(n)
89 c.publish(node_furl, "storage", "ri_name")
90 # the last one does not publish anything
92 c.subscribe_to("storage")
94 c.setServiceParent(self.parent)
# Predicate handed to self.poll(): the visible check compares a client's
# connection count against NUMCLIENTS.
98 def _wait_for_all_connections():
100 if len(c.get_all_connections()) < NUMCLIENTS:
# d is the Deferred that the rest of the test is chained onto.
103 d = self.poll(_wait_for_all_connections)
106 log.msg("doing _check1")
108 self.failUnless(c.connected_to_introducer())
109 self.failUnlessEqual(len(c.get_all_connections()), NUMCLIENTS)
110 self.failUnlessEqual(len(c.get_all_peerids()), NUMCLIENTS)
111 self.failUnlessEqual(len(c.get_all_connections_for("storage")),
113 d.addCallback(_check1)
# origin_c is the client whose connections are forcibly dropped below.
115 origin_c = clients[0]
116 def _disconnect_somebody_else(res):
117 # now disconnect somebody's connection to someone else
# Snapshot the counter so the poll below can detect that it changed.
118 current_counter = origin_c.counter
# b32decode rejects lower-case input by default, hence .upper().
119 victim_nodeid = b32decode(tubs[clients[1]].tubID.upper())
120 log.msg(" disconnecting %s->%s" %
121 (tubs[origin_c].tubID,
122 idlib.shortnodeid_b2a(victim_nodeid)))
123 origin_c.debug_disconnect_from_peerid(victim_nodeid)
124 log.msg(" did disconnect")
126 # then wait until something changes, which ought to be them
129 return current_counter != origin_c.counter
130 return self.poll(_compare)
132 d.addCallback(_disconnect_somebody_else)
134 # and wait for them to reconnect
135 d.addCallback(lambda res: self.poll(_wait_for_all_connections))
137 log.msg("doing _check2")
139 self.failUnlessEqual(len(c.get_all_connections()), NUMCLIENTS)
140 d.addCallback(_check2)
142 def _disconnect_yourself(res):
143 # now disconnect somebody's connection to themselves.
144 current_counter = origin_c.counter
# The victim is origin_c's own node id, since clients[0] is origin_c.
145 victim_nodeid = b32decode(tubs[clients[0]].tubID.upper())
146 log.msg(" disconnecting %s->%s" %
147 (tubs[origin_c].tubID,
148 idlib.shortnodeid_b2a(victim_nodeid)))
149 origin_c.debug_disconnect_from_peerid(victim_nodeid)
150 log.msg(" did disconnect from self")
153 return current_counter != origin_c.counter
154 return self.poll(_compare)
155 d.addCallback(_disconnect_yourself)
157 d.addCallback(lambda res: self.poll(_wait_for_all_connections))
159 log.msg("doing _check3")
161 self.failUnlessEqual(len(c.get_all_connections_for("storage")),
163 d.addCallback(_check3)
164 def _shutdown_introducer(res):
165 # now shut down the introducer. We do this by shutting down the
166 # tub it's using. Nobody's connections (to each other) should go
167 # down. All clients should notice the loss, and no other errors
169 log.msg("shutting down the introducer")
170 return self.central_tub.disownServiceParent()
171 d.addCallback(_shutdown_introducer)
# Predicate handed to self.poll(): checks connected_to_introducer() on a client.
172 def _wait_for_introducer_loss():
174 if c.connected_to_introducer():
177 d.addCallback(lambda res: self.poll(_wait_for_introducer_loss))
180 log.msg("doing _check4")
182 self.failUnlessEqual(len(c.get_all_connections_for("storage")),
184 self.failIf(c.connected_to_introducer())
185 d.addCallback(_check4)