]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_introducer.py
hush pyflakes-0.4.0 warnings: remove trivial unused variables. For #900.
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_introducer.py
1
2 import os, re
3 from base64 import b32decode
4
5 from twisted.trial import unittest
6 from twisted.internet import defer
7 from twisted.python import log
8
9 from foolscap.api import Tub, Referenceable, fireEventually, flushEventualQueue
10 from twisted.application import service
11 from allmydata.interfaces import InsufficientVersionError
12 from allmydata.introducer.client import IntroducerClient
13 from allmydata.introducer.server import IntroducerService
14 # test compatibility with old introducer .tac files
15 from allmydata.introducer import IntroducerNode
16 from allmydata.util import pollmixin
17 import common_util as testutil
18
class LoggingMultiService(service.MultiService):
    # MultiService that accepts foolscap-style log(msg, **kw) calls from its
    # children and forwards them to twisted.python.log.
    def log(self, msg, **kw):
        log.msg(msg, **kw)
22
class Node(testutil.SignalMixin, unittest.TestCase):
    def test_loadable(self):
        """Exercise the old .tac-file compatibility path: build an
        IntroducerNode from a fresh basedir and run it through a full
        startService / when_tub_ready / stopService cycle."""
        basedir = "introducer.IntroducerNode.test_loadable"
        os.mkdir(basedir)
        node = IntroducerNode(basedir)
        d = fireEventually(None)
        d.addCallback(lambda ign: node.startService())
        d.addCallback(lambda ign: node.when_tub_ready())
        d.addCallback(lambda ign: node.stopService())
        d.addCallback(flushEventualQueue)
        return d
34
class ServiceMixin:
    """Give each test a running LoggingMultiService in self.parent and
    tear it down (flushing foolscap's eventual-send queue) afterwards."""
    def setUp(self):
        self.parent = LoggingMultiService()
        self.parent.startService()

    def tearDown(self):
        log.msg("TestIntroducer.tearDown")
        d = defer.succeed(None)
        d.addCallback(lambda ign: self.parent.stopService())
        d.addCallback(flushEventualQueue)
        return d
45
class Introducer(ServiceMixin, unittest.TestCase, pollmixin.PollMixin):

    def test_create(self):
        # constructing a client (with no Tub) must not raise
        IntroducerClient(None, "introducer.furl", u"my_nickname",
                         "my_version", "oldest_version")

    def test_listen(self):
        # the service must attach cleanly to a running parent
        svc = IntroducerService()
        svc.setServiceParent(self.parent)

    def test_duplicate(self):
        """A re-publish of the same FURL (even with a new version string)
        must replace, not duplicate, the existing announcement."""
        svc = IntroducerService()
        self.failUnlessEqual(len(svc.get_announcements()), 0)
        self.failUnlessEqual(len(svc.get_subscribers()), 0)
        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36106,127.0.0.1:36106/gydnpigj2ja2qr2srq4ikjwnl7xfgbra"
        furl2 = "pb://ttwwooyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36111,127.0.0.1:36106/ttwwoogj2ja2qr2srq4ikjwnl7xfgbra"
        ann1 = (furl1, "storage", "RIStorage", "nick1", "ver23", "ver0")
        ann1b = (furl1, "storage", "RIStorage", "nick1", "ver24", "ver0")
        ann2 = (furl2, "storage", "RIStorage", "nick2", "ver30", "ver0")
        # publish in sequence; ann1b shares furl1 so the table stays at 2
        for ann, expected_count in [(ann1, 1), (ann2, 2), (ann1b, 2)]:
            svc.remote_publish(ann)
            self.failUnlessEqual(len(svc.get_announcements()), expected_count)
            self.failUnlessEqual(len(svc.get_subscribers()), 0)
74
class SystemTestMixin(ServiceMixin, pollmixin.PollMixin):

    def create_tub(self, portnum=0):
        """Create the central (introducer-side) Tub listening on 'portnum'
        (0 means pick any free port), attach it to self.parent, and record
        it in self.central_tub / self.central_portnum."""
        certfile = os.path.join(self.basedir, "tub.pem")
        tub = Tub(certFile=certfile)
        self.central_tub = tub
        tub.setOption("expose-remote-exception-types", False)
        tub.setServiceParent(self.parent)
        listener = tub.listenOn("tcp:%d" % portnum)
        self.central_portnum = listener.getPortnum()
        if portnum != 0:
            # a caller asking for a specific port must get exactly that port
            assert self.central_portnum == portnum
        tub.setLocation("localhost:%d" % self.central_portnum)
89
class SystemTest(SystemTestMixin, unittest.TestCase):

    def test_system(self):
        self.basedir = "introducer/SystemTest/system"
        os.makedirs(self.basedir)
        return self.do_system_test(IntroducerService)
    test_system.timeout = 480 # occasionally takes longer than 350s on "draco"

    def do_system_test(self, create_introducer):
        """End-to-end introducer test: start an introducer, connect several
        publishing+subscribing clients, then restart first the introducer's
        Tub and then the introducer itself, checking the publish/subscribe
        debug counters at each stage."""
        self.create_tub()
        introducer = create_introducer()
        introducer.setServiceParent(self.parent)
        iff = os.path.join(self.basedir, "introducer.furl")
        tub = self.central_tub
        ifurl = self.central_tub.registerReference(introducer, furlFile=iff)
        self.introducer_furl = ifurl

        NUMCLIENTS = 5
        # we have 5 clients who publish themselves, and an extra one which
        # does not. When the connections are fully established, all six
        # nodes should have 5 connections each.

        clients = []
        tubs = {}
        received_announcements = {}
        NUM_SERVERS = NUMCLIENTS
        subscribing_clients = []
        publishing_clients = []

        for i in range(NUMCLIENTS+1):
            # each client gets its own Tub on an arbitrary free port
            tub = Tub()
            #tub.setOption("logLocalFailures", True)
            #tub.setOption("logRemoteFailures", True)
            tub.setOption("expose-remote-exception-types", False)
            tub.setServiceParent(self.parent)
            l = tub.listenOn("tcp:0")
            portnum = l.getPortnum()
            tub.setLocation("localhost:%d" % portnum)

            log.msg("creating client %d: %s" % (i, tub.getShortTubID()))
            c = IntroducerClient(tub, self.introducer_furl, u"nickname-%d" % i,
                                 "version", "oldest")
            received_announcements[c] = {}
            # 'announcements' is bound as a default-free extra arg via
            # subscribe_to, so each client records into its own dict
            def got(serverid, ann_d, announcements):
                announcements[serverid] = ann_d
            c.subscribe_to("storage", got, received_announcements[c])
            subscribing_clients.append(c)

            if i < NUMCLIENTS:
                node_furl = tub.registerReference(Referenceable())
                c.publish(node_furl, "storage", "ri_name")
                publishing_clients.append(c)
            # the last one does not publish anything

            c.setServiceParent(self.parent)
            clients.append(c)
            tubs[c] = tub

        def _wait_for_all_connections():
            # every subscriber must have heard about all NUM_SERVERS servers
            for c in subscribing_clients:
                if len(received_announcements[c]) < NUM_SERVERS:
                    return False
            return True
        d = self.poll(_wait_for_all_connections)

        def _check1(res):
            # initial steady state: one inbound message per server, no
            # duplicates or updates yet, and every subscriber saw every server
            log.msg("doing _check1")
            dc = introducer._debug_counts
            self.failUnlessEqual(dc["inbound_message"], NUM_SERVERS)
            self.failUnlessEqual(dc["inbound_duplicate"], 0)
            self.failUnlessEqual(dc["inbound_update"], 0)
            self.failUnless(dc["outbound_message"])

            for c in clients:
                self.failUnless(c.connected_to_introducer())
            for c in subscribing_clients:
                cdc = c._debug_counts
                self.failUnless(cdc["inbound_message"])
                self.failUnlessEqual(cdc["inbound_announcement"],
                                     NUM_SERVERS)
                self.failUnlessEqual(cdc["wrong_service"], 0)
                self.failUnlessEqual(cdc["duplicate_announcement"], 0)
                self.failUnlessEqual(cdc["update"], 0)
                self.failUnlessEqual(cdc["new_announcement"],
                                     NUM_SERVERS)
                anns = received_announcements[c]
                self.failUnlessEqual(len(anns), NUM_SERVERS)

                # spot-check the announcement from client 0: its nodeid is
                # the base32-decoded tubID, and the nickname must round-trip
                # as unicode
                nodeid0 = b32decode(tubs[clients[0]].tubID.upper())
                ann_d = anns[nodeid0]
                nick = ann_d["nickname"]
                self.failUnlessEqual(type(nick), unicode)
                self.failUnlessEqual(nick, u"nickname-0")
            for c in publishing_clients:
                cdc = c._debug_counts
                self.failUnlessEqual(cdc["outbound_message"], 1)
        d.addCallback(_check1)

        # force an introducer reconnect, by shutting down the Tub it's using
        # and starting a new Tub (with the old introducer). Everybody should
        # reconnect and republish, but the introducer should ignore the
        # republishes as duplicates. However, because the server doesn't know
        # what each client does and does not know, it will send them a copy
        # of the current announcement table anyway.

        d.addCallback(lambda _ign: log.msg("shutting down introducer's Tub"))
        d.addCallback(lambda _ign: self.central_tub.disownServiceParent())

        def _wait_for_introducer_loss():
            # wait until every client has noticed the connection is gone
            for c in clients:
                if c.connected_to_introducer():
                    return False
            return True
        d.addCallback(lambda res: self.poll(_wait_for_introducer_loss))

        def _restart_introducer_tub(_ign):
            log.msg("restarting introducer's Tub")

            # record pre-restart counters (stashed on the objects themselves)
            # so the post-restart polls/checks can compare against them
            dc = introducer._debug_counts
            self.expected_count = dc["inbound_message"] + NUM_SERVERS
            self.expected_subscribe_count = dc["inbound_subscribe"] + NUMCLIENTS+1
            introducer._debug0 = dc["outbound_message"]
            for c in subscribing_clients:
                cdc = c._debug_counts
                c._debug0 = cdc["inbound_message"]

            # reuse the same port so the old FURL remains valid
            self.create_tub(self.central_portnum)
            newfurl = self.central_tub.registerReference(introducer,
                                                         furlFile=iff)
            assert newfurl == self.introducer_furl
        d.addCallback(_restart_introducer_tub)

        def _wait_for_introducer_reconnect():
            # wait until:
            #  all clients are connected
            #  the introducer has received publish messages from all of them
            #  the introducer has received subscribe messages from all of them
            #  the introducer has sent (duplicate) announcements to all of them
            #  all clients have received (duplicate) announcements
            dc = introducer._debug_counts
            for c in clients:
                if not c.connected_to_introducer():
                    return False
            if dc["inbound_message"] < self.expected_count:
                return False
            if dc["inbound_subscribe"] < self.expected_subscribe_count:
                return False
            for c in subscribing_clients:
                cdc = c._debug_counts
                if cdc["inbound_message"] < c._debug0+1:
                    return False
            return True
        d.addCallback(lambda res: self.poll(_wait_for_introducer_reconnect))

        def _check2(res):
            log.msg("doing _check2")
            # assert that the introducer sent out new messages, one per
            # subscriber
            dc = introducer._debug_counts
            self.failUnlessEqual(dc["inbound_message"], 2*NUM_SERVERS)
            self.failUnlessEqual(dc["inbound_duplicate"], NUM_SERVERS)
            self.failUnlessEqual(dc["inbound_update"], 0)
            self.failUnlessEqual(dc["outbound_message"],
                                 introducer._debug0 + len(subscribing_clients))
            for c in clients:
                self.failUnless(c.connected_to_introducer())
            for c in subscribing_clients:
                cdc = c._debug_counts
                self.failUnlessEqual(cdc["duplicate_announcement"], NUM_SERVERS)
        d.addCallback(_check2)

        # Then force an introducer restart, by shutting down the Tub,
        # destroying the old introducer, and starting a new Tub+Introducer.
        # Everybody should reconnect and republish, and the (new) introducer
        # will distribute the new announcements, but the clients should
        # ignore the republishes as duplicates.

        d.addCallback(lambda _ign: log.msg("shutting down introducer"))
        d.addCallback(lambda _ign: self.central_tub.disownServiceParent())
        d.addCallback(lambda res: self.poll(_wait_for_introducer_loss))

        def _restart_introducer(_ign):
            log.msg("restarting introducer")
            self.create_tub(self.central_portnum)

            for c in subscribing_clients:
                # record some counters for later comparison. Stash the values
                # on the client itself, because I'm lazy.
                cdc = c._debug_counts
                c._debug1 = cdc["inbound_announcement"]
                c._debug2 = cdc["inbound_message"]
                c._debug3 = cdc["new_announcement"]
            # a brand-new introducer: its announcement table starts empty
            newintroducer = create_introducer()
            self.expected_message_count = NUM_SERVERS
            self.expected_announcement_count = NUM_SERVERS*len(subscribing_clients)
            self.expected_subscribe_count = len(subscribing_clients)
            newfurl = self.central_tub.registerReference(newintroducer,
                                                         furlFile=iff)
            assert newfurl == self.introducer_furl
        d.addCallback(_restart_introducer)
        def _wait_for_introducer_reconnect2():
            # wait until:
            #  all clients are connected
            #  the introducer has received publish messages from all of them
            #  the introducer has received subscribe messages from all of them
            #  the introducer has sent announcements for everybody to everybody
            #  all clients have received all the (duplicate) announcements
            # at that point, the system should be quiescent
            dc = introducer._debug_counts
            for c in clients:
                if not c.connected_to_introducer():
                    return False
            if dc["inbound_message"] < self.expected_message_count:
                return False
            if dc["outbound_announcements"] < self.expected_announcement_count:
                return False
            if dc["inbound_subscribe"] < self.expected_subscribe_count:
                return False
            for c in subscribing_clients:
                cdc = c._debug_counts
                if cdc["inbound_announcement"] < c._debug1+NUM_SERVERS:
                    return False
            return True
        d.addCallback(lambda res: self.poll(_wait_for_introducer_reconnect2))

        def _check3(res):
            log.msg("doing _check3")
            for c in clients:
                self.failUnless(c.connected_to_introducer())
            for c in subscribing_clients:
                cdc = c._debug_counts
                self.failUnless(cdc["inbound_announcement"] > c._debug1)
                self.failUnless(cdc["inbound_message"] > c._debug2)
                # there should have been no new announcements
                self.failUnlessEqual(cdc["new_announcement"], c._debug3)
                # and the right number of duplicate ones. There were
                # NUM_SERVERS from the servertub restart, and there should be
                # another NUM_SERVERS now
                self.failUnlessEqual(cdc["duplicate_announcement"],
                                     2*NUM_SERVERS)

        d.addCallback(_check3)
        return d
333
class TooNewServer(IntroducerService):
    # Advertise only a fictitious future protocol version ('v999'), so a
    # v1-speaking client finds nothing it understands and must fail cleanly.
    VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v999":
                 { },
                "application-version": "greetings from the crazy future",
                }
339
class NonV1Server(SystemTestMixin, unittest.TestCase):
    # if the 1.3.0 client connects to a server that doesn't provide the 'v1'
    # protocol, it is supposed to provide a useful error instead of a weird
    # exception.

    def test_failure(self):
        self.basedir = "introducer/NonV1Server/failure"
        os.makedirs(self.basedir)
        self.create_tub()
        server = TooNewServer()
        server.setServiceParent(self.parent)
        self.introducer_furl = self.central_tub.registerReference(server)

        # give the client its own Tub on an arbitrary free port
        client_tub = Tub()
        client_tub.setOption("expose-remote-exception-types", False)
        client_tub.setServiceParent(self.parent)
        listener = client_tub.listenOn("tcp:0")
        client_tub.setLocation("localhost:%d" % listener.getPortnum())

        c = IntroducerClient(client_tub, self.introducer_furl,
                             u"nickname-client", "version", "oldest")
        announcements = {}
        def got(serverid, ann_d):
            announcements[serverid] = ann_d
        c.subscribe_to("storage", got)

        c.setServiceParent(self.parent)

        # now we wait for it to connect and notice the bad version

        def _got_bad():
            return bool(c._introducer_error) or bool(c._publisher)
        d = self.poll(_got_bad)

        def _done(res):
            # the client must have recorded an InsufficientVersionError,
            # not some opaque failure
            self.failUnless(c._introducer_error)
            self.failUnless(c._introducer_error.check(InsufficientVersionError))
        d.addCallback(_done)
        return d
379
class DecodeFurl(unittest.TestCase):
    def test_decode(self):
        # make sure we have a working base64.b32decode. The one in
        # python2.4.[01] was broken.
        furl = 'pb://t5g7egomnnktbpydbuijt6zgtmw4oqi5@127.0.0.1:51857/hfzv36i'
        mo = re.match(r'pb://(\w+)@', furl)
        assert mo
        tubid = mo.group(1)
        nodeid = b32decode(tubid.upper())
        expected = "\x9fM\xf2\x19\xcckU0\xbf\x03\r\x10\x99\xfb&\x9b-\xc7A\x1d"
        self.failUnlessEqual(nodeid, expected)
389