1 from base64 import b32decode
5 from twisted.trial import unittest
6 from twisted.internet import defer
7 from twisted.python import log
9 from foolscap import Tub, Referenceable
10 from foolscap.eventual import fireEventually, flushEventualQueue
11 from twisted.application import service
12 from allmydata.introducer import IntroducerClient, IntroducerService, IntroducerNode
13 from allmydata.util import testutil, idlib
# Subclass of foolscap's Referenceable.
# NOTE(review): the class body is not visible in this excerpt; presumably an
# instance is what test_system registers with each client tub -- confirm.
15 class FakeNode(Referenceable):
# MultiService subclass that adds a log(msg, **kw) method. TestIntroducer
# creates one of these as the parent service for its tests.
18 class LoggingMultiService(service.MultiService):
# NOTE(review): the method body is not visible in this excerpt; presumably
# it forwards msg and the keyword arguments to twisted.python.log -- confirm.
19 def log(self, msg, **kw):
# Smoke test for IntroducerNode: construct one for a base directory, then
# start it, call when_tub_ready(), and stop it again.
22 class TestIntroducerNode(testutil.SignalMixin, unittest.TestCase):
23 def test_loadable(self):
24 basedir = "introducer.IntroducerNode.test_loadable"
# NOTE(review): the listing skips a line here and another after the final
# addCallback below; presumably the directory is created first and the
# Deferred is returned to trial at the end -- confirm against the full file.
26 q = IntroducerNode(basedir)
# Each lifecycle step is chained onto the Deferred obtained from
# fireEventually, so the steps run in the order they are added.
27 d = fireEventually(None)
28 d.addCallback(lambda res: q.startService())
29 d.addCallback(lambda res: q.when_tub_ready())
30 d.addCallback(lambda res: q.stopService())
# Last step: flush foolscap's eventual-send queue once the node is stopped.
31 d.addCallback(flushEventualQueue)
# Test case for IntroducerService and IntroducerClient. The tests call
# self.poll(), presumably provided by testutil.PollMixin -- confirm.
34 class TestIntroducer(unittest.TestCase, testutil.PollMixin):
# NOTE(review): the `def` line for this fixture is not visible in the
# excerpt (presumably setUp). It creates and starts the parent service that
# the tests attach their tubs, services and clients to.
36 self.parent = LoggingMultiService()
37 self.parent.startService()
# NOTE(review): the `def` line is likewise not visible here (presumably
# tearDown, going by the log message). It stops the parent service and then
# flushes the eventual-send queue, chained on an already-fired Deferred.
39 log.msg("TestIntroducer.tearDown")
40 d = defer.succeed(None)
41 d.addCallback(lambda res: self.parent.stopService())
42 d.addCallback(flushEventualQueue)
def test_create(self):
    """Check that an IntroducerClient can be constructed.

    None is passed where test_system passes a Tub, so only the constructor
    itself is exercised.
    """
    # The instance is deliberately not bound to a name: nothing else in
    # this test reads it, and an unused local only triggers lint warnings.
    IntroducerClient(None, "introducer.furl", "my_nickname",
                     "my_version", "oldest_version")
def test_listen(self):
    """Create an IntroducerService and attach it to the parent service."""
    # Construct first, then parent it, exactly as a two-step version would.
    IntroducerService().setServiceParent(self.parent)
def test_duplicate(self):
    """Re-publishing from an already-announced FURL must not add an entry.

    Three announcements are fed directly into a fresh IntroducerService.
    The third reuses the first one's FURL and differs only in its version
    string, so the announcement count has to stay at two. Nobody
    subscribes, so the subscriber count stays at zero throughout.
    """
    introducer = IntroducerService()

    def expect_counts(announcements, subscribers):
        # The same pair of assertions, in the same order, after each step.
        self.failUnlessEqual(len(introducer.get_announcements()),
                             announcements)
        self.failUnlessEqual(len(introducer.get_subscribers()),
                             subscribers)

    expect_counts(0, 0)

    first_furl = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36106,127.0.0.1:36106/gydnpigj2ja2qr2srq4ikjwnl7xfgbra"
    second_furl = "pb://ttwwooyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36111,127.0.0.1:36106/ttwwoogj2ja2qr2srq4ikjwnl7xfgbra"

    original = (first_furl, "storage", "RIStorage", "nick1", "ver23", "ver0")
    republished = (first_furl, "storage", "RIStorage", "nick1", "ver24", "ver0")
    other = (second_furl, "storage", "RIStorage", "nick2", "ver30", "ver0")

    # (announcement to publish, announcement count expected afterwards)
    steps = (
        (original, 1),
        (other, 2),
        (republished, 2),
    )
    for announcement, expected_total in steps:
        introducer.remote_publish(announcement)
        expect_counts(expected_total, 0)
# End-to-end test: runs an IntroducerService on its own Tub, attaches
# NUMCLIENTS+1 IntroducerClients (each on its own Tub), and then drives a
# chain of Deferred callbacks through two forced disconnects and finally the
# shutdown of the introducer's Tub, checking connection counts at each stage.
# NOTE(review): this listing omits a number of lines (the definitions of
# NUMCLIENTS, clients, tubs and n, several nested `def` lines, and the
# continuation lines of three assertions), so read it next to the full file.
74 def test_system(self):
# Central Tub that hosts the introducer: listen on "tcp:0", read back the
# port number actually in use, and advertise the Tub as localhost:<port>.
76 self.central_tub = tub = Tub()
77 #tub.setOption("logLocalFailures", True)
78 #tub.setOption("logRemoteFailures", True)
79 tub.setServiceParent(self.parent)
80 l = tub.listenOn("tcp:0")
81 portnum = l.getPortnum()
82 tub.setLocation("localhost:%d" % portnum)
84 i = IntroducerService()
85 i.setServiceParent(self.parent)
# Registering the service yields the FURL that every client is given below.
86 introducer_furl = tub.registerReference(i)
88 # we have 5 clients who publish themselves, and an extra one which
89 # does not. When the connections are fully established, all six nodes
90 # should have 5 connections each.
# Note that the loop variable reuses the name `i`, rebinding it from the
# IntroducerService above to an integer index.
# NOTE(review): the line that creates each client's own `tub` is not visible
# in this excerpt -- confirm against the full file.
94 for i in range(NUMCLIENTS+1):
96 #tub.setOption("logLocalFailures", True)
97 #tub.setOption("logRemoteFailures", True)
98 tub.setServiceParent(self.parent)
99 l = tub.listenOn("tcp:0")
100 portnum = l.getPortnum()
101 tub.setLocation("localhost:%d" % portnum)
104 log.msg("creating client %d: %s" % (i, tub.getShortTubID()))
105 c = IntroducerClient(tub, introducer_furl,
106 "nickname-%d" % i, "version", "oldest")
# NOTE(review): the condition guarding the publish step and the creation of
# `n` are not visible here; per the comment below, the last client skips it.
108 node_furl = tub.registerReference(n)
109 c.publish(node_furl, "storage", "ri_name")
110 # the last one does not publish anything
112 c.subscribe_to("storage")
114 c.setServiceParent(self.parent)
# Poll predicate: compares a client's connection count against NUMCLIENTS.
# NOTE(review): the enclosing loop and the return statements are not visible.
118 def _wait_for_all_connections():
120 if len(c.get_all_connections()) < NUMCLIENTS:
# `d` is the Deferred that all later stages are chained onto.
123 d = self.poll(_wait_for_all_connections)
# Stage 1 (the `def _check1` line is not visible): a client must be connected
# to the introducer and see NUMCLIENTS connections and peer ids.
126 log.msg("doing _check1")
128 self.failUnless(c.connected_to_introducer())
129 self.failUnlessEqual(len(c.get_all_connections()), NUMCLIENTS)
130 self.failUnlessEqual(len(c.get_all_peerids()), NUMCLIENTS)
131 self.failUnlessEqual(len(c.get_all_connections_for("storage")),
133 d.addCallback(_check1)
# The first client is the one whose connections get torn down below.
135 origin_c = clients[0]
136 def _disconnect_somebody_else(res):
137 # now disconnect somebody's connection to someone else
# Snapshot the counter first so the later comparison can detect a change.
138 current_counter = origin_c.counter
# tubID is text; upper-case it and base32-decode it to get the node id that
# debug_disconnect_from_peerid is called with.
139 victim_nodeid = b32decode(tubs[clients[1]].tubID.upper())
140 log.msg(" disconnecting %s->%s" %
141 (tubs[origin_c].tubID,
142 idlib.shortnodeid_b2a(victim_nodeid)))
143 origin_c.debug_disconnect_from_peerid(victim_nodeid)
144 log.msg(" did disconnect")
146 # then wait until something changes, which ought to be them
# NOTE(review): the `def _compare` line is not visible; the return below is
# its body, true once origin_c.counter differs from the snapshot.
149 return current_counter != origin_c.counter
150 return self.poll(_compare)
152 d.addCallback(_disconnect_somebody_else)
154 # and wait for them to reconnect
155 d.addCallback(lambda res: self.poll(_wait_for_all_connections))
# Stage 2 (the `def _check2` line is not visible): connection count restored.
157 log.msg("doing _check2")
159 self.failUnlessEqual(len(c.get_all_connections()), NUMCLIENTS)
160 d.addCallback(_check2)
162 def _disconnect_yourself(res):
163 # now disconnect somebody's connection to themselves.
# Same procedure as above, but the victim is clients[0], i.e. origin_c itself.
164 current_counter = origin_c.counter
165 victim_nodeid = b32decode(tubs[clients[0]].tubID.upper())
166 log.msg(" disconnecting %s->%s" %
167 (tubs[origin_c].tubID,
168 idlib.shortnodeid_b2a(victim_nodeid)))
169 origin_c.debug_disconnect_from_peerid(victim_nodeid)
170 log.msg(" did disconnect from self")
173 return current_counter != origin_c.counter
174 return self.poll(_compare)
175 d.addCallback(_disconnect_yourself)
177 d.addCallback(lambda res: self.poll(_wait_for_all_connections))
# Stage 3 (the `def _check3` line is not visible): re-check the "storage"
# connections after the self-disconnect.
179 log.msg("doing _check3")
181 self.failUnlessEqual(len(c.get_all_connections_for("storage")),
183 d.addCallback(_check3)
184 def _shutdown_introducer(res):
185 # now shut down the introducer. We do this by shutting down the
186 # tub it's using. Nobody's connections (to each other) should go
187 # down. All clients should notice the loss, and no other errors
189 log.msg("shutting down the introducer")
# The result of detaching the central Tub is returned into the callback chain.
190 return self.central_tub.disownServiceParent()
191 d.addCallback(_shutdown_introducer)
# Poll predicate checking whether a client still reports being connected to
# the introducer. NOTE(review): the loop and return lines are not visible.
192 def _wait_for_introducer_loss():
194 if c.connected_to_introducer():
197 d.addCallback(lambda res: self.poll(_wait_for_introducer_loss))
# Stage 4 (the `def _check4` line is not visible): the "storage" connections
# are checked again and the client must no longer report an introducer
# connection.
200 log.msg("doing _check4")
202 self.failUnlessEqual(len(c.get_all_connections_for("storage")),
204 self.failIf(c.connected_to_introducer())
205 d.addCallback(_check4)