# src/allmydata/test/test_introducer.py
# introducer: only record one announcement per (tubid,service) tuple. Fixes #343.

from base64 import b32decode

import os

from twisted.trial import unittest
from twisted.internet import defer
from twisted.python import log

from foolscap import Tub, Referenceable
from foolscap.eventual import fireEventually, flushEventualQueue
from twisted.application import service
from allmydata.introducer import IntroducerClient, IntroducerService, IntroducerNode
from allmydata.util import testutil, idlib

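# FakeNode stands in for a real node: a bare Referenceable whose FURL the
# clients in test_system register and announce under the "storage" service.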
class FakeNode(Referenceable):
    pass

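# A MultiService whose log() forwards to twisted.python.log, so child
# services attached to it can log through their parent.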
class LoggingMultiService(service.MultiService):
    def log(self, msg, **kw):
        log.msg(msg, **kw)

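# test_loadable: create an IntroducerNode in a fresh basedir, start it, wait
# for its Tub to become ready, then stop it and flush the eventual-send queue.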
class TestIntroducerNode(testutil.SignalMixin, unittest.TestCase):
    def test_loadable(self):
        basedir = "introducer.IntroducerNode.test_loadable"
        os.mkdir(basedir)
        q = IntroducerNode(basedir)
        d = fireEventually(None)
        d.addCallback(lambda res: q.startService())
        d.addCallback(lambda res: q.when_tub_ready())
        d.addCallback(lambda res: q.stopService())
        d.addCallback(flushEventualQueue)
        return d

class TestIntroducer(unittest.TestCase, testutil.PollMixin):
    def setUp(self):
        self.parent = LoggingMultiService()
        self.parent.startService()

    def tearDown(self):
        log.msg("TestIntroducer.tearDown")
        d = defer.succeed(None)
        d.addCallback(lambda res: self.parent.stopService())
        d.addCallback(flushEventualQueue)
        return d


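    # test_create: constructing an IntroducerClient without a Tub should not
    # raise; this is a pure smoke test of the constructor arguments.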
    def test_create(self):
        ic = IntroducerClient(None, "introducer.furl", "my_nickname",
                              "my_version", "oldest_version")

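    # test_listen: an IntroducerService should start cleanly when attached to
    # a running parent service.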
    def test_listen(self):
        i = IntroducerService()
        i.setServiceParent(self.parent)

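    # test_duplicate: the introducer keeps only one announcement per
    # (tubid,service) tuple (#343), so re-announcing the same FURL for the
    # same service must replace the earlier entry rather than add a new one.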
    def test_duplicate(self):
        i = IntroducerService()
        self.failUnlessEqual(len(i.get_announcements()), 0)
        self.failUnlessEqual(len(i.get_subscribers()), 0)
        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36106,127.0.0.1:36106/gydnpigj2ja2qr2srq4ikjwnl7xfgbra"
        furl2 = "pb://ttwwooyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36111,127.0.0.1:36106/ttwwoogj2ja2qr2srq4ikjwnl7xfgbra"
        ann1 = (furl1, "storage", "RIStorage", "nick1", "ver23", "ver0")
        ann1b = (furl1, "storage", "RIStorage", "nick1", "ver24", "ver0")
        ann2 = (furl2, "storage", "RIStorage", "nick2", "ver30", "ver0")
        i.remote_publish(ann1)
        self.failUnlessEqual(len(i.get_announcements()), 1)
        self.failUnlessEqual(len(i.get_subscribers()), 0)
        i.remote_publish(ann2)
        self.failUnlessEqual(len(i.get_announcements()), 2)
        self.failUnlessEqual(len(i.get_subscribers()), 0)
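        # ann1b carries the same FURL (same tubid) and the same "storage"
        # service name as ann1, so it replaces ann1: the count stays at 2.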
        i.remote_publish(ann1b)
        self.failUnlessEqual(len(i.get_announcements()), 2)
        self.failUnlessEqual(len(i.get_subscribers()), 0)


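    # test_system: end-to-end exercise of the introducer. Five clients publish
    # a storage announcement and a sixth only subscribes; all six must end up
    # fully connected, survive forced disconnects, and keep their connections
    # when the introducer itself goes away.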
    def test_system(self):

        self.central_tub = tub = Tub()
        #tub.setOption("logLocalFailures", True)
        #tub.setOption("logRemoteFailures", True)
        tub.setServiceParent(self.parent)
        l = tub.listenOn("tcp:0")
        portnum = l.getPortnum()
        tub.setLocation("localhost:%d" % portnum)

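        # the introducer service lives on the central tub; every client gets
        # its FURL so they can publish and subscribe through it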
        i = IntroducerService()
        i.setServiceParent(self.parent)
        introducer_furl = tub.registerReference(i)
        NUMCLIENTS = 5
        # we have 5 clients who publish themselves, and an extra one which
        # does not. When the connections are fully established, all six nodes
        # should have 5 connections each.

        clients = []
        tubs = {}
        for k in range(NUMCLIENTS+1):
            tub = Tub()
            #tub.setOption("logLocalFailures", True)
            #tub.setOption("logRemoteFailures", True)
            tub.setServiceParent(self.parent)
            l = tub.listenOn("tcp:0")
            portnum = l.getPortnum()
            tub.setLocation("localhost:%d" % portnum)

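            # each publishing client registers a FakeNode with its own tub and
            # announces the resulting FURL under the "storage" service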
            n = FakeNode()
            log.msg("creating client %d: %s" % (k, tub.getShortTubID()))
            c = IntroducerClient(tub, introducer_furl,
                                 "nickname-%d" % k, "version", "oldest")
            if k < NUMCLIENTS:
                node_furl = tub.registerReference(n)
                c.publish(node_furl, "storage", "ri_name")
            # the last one does not publish anything

            c.subscribe_to("storage")

            c.setServiceParent(self.parent)
            clients.append(c)
            tubs[c] = tub

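        # poll until every client, including the non-publishing one, has a
        # connection to each of the NUMCLIENTS published storage servers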
        def _wait_for_all_connections():
            for c in clients:
                if len(c.get_all_connections()) < NUMCLIENTS:
                    return False
            return True
        d = self.poll(_wait_for_all_connections)

        def _check1(res):
            log.msg("doing _check1")
            for c in clients:
                self.failUnless(c.connected_to_introducer())
                self.failUnlessEqual(len(c.get_all_connections()), NUMCLIENTS)
                self.failUnlessEqual(len(c.get_all_peerids()), NUMCLIENTS)
                self.failUnlessEqual(len(c.get_all_connections_for("storage")),
                                     NUMCLIENTS)
        d.addCallback(_check1)

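        # forcibly drop origin_c's connection to another client; origin_c's
        # counter should change when it notices the loss, after which the
        # connection is expected to come back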
        origin_c = clients[0]
        def _disconnect_somebody_else(res):
            # now disconnect somebody's connection to someone else
            current_counter = origin_c.counter
            victim_nodeid = b32decode(tubs[clients[1]].tubID.upper())
            log.msg(" disconnecting %s->%s" %
                    (tubs[origin_c].tubID,
                     idlib.shortnodeid_b2a(victim_nodeid)))
            origin_c.debug_disconnect_from_peerid(victim_nodeid)
            log.msg(" did disconnect")

            # then wait until something changes, which ought to be them
            # noticing the loss
            def _compare():
                return current_counter != origin_c.counter
            return self.poll(_compare)

        d.addCallback(_disconnect_somebody_else)

        # and wait for them to reconnect
        d.addCallback(lambda res: self.poll(_wait_for_all_connections))
        def _check2(res):
            log.msg("doing _check2")
            for c in clients:
                self.failUnlessEqual(len(c.get_all_connections()), NUMCLIENTS)
        d.addCallback(_check2)

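        # repeat the exercise, but this time drop origin_c's connection to its
        # own announcement (publishing clients connect to themselves too)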
        def _disconnect_yourself(res):
            # now disconnect somebody's connection to themselves.
            current_counter = origin_c.counter
            victim_nodeid = b32decode(tubs[clients[0]].tubID.upper())
            log.msg(" disconnecting %s->%s" %
                    (tubs[origin_c].tubID,
                     idlib.shortnodeid_b2a(victim_nodeid)))
            origin_c.debug_disconnect_from_peerid(victim_nodeid)
            log.msg(" did disconnect from self")

            def _compare():
                return current_counter != origin_c.counter
            return self.poll(_compare)
        d.addCallback(_disconnect_yourself)

        d.addCallback(lambda res: self.poll(_wait_for_all_connections))
        def _check3(res):
            log.msg("doing _check3")
            for c in clients:
                self.failUnlessEqual(len(c.get_all_connections_for("storage")),
                                     NUMCLIENTS)
        d.addCallback(_check3)
        def _shutdown_introducer(res):
            # now shut down the introducer. We do this by shutting down the
            # tub it's using. Nobody's connections (to each other) should go
            # down. All clients should notice the loss, and no other errors
            # should occur.
            log.msg("shutting down the introducer")
            return self.central_tub.disownServiceParent()
        d.addCallback(_shutdown_introducer)
        def _wait_for_introducer_loss():
            for c in clients:
                if c.connected_to_introducer():
                    return False
            return True
        d.addCallback(lambda res: self.poll(_wait_for_introducer_loss))

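        # with the introducer gone, the existing storage connections must
        # survive, but every client should report that it has lost its
        # connection to the introducer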
        def _check4(res):
            log.msg("doing _check4")
            for c in clients:
                self.failUnlessEqual(len(c.get_all_connections_for("storage")),
                                     NUMCLIENTS)
                self.failIf(c.connected_to_introducer())
        d.addCallback(_check4)
        return d

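
# A minimal, hypothetical sketch (not IntroducerService's actual code) of the
# dedup rule test_duplicate relies on after #343: announcements are keyed by
# (tubid, service_name), so a fresh announcement from the same tub for the
# same service replaces the old entry instead of accumulating alongside it.
# The name and placement of this helper are illustrative only.
def _example_announcement_key(announcement):
    (furl, service_name, ri_name, nickname, ver, oldest_ver) = announcement
    # a FURL has the form pb://<tubid>@<connection-hints>/<swissnum>;
    # the tubid is the part between "pb://" and "@"
    tubid = furl[len("pb://"):].split("@")[0]
    return (tubid, service_name)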