git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_node.py
Eliminate mock dependency.
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_node.py
1
2 import os, stat, sys, time
3
4 from twisted.trial import unittest
5 from twisted.internet import defer
6 from twisted.python import log
7
8 from foolscap.api import flushEventualQueue
9 import foolscap.logging.log
10
11 from twisted.application import service
12 from allmydata.node import Node, formatTimeTahoeStyle, MissingConfigEntry
13 from allmydata.util import fileutil, iputil
14 from allmydata.util.namespace import Namespace
15 import allmydata.test.common_util as testutil
16
17
class LoggingMultiService(service.MultiService):
    """A MultiService that also offers a log() method which discards its input."""

    def log(self, msg, **kw):
        """Accept a message and arbitrary keyword details, and ignore them."""
        return None
21
class TestNode(Node):
    """Node subclass instantiated by the tests in this module."""

    # Placeholder values.
    # NOTE(review): presumably these override file-name attributes that Node
    # reads -- confirm against allmydata.node.
    CERTFILE = 'DEFAULT_CERTFILE_BLANK'
    PORTNUMFILE = 'DEFAULT_PORTNUMFILE_BLANK'
25
class TestCase(testutil.SignalMixin, unittest.TestCase):
    """Tests that build a TestNode in a scratch directory under "test_node/".

    They cover the addresses that end up in the node's furl, parsing of
    tahoe.cfg, the private-config accessors, creation of the "private"
    directory, and the argument handed to foolscap's setLogDir.
    """

    def setUp(self):
        """Run the SignalMixin set-up and start a fresh parent service."""
        testutil.SignalMixin.setUp(self)
        self.parent = LoggingMultiService()
        self.parent.startService()

    def tearDown(self):
        """Stop the parent service, then call flushEventualQueue.

        Returns the Deferred that fires once both steps have completed.
        """
        log.msg("%s.tearDown" % self.__class__.__name__)
        testutil.SignalMixin.tearDown(self)
        d = defer.succeed(None)
        d.addCallback(lambda res: self.parent.stopService())
        d.addCallback(flushEventualQueue)
        return d

    def _write_tahoe_cfg(self, basedir, contents, mode='wt'):
        """Create basedir if necessary and write contents to its tahoe.cfg.

        mode is passed straight to open(); use 'wb' when contents is an
        already-encoded byte string.
        """
        fileutil.make_dirs(basedir)
        # The with-block closes the file even if the write raises.
        with open(os.path.join(basedir, 'tahoe.cfg'), mode) as f:
            f.write(contents)

    def _test_location(self, basedir, expected_addresses, tub_port=None, tub_location=None, local_addresses=None):
        """Start a node in basedir and check the addresses found in its furl.

        A "[node]" section is written to tahoe.cfg, with "tub.port" when
        tub_port is truthy and "tub.location" when tub_location is not None.
        If local_addresses is truthy, iputil.get_local_addresses_async is
        patched to return it. Returns a Deferred that fires after every
        entry of expected_addresses has been found in the furl obtained
        from n.tub.registerReference(n).
        """
        config = ["[node]\n"]
        if tub_port:
            config.append("tub.port = %d\n" % (tub_port,))
        if tub_location is not None:
            config.append("tub.location = %s\n" % (tub_location,))
        self._write_tahoe_cfg(basedir, "".join(config))

        if local_addresses:
            # Replace address discovery with a canned result.
            self.patch(iputil, 'get_local_addresses_async', lambda target=None: defer.succeed(local_addresses))

        n = TestNode(basedir)
        n.setServiceParent(self.parent)
        d = n.when_tub_ready()

        def _check_addresses(ignored_result):
            furl = n.tub.registerReference(n)
            for address in expected_addresses:
                self.failUnlessIn(address, furl)

        d.addCallback(_check_addresses)
        return d

    def test_location1(self):
        """Checks the furl when tub.location is a single explicit address."""
        return self._test_location(basedir="test_node/test_location1",
                                   expected_addresses=["192.0.2.0:1234"],
                                   tub_location="192.0.2.0:1234")

    def test_location2(self):
        """Checks the furl when tub.location lists two explicit addresses."""
        return self._test_location(basedir="test_node/test_location2",
                                   expected_addresses=["192.0.2.0:1234", "example.org:8091"],
                                   tub_location="192.0.2.0:1234,example.org:8091")

    def test_location_not_set(self):
        """Checks the autogenerated furl when tub.location is not set."""
        return self._test_location(basedir="test_node/test_location3",
                                   expected_addresses=["127.0.0.1:1234", "192.0.2.0:1234"],
                                   tub_port=1234,
                                   local_addresses=["127.0.0.1", "192.0.2.0"])

    def test_location_auto_and_explicit(self):
        """Checks the autogenerated furl when tub.location contains 'AUTO'."""
        return self._test_location(basedir="test_node/test_location4",
                                   expected_addresses=["127.0.0.1:1234", "192.0.2.0:1234", "example.com:4321"],
                                   tub_port=1234,
                                   tub_location="AUTO,example.com:4321",
                                   local_addresses=["127.0.0.1", "192.0.2.0", "example.com:4321"])

    def test_tahoe_cfg_utf8(self):
        """Checks that a UTF-8 tahoe.cfg starting with a BOM yields the nickname."""
        basedir = "test_node/test_tahoe_cfg_utf8"
        # The contents are already UTF-8 bytes, so write them in binary mode
        # rather than through a text-mode handle.
        contents = u"\uFEFF[node]\nnickname = \u2621\n".encode('utf-8')
        self._write_tahoe_cfg(basedir, contents, mode='wb')

        n = TestNode(basedir)
        n.setServiceParent(self.parent)
        d = n.when_tub_ready()
        d.addCallback(lambda ign: self.failUnlessEqual(n.get_config("node", "nickname").decode('utf-8'),
                                                       u"\u2621"))
        return d

    def test_tahoe_cfg_hash_in_name(self):
        """Checks that a nickname containing '#' is read back unchanged."""
        basedir = "test_node/test_cfg_hash_in_name"
        nickname = "Hash#Bang!" # a clever nickname containing a hash
        self._write_tahoe_cfg(basedir, "[node]\nnickname = %s\n" % (nickname,))
        n = TestNode(basedir)
        # failUnlessEqual reports both values when the comparison fails.
        self.failUnlessEqual(n.nickname, nickname)

    def test_private_config(self):
        """Exercises get_private_config and get_or_create_private_config."""
        basedir = "test_node/test_private_config"
        privdir = os.path.join(basedir, "private")
        fileutil.make_dirs(privdir)
        with open(os.path.join(privdir, 'already'), 'wt') as f:
            f.write("secret")

        n = TestNode(basedir)
        self.failUnlessEqual(n.get_private_config("already"), "secret")
        self.failUnlessEqual(n.get_private_config("not", "default"), "default")
        self.failUnlessRaises(MissingConfigEntry, n.get_private_config, "not")
        value = n.get_or_create_private_config("new", "start")
        self.failUnlessEqual(value, "start")
        self.failUnlessEqual(n.get_private_config("new"), "start")

        # Record each invocation so we can tell whether the callable default
        # was used.
        counter = []
        def make_newer():
            counter.append("called")
            return "newer"
        value = n.get_or_create_private_config("newer", make_newer)
        self.failUnlessEqual(len(counter), 1)
        self.failUnlessEqual(value, "newer")
        self.failUnlessEqual(n.get_private_config("newer"), "newer")

        value = n.get_or_create_private_config("newer", make_newer)
        self.failUnlessEqual(len(counter), 1) # don't call unless necessary
        self.failUnlessEqual(value, "newer")

    def test_timestamp(self):
        """Checks that formatTimeTahoeStyle output contains "Z" for float and int times."""
        # this modified logger doesn't seem to get used during the tests,
        # probably because we don't modify the LogObserver that trial
        # installs (only the one that twistd installs). So manually exercise
        # it a little bit.
        t = formatTimeTahoeStyle("ignored", time.time())
        self.failUnlessIn("Z", t)
        t2 = formatTimeTahoeStyle("ignored", int(time.time()))
        self.failUnlessIn("Z", t2)

    def test_secrets_dir(self):
        """Checks that constructing a node creates the "private" directory."""
        basedir = "test_node/test_secrets_dir"
        fileutil.make_dirs(basedir)
        n = TestNode(basedir)
        self.failUnless(isinstance(n, TestNode))
        self.failUnless(os.path.exists(os.path.join(basedir, "private")))

    def test_secrets_dir_protected(self):
        """Checks the permission bits of the "private" directory (skipped on Windows/cygwin)."""
        platform = sys.platform.lower()
        if "win32" in platform or "cygwin" in platform:
            # We don't know how to test that unprivileged users can't read this
            # thing.  (Also we don't know exactly how to set the permissions so
            # that unprivileged users can't read this thing.)
            raise unittest.SkipTest("We don't know how to set permissions on Windows.")
        basedir = "test_node/test_secrets_dir_protected"
        fileutil.make_dirs(basedir)
        n = TestNode(basedir)
        self.failUnless(isinstance(n, TestNode))
        privdir = os.path.join(basedir, "private")
        st = os.stat(privdir)
        bits = stat.S_IMODE(st[stat.ST_MODE])
        # stat.S_IXOTH is the same bit as the old octal literal 0001, but is
        # spelled in a way that parses on every Python version.
        # NOTE(review): only the "other: execute/search" bit is checked here;
        # confirm whether the read/write bits for "other" should be too.
        self.failUnless((bits & stat.S_IXOTH) == 0, bits)

    def test_logdir_is_str(self):
        """Checks that setLogDir is called, and with a str argument."""
        basedir = "test_node/test_logdir_is_str"
        fileutil.make_dirs(basedir)

        # A mutable holder the nested function can flag when it runs.
        ns = Namespace()
        ns.called = False
        def call_setLogDir(logdir):
            ns.called = True
            self.failUnless(isinstance(logdir, str), logdir)
        self.patch(foolscap.logging.log, 'setLogDir', call_setLogDir)

        TestNode(basedir)
        self.failUnless(ns.called)