2 import os, stat, sys, time
3 from twisted.trial import unittest
4 from twisted.internet import defer
5 from twisted.python import log
7 from foolscap.eventual import flushEventualQueue
8 from twisted.application import service
9 from allmydata.node import Node, formatTimeTahoeStyle
10 from allmydata.util import fileutil
11 import common_util as testutil
13 class LoggingMultiService(service.MultiService):
14 def log(self, msg, **kw):
18 CERTFILE='DEFAULT_CERTFILE_BLANK'
19 PORTNUMFILE='DEFAULT_PORTNUMFILE_BLANK'
21 class TestCase(unittest.TestCase, testutil.SignalMixin):
23 self.parent = LoggingMultiService()
24 self.parent.startService()
26 log.msg("%s.tearDown" % self.__class__.__name__)
27 d = defer.succeed(None)
28 d.addCallback(lambda res: self.parent.stopService())
29 d.addCallback(flushEventualQueue)
32 def test_advertised_ip_addresses(self):
33 basedir = "test_node/test_advertised_ip_addresses"
34 fileutil.make_dirs(basedir)
35 f = open(os.path.join(basedir, 'advertised_ip_addresses'),'w')
40 n.setServiceParent(self.parent)
41 d = n.when_tub_ready()
43 def _check_addresses(ignored_result):
44 furl = n.tub.registerReference(n)
45 self.failUnless("1.2.3.4:5" in furl, furl)
47 d.addCallback(_check_addresses)
50 def test_advertised_ip_addresses2(self):
51 basedir = "test_node/test_advertised_ip_addresses2"
52 fileutil.make_dirs(basedir)
55 n.setServiceParent(self.parent)
56 d = n.when_tub_ready()
57 # this lets the 'port' file get written
58 d.addCallback(lambda res: n.disownServiceParent())
60 f = open(os.path.join(basedir, 'advertised_ip_addresses'),'w')
64 n2 = self.node = TestNode(basedir)
65 n2.setServiceParent(self.parent)
66 return n2.when_tub_ready()
67 d.addCallback(_new_node)
69 def _check_addresses(ignored_result):
70 portfile = os.path.join(basedir, self.node.PORTNUMFILE)
71 port = int(open(portfile, "r").read().strip())
72 furl = self.node.tub.registerReference(n)
73 self.failUnless(("1.2.3.4:%d" % port) in furl, furl)
74 self.failUnless(("6.7.8.9:%d" % port) in furl, furl)
76 d.addCallback(_check_addresses)
79 def test_timestamp(self):
80 # this modified logger doesn't seem to get used during the tests,
81 # probably because we don't modify the LogObserver that trial
82 # installs (only the one that twistd installs). So manually exercise
84 t = formatTimeTahoeStyle("ignored", time.time())
85 self.failUnless("Z" in t)
86 t2 = formatTimeTahoeStyle("ignored", int(time.time()))
87 self.failUnless("Z" in t2)
89 def test_secrets_dir(self):
90 basedir = "test_node/test_secrets_dir"
91 fileutil.make_dirs(basedir)
93 self.failUnless(os.path.exists(os.path.join(basedir, "private")))
95 def test_secrets_dir_protected(self):
96 if "win32" in sys.platform.lower() or "cygwin" in sys.platform.lower():
97 # We don't know how to test that unprivileged users can't read this
98 # thing. (Also we don't know exactly how to set the permissions so
99 # that unprivileged users can't read this thing.)
100 raise unittest.SkipTest("We don't know how to set permissions on Windows.")
101 basedir = "test_node/test_secrets_dir_protected"
102 fileutil.make_dirs(basedir)
103 n = TestNode(basedir)
104 privdir = os.path.join(basedir, "private")
105 st = os.stat(privdir)
106 bits = stat.S_IMODE(st[stat.ST_MODE])
107 self.failUnless(bits & 0001 == 0, bits)