from base64 import b32decode

import os

from twisted.trial import unittest
from twisted.internet import defer
from twisted.python import log
from twisted.application import service

from foolscap import Tub, Referenceable
from foolscap.eventual import fireEventually, flushEventualQueue

from allmydata.introducer.client import IntroducerClient
from allmydata.introducer.server import IntroducerService
# test compatibility with old introducer .tac files
from allmydata.introducer import IntroducerNode
from allmydata.util import testutil, idlib

class FakeNode(Referenceable):
    # a minimal Referenceable that stands in for a real node's service:
    # these tests only need something publishable, not a working server
    pass

class LoggingMultiService(service.MultiService):
    def log(self, msg, **kw):
        log.msg(msg, **kw)

class TestIntroducerNode(testutil.SignalMixin, unittest.TestCase):
    def test_loadable(self):
        basedir = "introducer.IntroducerNode.test_loadable"
        os.mkdir(basedir)
        q = IntroducerNode(basedir)
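        # fireEventually() returns a Deferred that fires on a later reactor
        # turn, and flushEventualQueue() fires once foolscap's eventual-send
        # queue has drained, so this chain runs a full start/ready/stop
        # cycle before the test finishes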
        d = fireEventually(None)
        d.addCallback(lambda res: q.startService())
        d.addCallback(lambda res: q.when_tub_ready())
        d.addCallback(lambda res: q.stopService())
        d.addCallback(flushEventualQueue)
        return d

class TestIntroducer(unittest.TestCase, testutil.PollMixin):
    def setUp(self):
        self.parent = LoggingMultiService()
        self.parent.startService()

    def tearDown(self):
        log.msg("TestIntroducer.tearDown")
        d = defer.succeed(None)
        d.addCallback(lambda res: self.parent.stopService())
        d.addCallback(flushEventualQueue)
        return d

    def test_create(self):
        # a simple smoke test: constructing a client (with no Tub yet)
        # should not raise
        ic = IntroducerClient(None, "introducer.furl", "my_nickname",
                              "my_version", "oldest_version")
        del ic # construction is all we're testing

    def test_listen(self):
        i = IntroducerService()
        i.setServiceParent(self.parent)
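        # self.parent is already running (started in setUp), so
        # setServiceParent() starts the introducer immediately; the test
        # passes as long as startup does not raise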

    def test_duplicate(self):
        i = IntroducerService()
        self.failUnlessEqual(len(i.get_announcements()), 0)
        self.failUnlessEqual(len(i.get_subscribers()), 0)
        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36106,127.0.0.1:36106/gydnpigj2ja2qr2srq4ikjwnl7xfgbra"
        furl2 = "pb://ttwwooyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36111,127.0.0.1:36106/ttwwoogj2ja2qr2srq4ikjwnl7xfgbra"
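        # announcements are (furl, service_name, remote_interface_name,
        # nickname, version, oldest_supported_version) tuples. ann1b below
        # re-announces furl1 with a newer version, so publishing it should
        # replace ann1 rather than create a third entry.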
        ann1 = (furl1, "storage", "RIStorage", "nick1", "ver23", "ver0")
        ann1b = (furl1, "storage", "RIStorage", "nick1", "ver24", "ver0")
        ann2 = (furl2, "storage", "RIStorage", "nick2", "ver30", "ver0")
        i.remote_publish(ann1)
        self.failUnlessEqual(len(i.get_announcements()), 1)
        self.failUnlessEqual(len(i.get_subscribers()), 0)
        i.remote_publish(ann2)
        self.failUnlessEqual(len(i.get_announcements()), 2)
        self.failUnlessEqual(len(i.get_subscribers()), 0)
        i.remote_publish(ann1b)
        self.failUnlessEqual(len(i.get_announcements()), 2)
        self.failUnlessEqual(len(i.get_subscribers()), 0)

    def test_system(self):
        self.central_tub = tub = Tub()
        #tub.setOption("logLocalFailures", True)
        #tub.setOption("logRemoteFailures", True)
        tub.setServiceParent(self.parent)
        l = tub.listenOn("tcp:0")
        portnum = l.getPortnum()
        tub.setLocation("localhost:%d" % portnum)

        introducer = IntroducerService()
        introducer.setServiceParent(self.parent)
        introducer_furl = tub.registerReference(introducer)
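        # the FURL returned by registerReference() is the introducer's
        # contact string, and is the only bootstrapping information the
        # clients below are given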
        NUMCLIENTS = 5
        # we have 5 clients who publish themselves, and an extra one which
        # does not. When the connections are fully established, all six
        # nodes should have 5 connections each.
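        # (a publishing client's connection to its own announcement counts
        # toward its total, which is why the disconnect-from-yourself case
        # below is worth exercising)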

        clients = []
        tubs = {}
        for i in range(NUMCLIENTS+1):
            tub = Tub()
            #tub.setOption("logLocalFailures", True)
            #tub.setOption("logRemoteFailures", True)
            tub.setServiceParent(self.parent)
            l = tub.listenOn("tcp:0")
            portnum = l.getPortnum()
            tub.setLocation("localhost:%d" % portnum)

            n = FakeNode()
            log.msg("creating client %d: %s" % (i, tub.getShortTubID()))
            c = IntroducerClient(tub, introducer_furl,
                                 "nickname-%d" % i, "version", "oldest")
            if i < NUMCLIENTS:
                node_furl = tub.registerReference(n)
                c.publish(node_furl, "storage", "ri_name")
            # the last one does not publish anything

            c.subscribe_to("storage")

            c.setServiceParent(self.parent)
            clients.append(c)
            tubs[c] = tub

        def _wait_for_all_connections():
            for c in clients:
                if len(c.get_all_connections()) < NUMCLIENTS:
                    return False
            return True
        d = self.poll(_wait_for_all_connections)
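        # PollMixin.poll() returns a Deferred that fires once the predicate
        # returns True, so each _check below runs only after the mesh has
        # settled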

        def _check1(res):
            log.msg("doing _check1")
            for c in clients:
                self.failUnless(c.connected_to_introducer())
                self.failUnlessEqual(len(c.get_all_connections()), NUMCLIENTS)
                self.failUnlessEqual(len(c.get_all_peerids()), NUMCLIENTS)
                self.failUnlessEqual(len(c.get_all_connections_for("storage")),
                                     NUMCLIENTS)
        d.addCallback(_check1)

        origin_c = clients[0]
        def _disconnect_somebody_else(res):
            # now disconnect somebody's connection to someone else
            current_counter = origin_c.counter
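            # origin_c.counter changes whenever the client's connection set
            # changes, so polling it below detects the loss. A TubID is the
            # base32 encoding of the binary nodeid, hence the b32decode().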
            victim_nodeid = b32decode(tubs[clients[1]].tubID.upper())
            log.msg(" disconnecting %s->%s" %
                    (tubs[origin_c].tubID,
                     idlib.shortnodeid_b2a(victim_nodeid)))
            origin_c.debug_disconnect_from_peerid(victim_nodeid)
            log.msg(" did disconnect")

            # then wait until something changes, which ought to be them
            # noticing the loss
            def _compare():
                return current_counter != origin_c.counter
            return self.poll(_compare)

        d.addCallback(_disconnect_somebody_else)

        # and wait for them to reconnect
        d.addCallback(lambda res: self.poll(_wait_for_all_connections))
        def _check2(res):
            log.msg("doing _check2")
            for c in clients:
                self.failUnlessEqual(len(c.get_all_connections()), NUMCLIENTS)
        d.addCallback(_check2)

        def _disconnect_yourself(res):
            # now disconnect somebody's connection to themselves.
            current_counter = origin_c.counter
            victim_nodeid = b32decode(tubs[clients[0]].tubID.upper())
            log.msg(" disconnecting %s->%s" %
                    (tubs[origin_c].tubID,
                     idlib.shortnodeid_b2a(victim_nodeid)))
            origin_c.debug_disconnect_from_peerid(victim_nodeid)
            log.msg(" did disconnect from self")

            def _compare():
                return current_counter != origin_c.counter
            return self.poll(_compare)
        d.addCallback(_disconnect_yourself)

        d.addCallback(lambda res: self.poll(_wait_for_all_connections))
        def _check3(res):
            log.msg("doing _check3")
            for c in clients:
                self.failUnlessEqual(len(c.get_all_connections_for("storage")),
                                     NUMCLIENTS)
        d.addCallback(_check3)
        def _shutdown_introducer(res):
            # now shut down the introducer. We do this by shutting down the
            # tub it's using. Nobody's connections (to each other) should go
            # down. All clients should notice the loss, and no other errors
            # should occur.
            log.msg("shutting down the introducer")
            return self.central_tub.disownServiceParent()
        d.addCallback(_shutdown_introducer)
        def _wait_for_introducer_loss():
            for c in clients:
                if c.connected_to_introducer():
                    return False
            return True
        d.addCallback(lambda res: self.poll(_wait_for_introducer_loss))

        def _check4(res):
            log.msg("doing _check4")
            for c in clients:
                self.failUnlessEqual(len(c.get_all_connections_for("storage")),
                                     NUMCLIENTS)
                self.failIf(c.connected_to_introducer())
        d.addCallback(_check4)
        return d