2 import os, stat, sys, time
3 from twisted.trial import unittest
4 from twisted.internet import defer
5 from twisted.python import log
9 from foolscap.api import flushEventualQueue
10 from twisted.application import service
11 from allmydata.node import Node, formatTimeTahoeStyle, MissingConfigEntry
12 from allmydata.util import fileutil, iputil
13 import allmydata.test.common_util as testutil
15 class LoggingMultiService(service.MultiService):
16 def log(self, msg, **kw):
20 CERTFILE='DEFAULT_CERTFILE_BLANK'
21 PORTNUMFILE='DEFAULT_PORTNUMFILE_BLANK'
23 class TestCase(testutil.SignalMixin, unittest.TestCase):
25 testutil.SignalMixin.setUp(self)
26 self.parent = LoggingMultiService()
27 self.parent.startService()
29 log.msg("%s.tearDown" % self.__class__.__name__)
30 testutil.SignalMixin.tearDown(self)
31 d = defer.succeed(None)
32 d.addCallback(lambda res: self.parent.stopService())
33 d.addCallback(flushEventualQueue)
36 def _test_location(self, basedir, expected_addresses, tub_port=None, tub_location=None, local_addresses=None):
37 fileutil.make_dirs(basedir)
38 f = open(os.path.join(basedir, 'tahoe.cfg'), 'wt')
41 f.write("tub.port = %d\n" % (tub_port,))
42 if tub_location is not None:
43 f.write("tub.location = %s\n" % (tub_location,))
47 self.patch(iputil, 'get_local_addresses_async', lambda target=None: defer.succeed(local_addresses))
50 n.setServiceParent(self.parent)
51 d = n.when_tub_ready()
53 def _check_addresses(ignored_result):
54 furl = n.tub.registerReference(n)
55 for address in expected_addresses:
56 self.failUnlessIn(address, furl)
58 d.addCallback(_check_addresses)
61 def test_location1(self):
62 return self._test_location(basedir="test_node/test_location1",
63 expected_addresses=["192.0.2.0:1234"],
64 tub_location="192.0.2.0:1234")
66 def test_location2(self):
67 return self._test_location(basedir="test_node/test_location2",
68 expected_addresses=["192.0.2.0:1234", "example.org:8091"],
69 tub_location="192.0.2.0:1234,example.org:8091")
71 def test_location_not_set(self):
72 """Checks the autogenerated furl when tub.location is not set."""
73 return self._test_location(basedir="test_node/test_location3",
74 expected_addresses=["127.0.0.1:1234", "192.0.2.0:1234"],
76 local_addresses=["127.0.0.1", "192.0.2.0"])
78 def test_location_auto_and_explicit(self):
79 """Checks the autogenerated furl when tub.location contains 'AUTO'."""
80 return self._test_location(basedir="test_node/test_location4",
81 expected_addresses=["127.0.0.1:1234", "192.0.2.0:1234", "example.com:4321"],
83 tub_location="AUTO,example.com:4321",
84 local_addresses=["127.0.0.1", "192.0.2.0", "example.com:4321"])
86 def test_tahoe_cfg_utf8(self):
87 basedir = "test_node/test_tahoe_cfg_utf8"
88 fileutil.make_dirs(basedir)
89 f = open(os.path.join(basedir, 'tahoe.cfg'), 'wt')
90 f.write(u"\uFEFF[node]\n".encode('utf-8'))
91 f.write(u"nickname = \u2621\n".encode('utf-8'))
95 n.setServiceParent(self.parent)
96 d = n.when_tub_ready()
97 d.addCallback(lambda ign: self.failUnlessEqual(n.get_config("node", "nickname").decode('utf-8'),
101 def test_tahoe_cfg_hash_in_name(self):
102 basedir = "test_node/test_cfg_hash_in_name"
103 nickname = "Hash#Bang!" # a clever nickname containing a hash
104 fileutil.make_dirs(basedir)
105 f = open(os.path.join(basedir, 'tahoe.cfg'), 'wt')
107 f.write("nickname = %s\n" % (nickname,))
109 n = TestNode(basedir)
110 self.failUnless(n.nickname == nickname)
112 def test_private_config(self):
113 basedir = "test_node/test_private_config"
114 privdir = os.path.join(basedir, "private")
115 fileutil.make_dirs(privdir)
116 f = open(os.path.join(privdir, 'already'), 'wt')
120 n = TestNode(basedir)
121 self.failUnlessEqual(n.get_private_config("already"), "secret")
122 self.failUnlessEqual(n.get_private_config("not", "default"), "default")
123 self.failUnlessRaises(MissingConfigEntry, n.get_private_config, "not")
124 value = n.get_or_create_private_config("new", "start")
125 self.failUnlessEqual(value, "start")
126 self.failUnlessEqual(n.get_private_config("new"), "start")
129 counter.append("called")
131 value = n.get_or_create_private_config("newer", make_newer)
132 self.failUnlessEqual(len(counter), 1)
133 self.failUnlessEqual(value, "newer")
134 self.failUnlessEqual(n.get_private_config("newer"), "newer")
136 value = n.get_or_create_private_config("newer", make_newer)
137 self.failUnlessEqual(len(counter), 1) # don't call unless necessary
138 self.failUnlessEqual(value, "newer")
def test_timestamp(self):
    """Call formatTimeTahoeStyle directly with a float and an int timestamp."""
    # The modified logger doesn't seem to get used during the tests,
    # probably because only the LogObserver that twistd installs is
    # modified, not the one that trial installs. So the formatter is
    # exercised by hand here.
    for when in (time.time(), int(time.time())):
        formatted = formatTimeTahoeStyle("ignored", when)
        self.failUnless("Z" in formatted)
def test_secrets_dir(self):
    """After constructing a TestNode, its basedir must contain a "private" entry."""
    basedir = "test_node/test_secrets_dir"
    fileutil.make_dirs(basedir)
    node = TestNode(basedir)
    self.failUnless(isinstance(node, TestNode))
    private_path = os.path.join(basedir, "private")
    self.failUnless(os.path.exists(private_path))
def test_secrets_dir_protected(self):
    """Check that the node's "private" directory lacks the others-execute bit.

    Raises unittest.SkipTest when sys.platform names win32 or cygwin.
    """
    platform = sys.platform.lower()
    if "win32" in platform or "cygwin" in platform:
        # We don't know how to test that unprivileged users can't read this
        # thing. (Also we don't know exactly how to set the permissions so
        # that unprivileged users can't read this thing.)
        raise unittest.SkipTest("We don't know how to set permissions on Windows.")
    basedir = "test_node/test_secrets_dir_protected"
    fileutil.make_dirs(basedir)
    n = TestNode(basedir)
    self.failUnless(isinstance(n, TestNode))
    privdir = os.path.join(basedir, "private")
    bits = stat.S_IMODE(os.stat(privdir).st_mode)
    # stat.S_IXOTH is the same value as the old literal 0001, which is a
    # syntax error on Python 3; the named constant works on both.
    self.failUnless(bits & stat.S_IXOTH == 0, bits)
172 @patch("foolscap.logging.log.setLogDir")
173 def test_logdir_is_str(self, mock_setLogDir):
174 basedir = "test_node/test_logdir_is_str"
175 fileutil.make_dirs(basedir)
177 def call_setLogDir(logdir):
178 self.failUnless(isinstance(logdir, str), logdir)
179 mock_setLogDir.side_effect = call_setLogDir
182 self.failUnless(mock_setLogDir.called)