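"""
Tests for the Tahoe-LAFS introducer: the IntroducerClient/IntroducerService
pair, compatibility with the old introducer, and system-level
publish/subscribe behavior across introducer restarts.
"""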
from base64 import b32decode

import os

from twisted.trial import unittest
from twisted.internet import defer
from twisted.python import log

from foolscap.api import Tub, Referenceable, fireEventually, flushEventualQueue
from twisted.application import service
from allmydata.interfaces import InsufficientVersionError
from allmydata.introducer.client import IntroducerClient
from allmydata.introducer.server import IntroducerService
# test compatibility with old introducer .tac files
from allmydata.introducer import IntroducerNode
from allmydata.introducer import old
from allmydata.util import pollmixin
from allmydata.test import common_util as testutil

class LoggingMultiService(service.MultiService):
    def log(self, msg, **kw):
        log.msg(msg, **kw)

class Node(testutil.SignalMixin, unittest.TestCase):
    def test_loadable(self):
        basedir = "introducer.IntroducerNode.test_loadable"
        os.mkdir(basedir)
        q = IntroducerNode(basedir)
        d = fireEventually(None)
        d.addCallback(lambda res: q.startService())
        d.addCallback(lambda res: q.when_tub_ready())
        d.addCallback(lambda res: q.stopService())
        d.addCallback(flushEventualQueue)
        return d

class ServiceMixin:
    def setUp(self):
        self.parent = LoggingMultiService()
        self.parent.startService()
    def tearDown(self):
        log.msg("TestIntroducer.tearDown")
        d = defer.succeed(None)
        d.addCallback(lambda res: self.parent.stopService())
        d.addCallback(flushEventualQueue)
        return d

class Introducer(ServiceMixin, unittest.TestCase, pollmixin.PollMixin):

    def test_create(self):
        ic = IntroducerClient(None, "introducer.furl", u"my_nickname",
                              "my_version", "oldest_version")

    def test_listen(self):
        i = IntroducerService()
        i.setServiceParent(self.parent)

    def test_duplicate(self):
        i = IntroducerService()
        self.failUnlessEqual(len(i.get_announcements()), 0)
        self.failUnlessEqual(len(i.get_subscribers()), 0)
        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36106,127.0.0.1:36106/gydnpigj2ja2qr2srq4ikjwnl7xfgbra"
        furl2 = "pb://ttwwooyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36111,127.0.0.1:36106/ttwwoogj2ja2qr2srq4ikjwnl7xfgbra"
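        # each announcement is a tuple of (FURL, service name, remote
        # interface name, nickname, version, oldest-supported version)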
        ann1 = (furl1, "storage", "RIStorage", "nick1", "ver23", "ver0")
        ann1b = (furl1, "storage", "RIStorage", "nick1", "ver24", "ver0")
        ann2 = (furl2, "storage", "RIStorage", "nick2", "ver30", "ver0")
        i.remote_publish(ann1)
        self.failUnlessEqual(len(i.get_announcements()), 1)
        self.failUnlessEqual(len(i.get_subscribers()), 0)
        i.remote_publish(ann2)
        self.failUnlessEqual(len(i.get_announcements()), 2)
        self.failUnlessEqual(len(i.get_subscribers()), 0)
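        # ann1b re-announces furl1 with a newer version string: the
        # introducer should treat it as an update to the existing entry,
        # not a new announcement, so the count stays at 2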
        i.remote_publish(ann1b)
        self.failUnlessEqual(len(i.get_announcements()), 2)
        self.failUnlessEqual(len(i.get_subscribers()), 0)

class SystemTestMixin(ServiceMixin, pollmixin.PollMixin):

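    # Build (or rebuild) the central introducer-side Tub. Passing a nonzero
    # portnum lets a restarted Tub reclaim its previous port, so FURLs
    # registered with the old Tub remain valid.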
    def create_tub(self, portnum=0):
        tubfile = os.path.join(self.basedir, "tub.pem")
        self.central_tub = tub = Tub(certFile=tubfile)
        #tub.setOption("logLocalFailures", True)
        #tub.setOption("logRemoteFailures", True)
        tub.setOption("expose-remote-exception-types", False)
        tub.setServiceParent(self.parent)
        l = tub.listenOn("tcp:%d" % portnum)
        self.central_portnum = l.getPortnum()
        if portnum != 0:
            assert self.central_portnum == portnum
        tub.setLocation("localhost:%d" % self.central_portnum)

class SystemTest(SystemTestMixin, unittest.TestCase):

    def test_system(self):
        self.basedir = "introducer/SystemTest/system"
        os.makedirs(self.basedir)
        return self.do_system_test(IntroducerService)
    test_system.timeout = 480 # occasionally takes longer than 350s on "draco"

    def do_system_test(self, create_introducer):
        self.create_tub()
        introducer = create_introducer()
        introducer.setServiceParent(self.parent)
        iff = os.path.join(self.basedir, "introducer.furl")
        ifurl = self.central_tub.registerReference(introducer, furlFile=iff)
        self.introducer_furl = ifurl

        NUMCLIENTS = 5
        # we have 5 clients who publish themselves and one extra client
        # which does not. When the connections are fully established, each
        # of the six nodes should see announcements from all 5 publishers.

        clients = []
        tubs = {}
        received_announcements = {}
        NUM_SERVERS = NUMCLIENTS
        subscribing_clients = []
        publishing_clients = []

        for i in range(NUMCLIENTS+1):
            tub = Tub()
            #tub.setOption("logLocalFailures", True)
            #tub.setOption("logRemoteFailures", True)
            tub.setOption("expose-remote-exception-types", False)
            tub.setServiceParent(self.parent)
            l = tub.listenOn("tcp:0")
            portnum = l.getPortnum()
            tub.setLocation("localhost:%d" % portnum)

            log.msg("creating client %d: %s" % (i, tub.getShortTubID()))
            c = IntroducerClient(tub, self.introducer_furl, u"nickname-%d" % i,
                                 "version", "oldest")
            received_announcements[c] = {}
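            # pass the per-client dict as an explicit extra argument, so the
            # callback does not close over the loop variable (a closure
            # would leave every callback pointing at the last client's dict)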
            def got(serverid, ann_d, announcements):
                announcements[serverid] = ann_d
            c.subscribe_to("storage", got, received_announcements[c])
            subscribing_clients.append(c)

            if i < NUMCLIENTS:
                node_furl = tub.registerReference(Referenceable())
                c.publish(node_furl, "storage", "ri_name")
                publishing_clients.append(c)
            # the last one does not publish anything

            c.setServiceParent(self.parent)
            clients.append(c)
            tubs[c] = tub

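        # PollMixin.poll() re-runs the given predicate until it returns
        # True, and fires the returned Deferred once the condition holds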
        def _wait_for_all_connections():
            for c in subscribing_clients:
                if len(received_announcements[c]) < NUM_SERVERS:
                    return False
            return True
        d = self.poll(_wait_for_all_connections)

        def _check1(res):
            log.msg("doing _check1")
            dc = introducer._debug_counts
            self.failUnlessEqual(dc["inbound_message"], NUM_SERVERS)
            self.failUnlessEqual(dc["inbound_duplicate"], 0)
            self.failUnlessEqual(dc["inbound_update"], 0)
            self.failUnless(dc["outbound_message"])

            for c in clients:
                self.failUnless(c.connected_to_introducer())
            for c in subscribing_clients:
                cdc = c._debug_counts
                self.failUnless(cdc["inbound_message"])
                self.failUnlessEqual(cdc["inbound_announcement"],
                                     NUM_SERVERS)
                self.failUnlessEqual(cdc["wrong_service"], 0)
                self.failUnlessEqual(cdc["duplicate_announcement"], 0)
                self.failUnlessEqual(cdc["update"], 0)
                self.failUnlessEqual(cdc["new_announcement"],
                                     NUM_SERVERS)
                anns = received_announcements[c]
                self.failUnlessEqual(len(anns), NUM_SERVERS)

                nodeid0 = b32decode(tubs[clients[0]].tubID.upper())
                ann_d = anns[nodeid0]
                nick = ann_d["nickname"]
                self.failUnlessEqual(type(nick), unicode)
                self.failUnlessEqual(nick, u"nickname-0")
            for c in publishing_clients:
                cdc = c._debug_counts
                self.failUnlessEqual(cdc["outbound_message"], 1)
        d.addCallback(_check1)

        # force an introducer reconnect, by shutting down the Tub it's using
        # and starting a new Tub (with the old introducer). Everybody should
        # reconnect and republish, but the introducer should ignore the
        # republishes as duplicates. However, because the server has no
        # record of which announcements each client has already seen, it
        # will send everyone a copy of the current announcement table anyway.

        d.addCallback(lambda _ign: log.msg("shutting down introducer's Tub"))
        d.addCallback(lambda _ign: self.central_tub.disownServiceParent())

        def _wait_for_introducer_loss():
            for c in clients:
                if c.connected_to_introducer():
                    return False
            return True
        d.addCallback(lambda res: self.poll(_wait_for_introducer_loss))

        def _restart_introducer_tub(_ign):
            log.msg("restarting introducer's Tub")

            # note: old.Server doesn't have this count
            dc = introducer._debug_counts
            self.expected_count = dc["inbound_message"] + NUM_SERVERS
            self.expected_subscribe_count = dc["inbound_subscribe"] + NUMCLIENTS+1
            introducer._debug0 = dc["outbound_message"]
            for c in subscribing_clients:
                cdc = c._debug_counts
                c._debug0 = cdc["inbound_message"]

            self.create_tub(self.central_portnum)
            newfurl = self.central_tub.registerReference(introducer,
                                                         furlFile=iff)
            assert newfurl == self.introducer_furl
        d.addCallback(_restart_introducer_tub)

        def _wait_for_introducer_reconnect():
            # wait until:
            #  all clients are connected
            #  the introducer has received publish messages from all of them
            #  the introducer has received subscribe messages from all of them
            #  the introducer has sent (duplicate) announcements to all of them
            #  all clients have received (duplicate) announcements
            dc = introducer._debug_counts
            for c in clients:
                if not c.connected_to_introducer():
                    return False
            if dc["inbound_message"] < self.expected_count:
                return False
            if dc["inbound_subscribe"] < self.expected_subscribe_count:
                return False
            for c in subscribing_clients:
                cdc = c._debug_counts
                if cdc["inbound_message"] < c._debug0+1:
                    return False
            return True
        d.addCallback(lambda res: self.poll(_wait_for_introducer_reconnect))

        def _check2(res):
            log.msg("doing _check2")
            # assert that the introducer sent out new messages, one per
            # subscriber
            dc = introducer._debug_counts
            self.failUnlessEqual(dc["inbound_message"], 2*NUM_SERVERS)
            self.failUnlessEqual(dc["inbound_duplicate"], NUM_SERVERS)
            self.failUnlessEqual(dc["inbound_update"], 0)
            self.failUnlessEqual(dc["outbound_message"],
                                 introducer._debug0 + len(subscribing_clients))
            for c in clients:
                self.failUnless(c.connected_to_introducer())
            for c in subscribing_clients:
                cdc = c._debug_counts
                self.failUnlessEqual(cdc["duplicate_announcement"], NUM_SERVERS)
        d.addCallback(_check2)

        # Then force an introducer restart, by shutting down the Tub,
        # destroying the old introducer, and starting a new Tub+Introducer.
        # Everybody should reconnect and republish, and the (new) introducer
        # will distribute the new announcements, but the clients should
        # ignore the republishes as duplicates.

        d.addCallback(lambda _ign: log.msg("shutting down introducer"))
        d.addCallback(lambda _ign: self.central_tub.disownServiceParent())
        d.addCallback(lambda res: self.poll(_wait_for_introducer_loss))

        def _restart_introducer(_ign):
            log.msg("restarting introducer")
            self.create_tub(self.central_portnum)

            for c in subscribing_clients:
                # record some counters for later comparison. Stash the values
                # on the client itself, because I'm lazy.
                cdc = c._debug_counts
                c._debug1 = cdc["inbound_announcement"]
                c._debug2 = cdc["inbound_message"]
                c._debug3 = cdc["new_announcement"]
            newintroducer = create_introducer()
            self.expected_message_count = NUM_SERVERS
            self.expected_announcement_count = NUM_SERVERS*len(subscribing_clients)
            self.expected_subscribe_count = len(subscribing_clients)
            newfurl = self.central_tub.registerReference(newintroducer,
                                                         furlFile=iff)
            assert newfurl == self.introducer_furl
        d.addCallback(_restart_introducer)
        def _wait_for_introducer_reconnect2():
            # wait until:
            #  all clients are connected
            #  the introducer has received publish messages from all of them
            #  the introducer has received subscribe messages from all of them
            #  the introducer has sent announcements for everybody to everybody
            #  all clients have received all the (duplicate) announcements
            # at that point, the system should be quiescent
            dc = introducer._debug_counts
            for c in clients:
                if not c.connected_to_introducer():
                    return False
            if dc["inbound_message"] < self.expected_message_count:
                return False
            if dc["outbound_announcements"] < self.expected_announcement_count:
                return False
            if dc["inbound_subscribe"] < self.expected_subscribe_count:
                return False
            for c in subscribing_clients:
                cdc = c._debug_counts
                if cdc["inbound_announcement"] < c._debug1+NUM_SERVERS:
                    return False
            return True
        d.addCallback(lambda res: self.poll(_wait_for_introducer_reconnect2))

        def _check3(res):
            log.msg("doing _check3")
            for c in clients:
                self.failUnless(c.connected_to_introducer())
            for c in subscribing_clients:
                cdc = c._debug_counts
                self.failUnless(cdc["inbound_announcement"] > c._debug1)
                self.failUnless(cdc["inbound_message"] > c._debug2)
                # there should have been no new announcements
                self.failUnlessEqual(cdc["new_announcement"], c._debug3)
                # and the right number of duplicate ones. There were
                # NUM_SERVERS from the introducer-Tub restart, and there
                # should be another NUM_SERVERS now
                self.failUnlessEqual(cdc["duplicate_announcement"],
                                     2*NUM_SERVERS)

        d.addCallback(_check3)
        return d

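# A fake introducer that only advertises a far-future "v999" protocol
# version, so a v1 client should refuse to use it.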
class TooNewServer(IntroducerService):
    VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v999":
                 { },
                "application-version": "greetings from the crazy future",
                }

class NonV1Server(SystemTestMixin, unittest.TestCase):
    # if the 1.3.0 client connects to a server that doesn't provide the 'v1'
    # protocol, the client is supposed to produce a useful error instead of
    # a weird exception.

    def test_failure(self):
        self.basedir = "introducer/NonV1Server/failure"
        os.makedirs(self.basedir)
        self.create_tub()
        i = TooNewServer()
        i.setServiceParent(self.parent)
        self.introducer_furl = self.central_tub.registerReference(i)

        tub = Tub()
        tub.setOption("expose-remote-exception-types", False)
        tub.setServiceParent(self.parent)
        l = tub.listenOn("tcp:0")
        portnum = l.getPortnum()
        tub.setLocation("localhost:%d" % portnum)

        c = IntroducerClient(tub, self.introducer_furl,
                             u"nickname-client", "version", "oldest")
        announcements = {}
        def got(serverid, ann_d):
            announcements[serverid] = ann_d
        c.subscribe_to("storage", got)

        c.setServiceParent(self.parent)

        # now we wait for it to connect and notice the bad version

        def _got_bad():
            return bool(c._introducer_error) or bool(c._publisher)
        d = self.poll(_got_bad)
        def _done(res):
            self.failUnless(c._introducer_error)
            self.failUnless(c._introducer_error.check(InsufficientVersionError))
        d.addCallback(_done)
        return d

class Index(unittest.TestCase):
    def test_make_index(self):
        # make sure we have a working base64.b32decode. The one in
        # python2.4.[01] was broken.
        ann = ('pb://t5g7egomnnktbpydbuijt6zgtmw4oqi5@127.0.0.1:51857/hfzv36i',
               'storage', 'RIStorageServer.tahoe.allmydata.com',
               'plancha', 'allmydata-tahoe/1.4.1', '1.0.0')
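        # make_index returns a (nodeid, service_name) key for the
        # announcement; as the expected value below shows, the nodeid is
        # the base32-decoded tubid portion of the FURL, the same derivation
        # as b32decode(tubID.upper()) in the system test above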
        (nodeid, service_name) = old.make_index(ann)
        self.failUnlessEqual(nodeid, "\x9fM\xf2\x19\xcckU0\xbf\x03\r\x10\x99\xfb&\x9b-\xc7A\x1d")
        self.failUnlessEqual(service_name, "storage")
