2 import os, stat, sys, time
4 from twisted.trial import unittest
5 from twisted.internet import defer
6 from twisted.python import log
8 from foolscap.api import flushEventualQueue
9 import foolscap.logging.log
11 from twisted.application import service
12 from allmydata.node import Node, formatTimeTahoeStyle, MissingConfigEntry
13 from allmydata.util import fileutil, iputil
14 from allmydata.util.namespace import Namespace
15 import allmydata.test.common_util as testutil
# twisted MultiService subclass that additionally declares a log(msg, **kw)
# method. The test case below creates one as `self.parent` and attaches the
# nodes under test to it with setServiceParent().
# NOTE(review): the body of log() is not visible in this excerpt -- confirm
# what (if anything) it does with the message before relying on it.
18 class LoggingMultiService(service.MultiService):
19 def log(self, msg, **kw):
# NOTE(review): the statement that introduces the scope for these two names is
# not visible in this excerpt; they look like class-level attributes of the
# TestNode class that the tests below instantiate -- confirm against the full
# file. Both are assigned fixed string literals.
23 CERTFILE='DEFAULT_CERTFILE_BLANK'
24 PORTNUMFILE='DEFAULT_PORTNUMFILE_BLANK'
# Trial test case combining testutil.SignalMixin with unittest.TestCase. Its
# methods exercise tub location handling, tahoe.cfg parsing, private config
# files, the "private" directory and the foolscap log directory.
26 class TestCase(testutil.SignalMixin, unittest.TestCase):
# NOTE(review): the `def` line of the set-up method is missing from this
# excerpt; the next three statements appear to be its body. They run the
# mixin's setUp, then create and start the LoggingMultiService that is later
# used as the service parent of the nodes under test.
28 testutil.SignalMixin.setUp(self)
29 self.parent = LoggingMultiService()
30 self.parent.startService()
# NOTE(review): the `def` line of the tear-down method is likewise missing.
# The statements below log the teardown, run the mixin's tearDown, and then
# chain two callbacks on an already-fired Deferred: stop the parent service,
# then call flushEventualQueue. Presumably `d` is returned so trial waits for
# it -- the return statement is not visible here.
32 log.msg("%s.tearDown" % self.__class__.__name__)
33 testutil.SignalMixin.tearDown(self)
34 d = defer.succeed(None)
35 d.addCallback(lambda res: self.parent.stopService())
36 d.addCallback(flushEventualQueue)
# Shared helper for the test_location* tests.
#
#   basedir            -- directory that is created and receives a tahoe.cfg
#   expected_addresses -- strings that must each occur in the resulting furl
#   tub_port           -- value written as "tub.port" (formatted with %d)
#   tub_location       -- when not None, written as "tub.location"
#   local_addresses    -- value delivered, via an already-fired Deferred, by
#                         the patched iputil.get_local_addresses_async
#
# The address check runs as a callback once n.when_tub_ready() fires.
# NOTE(review): this excerpt is missing several lines of the helper (for
# example the creation of `n`, the closing of `f`, and whatever is returned),
# so only the visible statements are described here.
39 def _test_location(self, basedir, expected_addresses, tub_port=None, tub_location=None, local_addresses=None):
40 fileutil.make_dirs(basedir)
41 f = open(os.path.join(basedir, 'tahoe.cfg'), 'wt')
# tub_port defaults to None, which %d cannot format; presumably a guard
# precedes this write in the full file -- not visible here.
44 f.write("tub.port = %d\n" % (tub_port,))
45 if tub_location is not None:
46 f.write("tub.location = %s\n" % (tub_location,))
# Replace the address lookup so the test decides which local addresses are
# reported instead of depending on the host's real interfaces.
50 self.patch(iputil, 'get_local_addresses_async', lambda target=None: defer.succeed(local_addresses))
53 n.setServiceParent(self.parent)
54 d = n.when_tub_ready()
56 def _check_addresses(ignored_result):
# Register the node on its own tub to obtain a furl, then require every
# expected address to appear somewhere in that furl.
57 furl = n.tub.registerReference(n)
58 for address in expected_addresses:
59 self.failUnlessIn(address, furl)
61 d.addCallback(_check_addresses)
def test_location1(self):
    """A single explicit tub.location entry must show up in the node's furl."""
    location = "192.0.2.0:1234"
    # The same string is both what gets configured and what is expected back.
    return self._test_location("test_node/test_location1",
                               [location],
                               tub_location=location)
def test_location2(self):
    """Every entry of a comma-separated tub.location must show up in the furl."""
    configured = "192.0.2.0:1234,example.org:8091"
    wanted = ["192.0.2.0:1234", "example.org:8091"]
    return self._test_location("test_node/test_location2",
                               wanted,
                               tub_location=configured)
74 def test_location_not_set(self):
75 """Checks the autogenerated furl when tub.location is not set."""
# No tub_location is passed; the expected entries are the two patched local
# addresses, each combined with port 1234.
# NOTE(review): no tub_port argument is visible in this call even though the
# expected addresses carry port 1234 -- the embedded numbering skips a line
# here, so an argument appears to be missing from the excerpt.
76 return self._test_location(basedir="test_node/test_location3",
77 expected_addresses=["127.0.0.1:1234", "192.0.2.0:1234"],
79 local_addresses=["127.0.0.1", "192.0.2.0"])
81 def test_location_auto_and_explicit(self):
82 """Checks the autogenerated furl when tub.location contains 'AUTO'."""
# tub.location mixes the 'AUTO' keyword with one explicit entry; the furl is
# expected to contain both patched local addresses with port 1234 as well as
# the explicit "example.com:4321".
# NOTE(review): no tub_port argument is visible in this call -- the embedded
# numbering skips a line, so an argument appears to be missing from the
# excerpt. Also note that local_addresses includes "example.com:4321", which
# unlike the other two entries already carries a port; confirm that this is
# intentional.
83 return self._test_location(basedir="test_node/test_location4",
84 expected_addresses=["127.0.0.1:1234", "192.0.2.0:1234", "example.com:4321"],
86 tub_location="AUTO,example.com:4321",
87 local_addresses=["127.0.0.1", "192.0.2.0", "example.com:4321"])
# Writes a tahoe.cfg that starts with a UTF-8 encoded U+FEFF and contains a
# non-ASCII nickname, then reads the nickname back through get_config() once
# the node's tub is ready.
89 def test_tahoe_cfg_utf8(self):
90 basedir = "test_node/test_tahoe_cfg_utf8"
91 fileutil.make_dirs(basedir)
92 f = open(os.path.join(basedir, 'tahoe.cfg'), 'wt')
# U+FEFF encoded as UTF-8 places a byte-order mark in front of the [node]
# section header.
93 f.write(u"\uFEFF[node]\n".encode('utf-8'))
# The nickname is the single character U+2621, written as UTF-8 bytes.
94 f.write(u"nickname = \u2621\n".encode('utf-8'))
# NOTE(review): the lines that close `f` and create `n` are not visible in
# this excerpt.
98 n.setServiceParent(self.parent)
99 d = n.when_tub_ready()
# The stored nickname is decoded from UTF-8 before being compared.
# NOTE(review): the expected value and the remainder of this call are cut off
# in this excerpt.
100 d.addCallback(lambda ign: self.failUnlessEqual(n.get_config("node", "nickname").decode('utf-8'),
# Writes a nickname containing a '#' character to tahoe.cfg and checks that a
# freshly constructed node reports exactly that nickname -- presumably to
# guard against '#' being treated as the start of a comment.
104 def test_tahoe_cfg_hash_in_name(self):
105 basedir = "test_node/test_cfg_hash_in_name"
106 nickname = "Hash#Bang!" # a clever nickname containing a hash
107 fileutil.make_dirs(basedir)
108 f = open(os.path.join(basedir, 'tahoe.cfg'), 'wt')
# NOTE(review): the embedded numbering skips a line on either side of this
# write; a section header write and the closing of `f` are presumably among
# the missing lines.
110 f.write("nickname = %s\n" % (nickname,))
112 n = TestNode(basedir)
113 self.failUnless(n.nickname == nickname)
# Exercises get_private_config() and get_or_create_private_config() against
# files in the node's "private" directory: an existing entry, a missing entry
# with and without a default, and creation from either a plain value or a
# callable.
115 def test_private_config(self):
116 basedir = "test_node/test_private_config"
117 privdir = os.path.join(basedir, "private")
118 fileutil.make_dirs(privdir)
# Pre-create the "already" entry before the node exists.
# NOTE(review): the lines that write the content and close `f` are not
# visible in this excerpt; the assertion below expects the value "secret".
119 f = open(os.path.join(privdir, 'already'), 'wt')
123 n = TestNode(basedir)
124 self.failUnlessEqual(n.get_private_config("already"), "secret")
# A missing entry yields the supplied default, or raises MissingConfigEntry
# when no default is given.
125 self.failUnlessEqual(n.get_private_config("not", "default"), "default")
126 self.failUnlessRaises(MissingConfigEntry, n.get_private_config, "not")
# get_or_create with a plain value: the value is returned and can be read
# back afterwards.
127 value = n.get_or_create_private_config("new", "start")
128 self.failUnlessEqual(value, "start")
129 self.failUnlessEqual(n.get_private_config("new"), "start")
# NOTE(review): the definitions of `counter` and of the `make_newer` callable
# are not visible in this excerpt; the line below appears to be part of
# make_newer's body and records each invocation.
132 counter.append("called")
# get_or_create with a callable: the first request invokes it once.
134 value = n.get_or_create_private_config("newer", make_newer)
135 self.failUnlessEqual(len(counter), 1)
136 self.failUnlessEqual(value, "newer")
137 self.failUnlessEqual(n.get_private_config("newer"), "newer")
# A second request for the same entry must not invoke the callable again.
139 value = n.get_or_create_private_config("newer", make_newer)
140 self.failUnlessEqual(len(counter), 1) # don't call unless necessary
141 self.failUnlessEqual(value, "newer")
def test_timestamp(self):
    """formatTimeTahoeStyle output contains "Z" for float and int timestamps."""
    # The modified logger doesn't seem to get used during the tests, probably
    # because we don't modify the LogObserver that trial installs (only the
    # one that twistd installs), so the formatter is exercised by hand here.
    for as_number in (float, int):
        # float() leaves time.time()'s result unchanged; int() truncates it.
        formatted = formatTimeTahoeStyle("ignored", as_number(time.time()))
        self.failUnless("Z" in formatted)
def test_secrets_dir(self):
    """Constructing a node leaves a "private" entry under its base directory."""
    basedir = "test_node/test_secrets_dir"
    fileutil.make_dirs(basedir)
    node = TestNode(basedir)
    self.failUnless(isinstance(node, TestNode))
    private_path = os.path.join(basedir, "private")
    self.failUnless(os.path.exists(private_path))
def test_secrets_dir_protected(self):
    """The node's "private" directory must not grant search access to others.

    Skipped on win32 and cygwin platforms. On other platforms the test
    constructs a node and then requires that the "others" execute/search
    permission bit is clear in the mode of <basedir>/private.
    """
    platform = sys.platform.lower()
    if "win32" in platform or "cygwin" in platform:
        # We don't know how to test that unprivileged users can't read this
        # thing. (Also we don't know exactly how to set the permissions so
        # that unprivileged users can't read this thing.)
        raise unittest.SkipTest("We don't know how to set permissions on Windows.")
    basedir = "test_node/test_secrets_dir_protected"
    fileutil.make_dirs(basedir)
    n = TestNode(basedir)
    self.failUnless(isinstance(n, TestNode))
    privdir = os.path.join(basedir, "private")
    bits = stat.S_IMODE(os.stat(privdir).st_mode)
    # stat.S_IXOTH has the value 0o001. The named constant replaces the bare
    # literal 0001, which is Python-2-only octal syntax and a SyntaxError on
    # Python 3. `&` binds tighter than `==`, so this is (bits & mask) == 0.
    self.failUnless(bits & stat.S_IXOTH == 0, bits)
# Replaces foolscap.logging.log.setLogDir with a local checker that requires
# its argument to be a `str`, then asserts via `ns.called` that the checker
# was actually reached.
# NOTE(review): the creation of `ns`, the statement that sets ns.called, and
# the node construction that is presumably what triggers setLogDir are not
# visible in this excerpt.
175 def test_logdir_is_str(self):
176 basedir = "test_node/test_logdir_is_str"
177 fileutil.make_dirs(basedir)
181 def call_setLogDir(logdir):
# On failure the offending logdir value is used as the assertion message.
183 self.failUnless(isinstance(logdir, str), logdir)
184 self.patch(foolscap.logging.log, 'setLogDir', call_setLogDir)
187 self.failUnless(ns.called)