# src/allmydata/test/test_introducer.py
from base64 import b32decode

import os

from twisted.trial import unittest
from twisted.internet import defer
from twisted.python import log

from foolscap import Tub, Referenceable
from foolscap.eventual import fireEventually, flushEventualQueue
from twisted.application import service
from allmydata.introducer import IntroducerClient, IntroducerService, IntroducerNode
from allmydata.util import testutil, idlib

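# A minimal Referenceable stand-in for a node: the system test below registers
# one per client and publishes its FURL under the "storage" service name.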
class FakeNode(Referenceable):
    pass

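# A parent MultiService whose log() method simply forwards to
# twisted.python.log, so messages from the services under test end up in the
# twisted log.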
class LoggingMultiService(service.MultiService):
    def log(self, msg, **kw):
        log.msg(msg, **kw)

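# Create an IntroducerNode in a fresh basedir and run it through a complete
# start/ready/stop cycle.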
class TestIntroducerNode(testutil.SignalMixin, unittest.TestCase):
    def test_loadable(self):
        basedir = "introducer.IntroducerNode.test_loadable"
        os.mkdir(basedir)
        q = IntroducerNode(basedir)
        d = fireEventually(None)
        d.addCallback(lambda res: q.startService())
        d.addCallback(lambda res: q.when_tub_ready())
        d.addCallback(lambda res: q.stopService())
        d.addCallback(flushEventualQueue)
        return d

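# End-to-end tests that run an IntroducerService and several IntroducerClients
# against each other in a single process, all listening on localhost.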
class TestIntroducer(unittest.TestCase, testutil.PollMixin):
    def setUp(self):
        self.parent = LoggingMultiService()
        self.parent.startService()

    def tearDown(self):
        log.msg("TestIntroducer.tearDown")
        d = defer.succeed(None)
        d.addCallback(lambda res: self.parent.stopService())
        d.addCallback(flushEventualQueue)
        return d

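    # merely constructing an IntroducerClient (here without a real Tub)
    # should not fail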
    def test_create(self):
        ic = IntroducerClient(None, "introducer.furl", "my_nickname",
                              "my_version", "oldest_version")

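    # an IntroducerService should come up cleanly under the parent service,
    # even though no client ever connects to it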
    def test_listen(self):
        i = IntroducerService()
        i.setServiceParent(self.parent)

    def test_system(self):

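        # Create the central tub that will host the introducer, listening on
        # an arbitrary localhost port.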
        self.central_tub = tub = Tub()
        #tub.setOption("logLocalFailures", True)
        #tub.setOption("logRemoteFailures", True)
        tub.setServiceParent(self.parent)
        l = tub.listenOn("tcp:0")
        portnum = l.getPortnum()
        tub.setLocation("localhost:%d" % portnum)

        i = IntroducerService()
        i.setServiceParent(self.parent)
        introducer_furl = tub.registerReference(i)
        NUMCLIENTS = 5
        # we have 5 clients who publish themselves, and an extra one which
        # does not. When the connections are fully established, all six nodes
        # should have 5 connections each.

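        # Build NUMCLIENTS+1 clients, each with its own tub listening on its
        # own localhost port.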
        clients = []
        tubs = {}
        for i in range(NUMCLIENTS+1):
            tub = Tub()
            #tub.setOption("logLocalFailures", True)
            #tub.setOption("logRemoteFailures", True)
            tub.setServiceParent(self.parent)
            l = tub.listenOn("tcp:0")
            portnum = l.getPortnum()
            tub.setLocation("localhost:%d" % portnum)

            n = FakeNode()
            log.msg("creating client %d: %s" % (i, tub.getShortTubID()))
            c = IntroducerClient(tub, introducer_furl,
                                 "nickname-%d" % i, "version", "oldest")
            if i < NUMCLIENTS:
                node_furl = tub.registerReference(n)
                c.publish(node_furl, "storage", "ri_name")
            # the last one does not publish anything

            c.subscribe_to("storage")

            c.setServiceParent(self.parent)
            clients.append(c)
            tubs[c] = tub

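        # Poll until every client reports at least NUMCLIENTS connections.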
        def _wait_for_all_connections():
            for c in clients:
                if len(c.get_all_connections()) < NUMCLIENTS:
                    return False
            return True
        d = self.poll(_wait_for_all_connections)

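        # Once fully connected, each client should see the introducer, all
        # NUMCLIENTS peers, and NUMCLIENTS "storage" connections.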
        def _check1(res):
            log.msg("doing _check1")
            for c in clients:
                self.failUnless(c.connected_to_introducer())
                self.failUnlessEqual(len(c.get_all_connections()), NUMCLIENTS)
                self.failUnlessEqual(len(c.get_all_peerids()), NUMCLIENTS)
                self.failUnlessEqual(len(c.get_all_connections_for("storage")),
                                     NUMCLIENTS)
        d.addCallback(_check1)

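        # Next, forcibly drop one client's connection to another node and make
        # sure the loss is noticed and the connection gets re-established.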
        origin_c = clients[0]
        def _disconnect_somebody_else(res):
            # now disconnect somebody's connection to someone else
            current_counter = origin_c.counter
            victim_nodeid = b32decode(tubs[clients[1]].tubID.upper())
            log.msg(" disconnecting %s->%s" %
                    (tubs[origin_c].tubID,
                     idlib.shortnodeid_b2a(victim_nodeid)))
            origin_c.debug_disconnect_from_peerid(victim_nodeid)
            log.msg(" did disconnect")

            # then wait until something changes, which ought to be them
            # noticing the loss
            def _compare():
                return current_counter != origin_c.counter
            return self.poll(_compare)

        d.addCallback(_disconnect_somebody_else)

        # and wait for them to reconnect
        d.addCallback(lambda res: self.poll(_wait_for_all_connections))
        def _check2(res):
            log.msg("doing _check2")
            for c in clients:
                self.failUnlessEqual(len(c.get_all_connections()), NUMCLIENTS)
        d.addCallback(_check2)

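        # Repeat the exercise with the client's connection to its own node.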
        def _disconnect_yourself(res):
            # now disconnect somebody's connection to themselves.
            current_counter = origin_c.counter
            victim_nodeid = b32decode(tubs[clients[0]].tubID.upper())
            log.msg(" disconnecting %s->%s" %
                    (tubs[origin_c].tubID,
                     idlib.shortnodeid_b2a(victim_nodeid)))
            origin_c.debug_disconnect_from_peerid(victim_nodeid)
            log.msg(" did disconnect from self")

            def _compare():
                return current_counter != origin_c.counter
            return self.poll(_compare)
        d.addCallback(_disconnect_yourself)

        d.addCallback(lambda res: self.poll(_wait_for_all_connections))
        def _check3(res):
            log.msg("doing _check3")
            for c in clients:
                self.failUnlessEqual(len(c.get_all_connections_for("storage")),
                                     NUMCLIENTS)
        d.addCallback(_check3)
        def _shutdown_introducer(res):
            # now shut down the introducer. We do this by shutting down the
            # tub it's using. Nobody's connections (to each other) should go
            # down. All clients should notice the loss, and no other errors
            # should occur.
            log.msg("shutting down the introducer")
            return self.central_tub.disownServiceParent()
        d.addCallback(_shutdown_introducer)
        def _wait_for_introducer_loss():
            for c in clients:
                if c.connected_to_introducer():
                    return False
            return True
        d.addCallback(lambda res: self.poll(_wait_for_introducer_loss))

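        # Losing the introducer must not take down the peer mesh: every client
        # keeps its NUMCLIENTS "storage" connections but reports that it is no
        # longer connected to the introducer.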
        def _check4(res):
            log.msg("doing _check4")
            for c in clients:
                self.failUnlessEqual(len(c.get_all_connections_for("storage")),
                                     NUMCLIENTS)
                self.failIf(c.connected_to_introducer())
        d.addCallback(_check4)
        return d