2 import os, stat, sys, time
3 from twisted.trial import unittest
4 from twisted.internet import defer
5 from twisted.python import log
9 from foolscap.api import flushEventualQueue
10 from twisted.application import service
11 from allmydata.node import Node, formatTimeTahoeStyle, MissingConfigEntry
12 from allmydata.util import fileutil
13 import allmydata.test.common_util as testutil
# MultiService subclass that adds a log(msg, **kw) method. The tests below use
# an instance of it as the parent service for the nodes they create.
# NOTE(review): the body of log() is not visible in this listing.
15 class LoggingMultiService(service.MultiService):
16 def log(self, msg, **kw):
# Placeholder values for the CERTFILE and PORTNUMFILE names.
# NOTE(review): the enclosing class header is not visible in this listing;
# presumably these are attributes of the TestNode class used by the tests
# below -- confirm.
20 CERTFILE='DEFAULT_CERTFILE_BLANK'
21 PORTNUMFILE='DEFAULT_PORTNUMFILE_BLANK'
# Trial test case that mixes in testutil.SignalMixin.
23 class TestCase(testutil.SignalMixin, unittest.TestCase):
# Set-up steps: run the mixin's setUp, then create and start the
# LoggingMultiService that the tests use as service parent (self.parent).
# NOTE(review): the enclosing `def` line is not visible in this listing.
25 testutil.SignalMixin.setUp(self)
26 self.parent = LoggingMultiService()
27 self.parent.startService()
# Tear-down steps: log the class name, run the mixin's tearDown, then build a
# Deferred chain that stops the parent service and afterwards calls
# flushEventualQueue.
# NOTE(review): the enclosing `def` line and any final `return` are not
# visible in this listing.
29 log.msg("%s.tearDown" % self.__class__.__name__)
30 testutil.SignalMixin.tearDown(self)
31 d = defer.succeed(None)
32 d.addCallback(lambda res: self.parent.stopService())
33 d.addCallback(flushEventualQueue)
# Shared helper for the test_location* tests: writes a tahoe.cfg under
# `basedir`, attaches node `n` to self.parent, and once the tub is ready checks
# that every entry of `expected_addresses` occurs in the FURL returned by
# n.tub.registerReference(n).
36 def _test_location(self, basedir, expected_addresses, tub_port=None, tub_location=None):
37 fileutil.make_dirs(basedir)
38 f = open(os.path.join(basedir, 'tahoe.cfg'), 'wt')
# NOTE(review): tub_port defaults to None, so this write is presumably guarded
# by a condition that is not visible in this listing -- confirm.
41 f.write("tub.port = %d\n" % (tub_port,))
# tub.location is only written when the caller supplied one.
42 if tub_location is not None:
43 f.write("tub.location = %s\n" % (tub_location,))
# NOTE(review): `n` is used below without a visible assignment, and no close of
# `f` is visible; those lines appear to be missing from this listing.
47 n.setServiceParent(self.parent)
48 d = n.when_tub_ready()
# Callback run after when_tub_ready() fires: register the node with its tub
# and require each expected address to appear in the resulting FURL.
50 def _check_addresses(ignored_result):
51 furl = n.tub.registerReference(n)
52 for address in expected_addresses:
53 self.failUnlessIn(address, furl)
55 d.addCallback(_check_addresses)
def test_location1(self):
    """Run _test_location with a single tub.location hint.

    The one configured address is also the only address expected to show
    up in the node's FURL.
    """
    location = "192.0.2.0:1234"
    return self._test_location(
        basedir="test_node/test_location1",
        expected_addresses=[location],
        tub_location=location,
    )
def test_location2(self):
    """Run _test_location with two comma-separated tub.location hints.

    Both configured addresses are expected to show up in the node's FURL.
    """
    hints = ["192.0.2.0:1234", "example.org:8091"]
    # The configured value is the same hints joined by commas:
    # "192.0.2.0:1234,example.org:8091".
    return self._test_location(
        basedir="test_node/test_location2",
        expected_addresses=hints,
        tub_location=",".join(hints),
    )
# Writes a UTF-8 encoded tahoe.cfg that starts with U+FEFF and contains a
# non-ASCII nickname, then checks the nickname read back through get_config.
69 def test_tahoe_cfg_utf8(self):
70 basedir = "test_node/test_tahoe_cfg_utf8"
71 fileutil.make_dirs(basedir)
72 f = open(os.path.join(basedir, 'tahoe.cfg'), 'wt')
# Both writes are UTF-8 encoded: U+FEFF followed by the [node] header, then a
# nickname consisting of U+2621.
73 f.write(u"\uFEFF[node]\n".encode('utf-8'))
74 f.write(u"nickname = \u2621\n".encode('utf-8'))
# NOTE(review): `n` is used below without a visible assignment, and no close of
# `f` is visible; those lines appear to be missing from this listing.
78 n.setServiceParent(self.parent)
79 d = n.when_tub_ready()
# Once the tub is ready, compare the UTF-8-decoded [node] nickname value.
# NOTE(review): the remainder of this call (the expected value) is cut off in
# this listing.
80 d.addCallback(lambda ign: self.failUnlessEqual(n.get_config("node", "nickname").decode('utf-8'),
# Writes a nickname containing '#' into tahoe.cfg and requires the node's
# `nickname` attribute to equal the full string.
84 def test_tahoe_cfg_hash_in_name(self):
85 basedir = "test_node/test_cfg_hash_in_name"
86 nickname = "Hash#Bang!" # a clever nickname containing a hash
87 fileutil.make_dirs(basedir)
88 f = open(os.path.join(basedir, 'tahoe.cfg'), 'wt')
90 f.write("nickname = %s\n" % (nickname,))
# NOTE(review): `n` is used below without a visible assignment, and no close of
# `f` is visible; those lines appear to be missing from this listing.
93 self.failUnless(n.nickname == nickname)
# Exercises get_private_config and get_or_create_private_config on a TestNode
# whose basedir already contains a "private" directory.
95 def test_private_config(self):
96 basedir = "test_node/test_private_config"
97 privdir = os.path.join(basedir, "private")
98 fileutil.make_dirs(privdir)
99 f = open(os.path.join(privdir, 'already'), 'wt')
# NOTE(review): the lines that write to and close `f` are not visible in this
# listing; the first assertion below expects the entry to read back as "secret".
103 n = TestNode(basedir)
# Expected: an existing entry is returned; a missing entry yields the supplied
# default, or raises MissingConfigEntry when no default is given.
104 self.failUnlessEqual(n.get_private_config("already"), "secret")
105 self.failUnlessEqual(n.get_private_config("not", "default"), "default")
106 self.failUnlessRaises(MissingConfigEntry, n.get_private_config, "not")
# get_or_create_private_config with a plain string default: the default is
# returned and is readable through get_private_config afterwards.
107 value = n.get_or_create_private_config("new", "start")
108 self.failUnlessEqual(value, "start")
109 self.failUnlessEqual(n.get_private_config("new"), "start")
# NOTE(review): `counter` and `make_newer` are defined on lines not visible in
# this listing; the statement below is presumably part of make_newer's body,
# recording each invocation -- confirm.
112 counter.append("called")
# First call with the callable default: exactly one invocation is expected,
# and "newer" is both returned and readable afterwards.
114 value = n.get_or_create_private_config("newer", make_newer)
115 self.failUnlessEqual(len(counter), 1)
116 self.failUnlessEqual(value, "newer")
117 self.failUnlessEqual(n.get_private_config("newer"), "newer")
# Second call for the same name: the invocation count must stay at 1.
119 value = n.get_or_create_private_config("newer", make_newer)
120 self.failUnlessEqual(len(counter), 1) # don't call unless necessary
121 self.failUnlessEqual(value, "newer")
def test_timestamp(self):
    """Call formatTimeTahoeStyle directly with a float and an int timestamp.

    Each formatted result is required to contain the letter "Z".
    """
    # This modified logger doesn't seem to get used during the tests,
    # probably because we don't modify the LogObserver that trial
    # installs (only the one that twistd installs), so the formatter is
    # exercised by hand here.
    for stamp in (time.time(), int(time.time())):
        formatted = formatTimeTahoeStyle("ignored", stamp)
        self.failUnless("Z" in formatted)
def test_secrets_dir(self):
    """Construct a TestNode and require that basedir/private exists afterwards."""
    basedir = "test_node/test_secrets_dir"
    fileutil.make_dirs(basedir)
    node = TestNode(basedir)
    self.failUnless(isinstance(node, TestNode))
    # Only basedir itself is created above, so this checks the state left
    # behind after TestNode(basedir) has been constructed.
    private_path = os.path.join(basedir, "private")
    self.failUnless(os.path.exists(private_path))
def test_secrets_dir_protected(self):
    """Require that basedir/private has its others-execute bit cleared.

    A TestNode is constructed in a fresh basedir, then the permission bits
    of the "private" directory are read with os.stat and the 0o001 bit is
    required to be unset.

    Raises unittest.SkipTest on platforms whose sys.platform contains
    "win32" or "cygwin".
    """
    platform = sys.platform.lower()
    if "win32" in platform or "cygwin" in platform:
        # We don't know how to test that unprivileged users can't read this
        # thing. (Also we don't know exactly how to set the permissions so
        # that unprivileged users can't read this thing.)
        raise unittest.SkipTest("We don't know how to set permissions on Windows.")
    basedir = "test_node/test_secrets_dir_protected"
    fileutil.make_dirs(basedir)
    n = TestNode(basedir)
    self.failUnless(isinstance(n, TestNode))
    privdir = os.path.join(basedir, "private")
    bits = stat.S_IMODE(os.stat(privdir).st_mode)
    # 0o001 is the others-execute bit. It is spelled with the "0o" prefix
    # because the bare-leading-zero octal form (0001) is a syntax error on
    # Python 3, while "0o" works on Python 2.6+ and 3 alike. `&` binds
    # tighter than `==`, so this tests (bits & 0o001) == 0.
    self.failUnless(bits & 0o001 == 0, bits)
# Replaces foolscap.logging.log.setLogDir with a mock (passed in as
# mock_setLogDir) and checks the type of the logdir argument it receives.
155 @patch("foolscap.logging.log.setLogDir")
156 def test_logdir_is_str(self, mock_setLogDir):
157 basedir = "test_node/test_logdir_is_str"
158 fileutil.make_dirs(basedir)
# side_effect hook: fails the test if setLogDir is called with a logdir that is
# not a `str` instance.
160 def call_setLogDir(logdir):
161 self.failUnless(isinstance(logdir, str), logdir)
162 mock_setLogDir.side_effect = call_setLogDir
# NOTE(review): the code that is expected to trigger setLogDir is not visible
# in this listing.
# Finally, require that the mock was called at all.
165 self.failUnless(mock_setLogDir.called)