]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_node.py
test_node.py: refactor test_location* to reduce code duplication. refs #754
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_node.py
1
2 import os, stat, sys, time
3 from twisted.trial import unittest
4 from twisted.internet import defer
5 from twisted.python import log
6
7 from mock import patch
8
9 from foolscap.api import flushEventualQueue
10 from twisted.application import service
11 from allmydata.node import Node, formatTimeTahoeStyle, MissingConfigEntry
12 from allmydata.util import fileutil
13 import allmydata.test.common_util as testutil
14
15 class LoggingMultiService(service.MultiService):
16     def log(self, msg, **kw):
17         pass
18
19 class TestNode(Node):
20     CERTFILE='DEFAULT_CERTFILE_BLANK'
21     PORTNUMFILE='DEFAULT_PORTNUMFILE_BLANK'
22
23 class TestCase(testutil.SignalMixin, unittest.TestCase):
24     def setUp(self):
25         testutil.SignalMixin.setUp(self)
26         self.parent = LoggingMultiService()
27         self.parent.startService()
28     def tearDown(self):
29         log.msg("%s.tearDown" % self.__class__.__name__)
30         testutil.SignalMixin.tearDown(self)
31         d = defer.succeed(None)
32         d.addCallback(lambda res: self.parent.stopService())
33         d.addCallback(flushEventualQueue)
34         return d
35
36     def _test_location(self, basedir, expected_addresses, tub_port=None, tub_location=None):
37         fileutil.make_dirs(basedir)
38         f = open(os.path.join(basedir, 'tahoe.cfg'), 'wt')
39         f.write("[node]\n")
40         if tub_port:
41             f.write("tub.port = %d\n" % (tub_port,))
42         if tub_location is not None:
43             f.write("tub.location = %s\n" % (tub_location,))
44         f.close()
45
46         n = TestNode(basedir)
47         n.setServiceParent(self.parent)
48         d = n.when_tub_ready()
49
50         def _check_addresses(ignored_result):
51             furl = n.tub.registerReference(n)
52             for address in expected_addresses:
53                 self.failUnlessIn(address, furl)
54
55         d.addCallback(_check_addresses)
56         return d
57
58     def test_location1(self):
59         return self._test_location(basedir="test_node/test_location1",
60                                    expected_addresses=["192.0.2.0:1234"],
61                                    tub_location="192.0.2.0:1234")
62
63     def test_location2(self):
64         return self._test_location(basedir="test_node/test_location2",
65                                    expected_addresses=["192.0.2.0:1234", "example.org:8091"],
66                                    tub_location="192.0.2.0:1234,example.org:8091")
67
68
69     def test_tahoe_cfg_utf8(self):
70         basedir = "test_node/test_tahoe_cfg_utf8"
71         fileutil.make_dirs(basedir)
72         f = open(os.path.join(basedir, 'tahoe.cfg'), 'wt')
73         f.write(u"\uFEFF[node]\n".encode('utf-8'))
74         f.write(u"nickname = \u2621\n".encode('utf-8'))
75         f.close()
76
77         n = TestNode(basedir)
78         n.setServiceParent(self.parent)
79         d = n.when_tub_ready()
80         d.addCallback(lambda ign: self.failUnlessEqual(n.get_config("node", "nickname").decode('utf-8'),
81                                                        u"\u2621"))
82         return d
83
84     def test_tahoe_cfg_hash_in_name(self):
85         basedir = "test_node/test_cfg_hash_in_name"
86         nickname = "Hash#Bang!" # a clever nickname containing a hash
87         fileutil.make_dirs(basedir)
88         f = open(os.path.join(basedir, 'tahoe.cfg'), 'wt')
89         f.write("[node]\n")
90         f.write("nickname = %s\n" % (nickname,))
91         f.close()
92         n = TestNode(basedir)
93         self.failUnless(n.nickname == nickname)
94
95     def test_private_config(self):
96         basedir = "test_node/test_private_config"
97         privdir = os.path.join(basedir, "private")
98         fileutil.make_dirs(privdir)
99         f = open(os.path.join(privdir, 'already'), 'wt')
100         f.write("secret")
101         f.close()
102
103         n = TestNode(basedir)
104         self.failUnlessEqual(n.get_private_config("already"), "secret")
105         self.failUnlessEqual(n.get_private_config("not", "default"), "default")
106         self.failUnlessRaises(MissingConfigEntry, n.get_private_config, "not")
107         value = n.get_or_create_private_config("new", "start")
108         self.failUnlessEqual(value, "start")
109         self.failUnlessEqual(n.get_private_config("new"), "start")
110         counter = []
111         def make_newer():
112             counter.append("called")
113             return "newer"
114         value = n.get_or_create_private_config("newer", make_newer)
115         self.failUnlessEqual(len(counter), 1)
116         self.failUnlessEqual(value, "newer")
117         self.failUnlessEqual(n.get_private_config("newer"), "newer")
118
119         value = n.get_or_create_private_config("newer", make_newer)
120         self.failUnlessEqual(len(counter), 1) # don't call unless necessary
121         self.failUnlessEqual(value, "newer")
122
123     def test_timestamp(self):
124         # this modified logger doesn't seem to get used during the tests,
125         # probably because we don't modify the LogObserver that trial
126         # installs (only the one that twistd installs). So manually exercise
127         # it a little bit.
128         t = formatTimeTahoeStyle("ignored", time.time())
129         self.failUnless("Z" in t)
130         t2 = formatTimeTahoeStyle("ignored", int(time.time()))
131         self.failUnless("Z" in t2)
132
133     def test_secrets_dir(self):
134         basedir = "test_node/test_secrets_dir"
135         fileutil.make_dirs(basedir)
136         n = TestNode(basedir)
137         self.failUnless(isinstance(n, TestNode))
138         self.failUnless(os.path.exists(os.path.join(basedir, "private")))
139
140     def test_secrets_dir_protected(self):
141         if "win32" in sys.platform.lower() or "cygwin" in sys.platform.lower():
142             # We don't know how to test that unprivileged users can't read this
143             # thing.  (Also we don't know exactly how to set the permissions so
144             # that unprivileged users can't read this thing.)
145             raise unittest.SkipTest("We don't know how to set permissions on Windows.")
146         basedir = "test_node/test_secrets_dir_protected"
147         fileutil.make_dirs(basedir)
148         n = TestNode(basedir)
149         self.failUnless(isinstance(n, TestNode))
150         privdir = os.path.join(basedir, "private")
151         st = os.stat(privdir)
152         bits = stat.S_IMODE(st[stat.ST_MODE])
153         self.failUnless(bits & 0001 == 0, bits)
154
155     @patch("foolscap.logging.log.setLogDir")
156     def test_logdir_is_str(self, mock_setLogDir):
157         basedir = "test_node/test_logdir_is_str"
158         fileutil.make_dirs(basedir)
159
160         def call_setLogDir(logdir):
161             self.failUnless(isinstance(logdir, str), logdir)
162         mock_setLogDir.side_effect = call_setLogDir
163
164         TestNode(basedir)
165         self.failUnless(mock_setLogDir.called)