]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_node.py
ef1abe0653da248c7161f5e69e096c25cb7135b4
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_node.py
1
2 import os, stat, sys, time
3 from twisted.trial import unittest
4 from twisted.internet import defer
5 from twisted.python import log
6
7 from mock import patch
8
9 from foolscap.api import flushEventualQueue
10 from twisted.application import service
11 from allmydata.node import Node, formatTimeTahoeStyle, MissingConfigEntry
12 from allmydata.util import fileutil, iputil
13 import allmydata.test.common_util as testutil
14
15 class LoggingMultiService(service.MultiService):
16     def log(self, msg, **kw):
17         pass
18
19 class TestNode(Node):
20     CERTFILE='DEFAULT_CERTFILE_BLANK'
21     PORTNUMFILE='DEFAULT_PORTNUMFILE_BLANK'
22
23 class TestCase(testutil.SignalMixin, unittest.TestCase):
24     def setUp(self):
25         testutil.SignalMixin.setUp(self)
26         self.parent = LoggingMultiService()
27         self.parent.startService()
28     def tearDown(self):
29         log.msg("%s.tearDown" % self.__class__.__name__)
30         testutil.SignalMixin.tearDown(self)
31         d = defer.succeed(None)
32         d.addCallback(lambda res: self.parent.stopService())
33         d.addCallback(flushEventualQueue)
34         return d
35
36     def _test_location(self, basedir, expected_addresses, tub_port=None, tub_location=None, local_addresses=None):
37         fileutil.make_dirs(basedir)
38         f = open(os.path.join(basedir, 'tahoe.cfg'), 'wt')
39         f.write("[node]\n")
40         if tub_port:
41             f.write("tub.port = %d\n" % (tub_port,))
42         if tub_location is not None:
43             f.write("tub.location = %s\n" % (tub_location,))
44         f.close()
45
46         if local_addresses:
47             self.patch(iputil, 'get_local_addresses_async', lambda target=None: defer.succeed(local_addresses))
48
49         n = TestNode(basedir)
50         n.setServiceParent(self.parent)
51         d = n.when_tub_ready()
52
53         def _check_addresses(ignored_result):
54             furl = n.tub.registerReference(n)
55             for address in expected_addresses:
56                 self.failUnlessIn(address, furl)
57
58         d.addCallback(_check_addresses)
59         return d
60
61     def test_location1(self):
62         return self._test_location(basedir="test_node/test_location1",
63                                    expected_addresses=["192.0.2.0:1234"],
64                                    tub_location="192.0.2.0:1234")
65
66     def test_location2(self):
67         return self._test_location(basedir="test_node/test_location2",
68                                    expected_addresses=["192.0.2.0:1234", "example.org:8091"],
69                                    tub_location="192.0.2.0:1234,example.org:8091")
70
71     def test_location_not_set(self):
72         """Checks the autogenerated furl when tub.location is not set."""
73         return self._test_location(basedir="test_node/test_location3",
74                                    expected_addresses=["127.0.0.1:1234", "192.0.2.0:1234"],
75                                    tub_port=1234,
76                                    local_addresses=["127.0.0.1", "192.0.2.0"])
77
78     def test_location_auto_and_explicit(self):
79         """Checks the autogenerated furl when tub.location contains 'AUTO'."""
80         return self._test_location(basedir="test_node/test_location4",
81                                    expected_addresses=["127.0.0.1:1234", "192.0.2.0:1234", "example.com:4321"],
82                                    tub_port=1234,
83                                    tub_location="AUTO,example.com:4321",
84                                    local_addresses=["127.0.0.1", "192.0.2.0", "example.com:4321"])
85
86     def test_tahoe_cfg_utf8(self):
87         basedir = "test_node/test_tahoe_cfg_utf8"
88         fileutil.make_dirs(basedir)
89         f = open(os.path.join(basedir, 'tahoe.cfg'), 'wt')
90         f.write(u"\uFEFF[node]\n".encode('utf-8'))
91         f.write(u"nickname = \u2621\n".encode('utf-8'))
92         f.close()
93
94         n = TestNode(basedir)
95         n.setServiceParent(self.parent)
96         d = n.when_tub_ready()
97         d.addCallback(lambda ign: self.failUnlessEqual(n.get_config("node", "nickname").decode('utf-8'),
98                                                        u"\u2621"))
99         return d
100
101     def test_tahoe_cfg_hash_in_name(self):
102         basedir = "test_node/test_cfg_hash_in_name"
103         nickname = "Hash#Bang!" # a clever nickname containing a hash
104         fileutil.make_dirs(basedir)
105         f = open(os.path.join(basedir, 'tahoe.cfg'), 'wt')
106         f.write("[node]\n")
107         f.write("nickname = %s\n" % (nickname,))
108         f.close()
109         n = TestNode(basedir)
110         self.failUnless(n.nickname == nickname)
111
112     def test_private_config(self):
113         basedir = "test_node/test_private_config"
114         privdir = os.path.join(basedir, "private")
115         fileutil.make_dirs(privdir)
116         f = open(os.path.join(privdir, 'already'), 'wt')
117         f.write("secret")
118         f.close()
119
120         n = TestNode(basedir)
121         self.failUnlessEqual(n.get_private_config("already"), "secret")
122         self.failUnlessEqual(n.get_private_config("not", "default"), "default")
123         self.failUnlessRaises(MissingConfigEntry, n.get_private_config, "not")
124         value = n.get_or_create_private_config("new", "start")
125         self.failUnlessEqual(value, "start")
126         self.failUnlessEqual(n.get_private_config("new"), "start")
127         counter = []
128         def make_newer():
129             counter.append("called")
130             return "newer"
131         value = n.get_or_create_private_config("newer", make_newer)
132         self.failUnlessEqual(len(counter), 1)
133         self.failUnlessEqual(value, "newer")
134         self.failUnlessEqual(n.get_private_config("newer"), "newer")
135
136         value = n.get_or_create_private_config("newer", make_newer)
137         self.failUnlessEqual(len(counter), 1) # don't call unless necessary
138         self.failUnlessEqual(value, "newer")
139
140     def test_timestamp(self):
141         # this modified logger doesn't seem to get used during the tests,
142         # probably because we don't modify the LogObserver that trial
143         # installs (only the one that twistd installs). So manually exercise
144         # it a little bit.
145         t = formatTimeTahoeStyle("ignored", time.time())
146         self.failUnless("Z" in t)
147         t2 = formatTimeTahoeStyle("ignored", int(time.time()))
148         self.failUnless("Z" in t2)
149
150     def test_secrets_dir(self):
151         basedir = "test_node/test_secrets_dir"
152         fileutil.make_dirs(basedir)
153         n = TestNode(basedir)
154         self.failUnless(isinstance(n, TestNode))
155         self.failUnless(os.path.exists(os.path.join(basedir, "private")))
156
157     def test_secrets_dir_protected(self):
158         if "win32" in sys.platform.lower() or "cygwin" in sys.platform.lower():
159             # We don't know how to test that unprivileged users can't read this
160             # thing.  (Also we don't know exactly how to set the permissions so
161             # that unprivileged users can't read this thing.)
162             raise unittest.SkipTest("We don't know how to set permissions on Windows.")
163         basedir = "test_node/test_secrets_dir_protected"
164         fileutil.make_dirs(basedir)
165         n = TestNode(basedir)
166         self.failUnless(isinstance(n, TestNode))
167         privdir = os.path.join(basedir, "private")
168         st = os.stat(privdir)
169         bits = stat.S_IMODE(st[stat.ST_MODE])
170         self.failUnless(bits & 0001 == 0, bits)
171
172     @patch("foolscap.logging.log.setLogDir")
173     def test_logdir_is_str(self, mock_setLogDir):
174         basedir = "test_node/test_logdir_is_str"
175         fileutil.make_dirs(basedir)
176
177         def call_setLogDir(logdir):
178             self.failUnless(isinstance(logdir, str), logdir)
179         mock_setLogDir.side_effect = call_setLogDir
180
181         TestNode(basedir)
182         self.failUnless(mock_setLogDir.called)