2 import datetime, os.path, re, types, ConfigParser
3 from base64 import b32decode, b32encode
5 from twisted.python import log as twlog
6 from twisted.application import service
7 from twisted.internet import defer, reactor
8 from foolscap import Tub, eventual
9 import foolscap.logging.log
10 from allmydata import get_package_versions, get_package_versions_string
11 from allmydata.util import log
12 from allmydata.util import fileutil, iputil, observer, humanreadable
13 from allmydata.util.assertutil import precondition
15 from foolscap.logging import app_versions
# Add our application versions to the data that Foolscap's LogPublisher
# reports.
# Each version is converted with str() before being handed to
# app_versions.add_version().
for thing, things_version in get_package_versions().iteritems():
    app_versions.add_version(thing, str(things_version))
# Matches a dotted-quad address with an optional ":portnum" suffix.
# group 1 will be addr (dotted quad string), group 3 if any will be portnum
# (string). Group 2 is the ":portnum" suffix including the colon.
#
# Each octet is either "0" or a number without a leading zero, so addresses
# that contain a zero octet (10.0.0.1, 127.0.0.1) are accepted. The octet
# groups are non-capturing so the group numbering above stays stable. A raw
# string is used so the "\." escapes reach the regex engine unchanged.
ADDR_RE = re.compile(
    r"^((?:0|[1-9][0-9]*)(?:\.(?:0|[1-9][0-9]*)){3})(:([1-9][0-9]*))?$")
def formatTimeTahoeStyle(self, when):
    """Format the POSIX timestamp 'when' as a UTC string with milliseconds.

    'self' is not used by the body.
    Returns a string such as '2007-10-12 00:26:28.566Z'.
    """
    # we want UTC timestamps that look like:
    #  2007-10-12 00:26:28.566Z [Client] rnp752lz: 'client running'
    d = datetime.datetime.utcfromtimestamp(when)
    if d.microsecond:
        # isoformat() emits six fractional digits here; drop the last three
        # to keep millisecond precision.
        return d.isoformat(" ")[:-3]+"Z"
    else:
        # isoformat() omits the fractional part entirely when microsecond
        # is zero, so add the milliseconds explicitly.
        return d.isoformat(" ") + ".000Z"
# Text for the README file placed in a node's "private" directory.
# NOTE(review): the leading newline after the opening quotes is assumed --
# confirm against the file that is actually written.
PRIV_README="""
This directory contains files which contain private data for the Tahoe node,
such as private keys. On Unix-like systems, the permissions on this directory
are set to disallow users other than its owner from reading the contents of
the files. See the 'configuration.txt' documentation file for details."""
41 class _None: # used as a marker in get_config()
class MissingConfigEntry(Exception):
    """Raised when a required configuration entry is missing."""
class Node(service.MultiService):
    # this implements common functionality of both Client nodes and Introducer
    # nodes.

    # Human-readable node type, used in the "<NODETYPE> running" log message.
    NODETYPE = "unknown NODETYPE"
    # Name of the file (inside the base directory) that records the Tub's
    # listening port.
    # NOTE(review): no value is visible here; None is a placeholder that
    # subclasses are presumably expected to override -- confirm.
    PORTNUMFILE = None
    # Name of the Tub certificate file inside the "private" directory.
    # NOTE(review): value assumed -- confirm.
    CERTFILE = "node.pem"
    # Name of the legacy file listing extra addresses to advertise.
    LOCAL_IP_FILE = "advertised_ip_addresses"
def __init__(self, basedir="."):
    """Construct a node rooted at 'basedir' (made absolute).

    Creates the 'private' subdirectory (requested mode 0700) with its
    README, loads the configuration, decodes the UTF-8 nickname, creates
    the Tub, sets up ssh access and logging, logs the package versions and
    raises the process resource limits.
    """
    service.MultiService.__init__(self)
    self.basedir = os.path.abspath(basedir)
    self._portnumfile = os.path.join(self.basedir, self.PORTNUMFILE)
    self._tub_ready_observerlist = observer.OneShotObserverList()
    privdir = os.path.join(self.basedir, "private")
    fileutil.make_dirs(privdir, 0o700)
    # Close the README explicitly instead of relying on garbage collection.
    with open(os.path.join(privdir, "README"), "w") as f:
        f.write(PRIV_README)

    # creates self.config, populates from distinct files if necessary
    self.read_config()

    nickname_utf8 = self.get_config("node", "nickname", "<unspecified>")
    self.nickname = nickname_utf8.decode("utf-8")

    # NOTE(review): the four setup steps below are not visible in the
    # reviewed text; they are restored because later code reads self.tub,
    # self.short_nodeid and self.logSource -- confirm names and order.
    self.create_tub()
    self.logSource = "Node"

    self.setup_ssh()
    self.setup_logging()
    self.log("Node constructed. " + get_package_versions_string())
    iputil.increase_rlimits()
def get_config(self, section, option, default=_None, boolean=False):
    """Return the value of [section]option from self.config.

    If 'boolean' is true the value is read with getboolean(); otherwise the
    string from get() is returned. When the section or the option is
    missing, 'default' is returned if the caller supplied one; otherwise
    MissingConfigEntry is raised with a message naming the tahoe.cfg file
    under self.basedir.
    """
    try:
        if boolean:
            return self.config.getboolean(section, option)
        return self.config.get(section, option)
    except (ConfigParser.NoOptionError, ConfigParser.NoSectionError):
        # _None (rather than None) marks "no default given", so that None
        # itself remains usable as a default value.
        if default is _None:
            fn = os.path.join(self.basedir, "tahoe.cfg")
            raise MissingConfigEntry("%s is missing the [%s]%s entry"
                                     % (fn, section, option))
        return default
def set_config(self, section, option, value):
    """Store 'value' under [section]option in self.config.

    The section is created first when it does not exist yet. After the
    write, the stored value is read back and asserted equal to 'value'.
    """
    cfg = self.config
    section_missing = not cfg.has_section(section)
    if section_missing:
        cfg.add_section(section)
    cfg.set(section, option, value)
    stored = cfg.get(section, option)
    assert stored == value
def read_config(self):
    """Create self.config and populate it.

    A fresh SafeConfigParser reads 'tahoe.cfg' from self.basedir, then
    read_old_config_files() is called to process the older individual
    config files.
    """
    cfg_path = os.path.join(self.basedir, "tahoe.cfg")
    parser = ConfigParser.SafeConfigParser()
    self.config = parser
    parser.read([cfg_path])
    self.read_old_config_files()
def read_old_config_files(self):
    """Merge settings from the older one-file-per-setting layout.

    Copies 'nickname', 'webport', 'keepalive_timeout' and
    'disconnect_timeout' into the [node] section when those files exist,
    takes 'tub.port' from the port-number file only when the config does
    not already provide it, and converts the advertised-addresses file into
    the comma-separated 'advertised_ip_addresses' entry. Files that cannot
    be opened or read are skipped.
    """
    # backwards-compatibility: individual files will override the
    # contents of tahoe.cfg
    copy = self._copy_config_from_file

    copy("nickname", "node", "nickname")
    copy("webport", "node", "web.port")

    cfg_tubport = self.get_config("node", "tub.port", "")
    if not cfg_tubport:
        # For 'tub.port', tahoe.cfg overrides the individual file on
        # disk. So only read self._portnumfile if tahoe.cfg doesn't
        # provide a value.
        try:
            with open(self._portnumfile, "rU") as f:
                file_tubport = f.read().strip()
            self.set_config("node", "tub.port", file_tubport)
        except EnvironmentError:
            pass

    try:
        addresses = []
        ipfile = os.path.join(self.basedir, self.LOCAL_IP_FILE)
        tubport = int(self.get_config("node", "tub.port", "0"))
        with open(ipfile, "rU") as f:
            for addrline in f:
                mo = ADDR_RE.search(addrline)
                if mo:
                    (addr, dummy, aportnum,) = mo.groups()
                    # Lines without an explicit port use the tub port.
                    if aportnum is None:
                        aportnum = tubport
                    addresses.append("%s:%d" % (addr, int(aportnum),))
        self.set_config("node", "advertised_ip_addresses",
                        ",".join(addresses))
    except EnvironmentError:
        pass
    copy("keepalive_timeout", "node", "timeout.keepalive")
    copy("disconnect_timeout", "node", "timeout.disconnect")
137 def _copy_config_from_file(self, config_filename, section, keyname):
138 s = self.get_config_from_file(config_filename)
140 self.set_config(section, keyname, s)
def create_tub(self):
    """Create self.tub and prepare it for service startup.

    Builds the Tub from the certificate file in the 'private' directory,
    enables failure logging, applies the optional keepalive/disconnect
    timeouts from the [node] section, derives self.nodeid and
    self.short_nodeid from the Tub id (also written to 'my_nodeid'), starts
    listening on the configured 'tub.port' (default "tcp:0") and attaches
    the Tub to this service.
    """
    pem_path = os.path.join(self.basedir, "private", self.CERTFILE)
    self.tub = Tub(certFile=pem_path)
    for failure_option in ("logLocalFailures", "logRemoteFailures"):
        self.tub.setOption(failure_option, True)

    # see #521 for a discussion of how to pick these timeout values.
    # N.B.: the disconnect timeout is in seconds, so use "1800" to get 30min
    timeout_options = (
        ("timeout.keepalive", "keepaliveTimeout"),
        ("timeout.disconnect", "disconnectTimeout"),
    )
    for config_key, tub_option in timeout_options:
        raw_value = self.get_config("node", config_key, "")
        if raw_value:
            self.tub.setOption(tub_option, int(raw_value))

    self.nodeid = b32decode(self.tub.tubID.upper()) # binary format
    nodeid_text = b32encode(self.nodeid).lower()
    self.write_config("my_nodeid", nodeid_text + "\n")
    self.short_nodeid = nodeid_text[:8] # ready for printing

    listen_spec = self.get_config("node", "tub.port", "tcp:0")
    self.tub.listenOn(listen_spec)
    # we must wait until our service has started before we can find out
    # our IP address and thus do tub.setLocation, and we can't register
    # any services with the Tub until after that point
    self.tub.setServiceParent(self)
# NOTE(review): the 'def' line for this method is not visible in the reviewed
# text; the name 'setup_ssh' is assumed -- confirm.
def setup_ssh(self):
    """Start an ssh manhole when [node]ssh.port is configured.

    With a non-empty 'ssh.port', the 'ssh.authorized_keys_file' entry is
    required: it is read without a default. An AuthorizedKeysManhole for
    that port and key file is then attached to this service.
    """
    ssh_port = self.get_config("node", "ssh.port", "")
    if ssh_port:
        ssh_keyfile = self.get_config("node", "ssh.authorized_keys_file")
        # Imported here so the manhole module is only loaded when used.
        from allmydata import manhole
        m = manhole.AuthorizedKeysManhole(ssh_port, ssh_keyfile)
        m.setServiceParent(self)
        self.log("AuthorizedKeysManhole listening on %s" % ssh_port)
def get_app_versions(self):
    """Return a new dict built from app_versions.versions."""
    # TODO: merge this with allmydata.get_package_versions
    versions_copy = dict(app_versions.versions)
    return versions_copy
def get_config_from_file(self, name, required=False):
    """Get the (string) contents of a config file, or None if the file
    could not be opened or read. If required=True, raise an exception
    rather than returning None. Any leading or trailing whitespace will be
    stripped from the data.

    'name' is interpreted relative to self.basedir. With required=True the
    original EnvironmentError is re-raised.
    """
    fn = os.path.join(self.basedir, name)
    try:
        # Close the file explicitly instead of relying on garbage
        # collection.
        with open(fn, "r") as f:
            return f.read().strip()
    except EnvironmentError:
        if not required:
            return None
        raise
def write_private_config(self, name, value):
    """Write the (string) contents of a private config file (which is a
    config file that resides within the subdirectory named 'private').
    Any leading or trailing whitespace will be stripped from the data
    before it is written. Nothing is returned.
    """
    privname = os.path.join(self.basedir, "private", name)
    # Close the file explicitly so the data is flushed before returning.
    with open(privname, "w") as f:
        f.write(value.strip())
def get_or_create_private_config(self, name, default):
    """Try to get the (string) contents of a private config file (which
    is a config file that resides within the subdirectory named
    'private'), and return it. Any leading or trailing whitespace will be
    stripped from the data.

    If the file does not exist, try to create it using default, and
    then return the value that was written. If 'default' is a string,
    use it as a default value. If not, treat it as a 0-argument callable
    which is expected to return a string.

    If the new file cannot be written, the failure is logged and the
    EnvironmentError is re-raised.
    """
    privname = os.path.join("private", name)
    value = self.get_config_from_file(privname)
    if value is None:
        if isinstance(default, (str, unicode)):
            value = default
        else:
            value = default()
        fn = os.path.join(self.basedir, privname)
        try:
            # The value is written as given; only the returned copy is
            # stripped.
            with open(fn, "w") as f:
                f.write(value)
        except EnvironmentError:
            self.log("Unable to write config file '%s'" % fn)
            raise
        value = value.strip()
    return value
def write_config(self, name, value, mode="w"):
    """Write a string to a config file.

    'name' is relative to self.basedir and 'mode' is passed to open(). If
    the file cannot be written, the failure is logged and the
    EnvironmentError is re-raised.
    """
    fn = os.path.join(self.basedir, name)
    try:
        # Close the file explicitly so the data is flushed before returning.
        with open(fn, mode) as f:
            f.write(value)
    except EnvironmentError:
        self.log("Unable to write config file '%s'" % fn)
        raise
def startService(self):
    """Begin startup; the real work is deferred to _startService().

    Also tries to make 'twistd.pid' (resolved against the current working
    directory) mode 0644, ignoring any failure to do so.
    """
    # Note: this class can be started and stopped at most once.
    self.log("Node.startService")
    try:
        os.chmod("twistd.pid", 0o644)
    except EnvironmentError:
        # Best effort only: the chmod failing must not prevent startup.
        pass
    # Delay until the reactor is running.
    eventual.eventually(self._startService)
def _startService(self):
    """Start child services, then configure the Tub's location.

    Requires a running reactor. Looks up the local addresses
    asynchronously, passes them to _setup_tub(), then logs
    "<NODETYPE> running" and fires the tub-ready observer list with this
    node. Any failure in that chain goes to _service_startup_failed().
    """
    precondition(reactor.running)
    self.log("Node._startService")

    service.MultiService.startService(self)
    d = defer.succeed(None)
    d.addCallback(lambda res: iputil.get_local_addresses_async())
    d.addCallback(self._setup_tub)
    def _ready(res):
        self.log("%s running" % self.NODETYPE)
        self._tub_ready_observerlist.fire(self)
        # NOTE(review): the callback's return value is not visible in the
        # reviewed text; returning the node is assumed -- confirm.
        return self
    d.addCallback(_ready)
    d.addErrback(self._service_startup_failed)
def _service_startup_failed(self, failure):
    """Report a startup failure and abort the process.

    The failure is written to the log and to stdout before os.abort() is
    called, so this method does not return.
    """
    self.log('_startService() failed')
    # Record the failure itself, not just the fact that startup failed.
    # NOTE(review): assumes log.err() accepts a Failure -- confirm.
    log.err(failure)
    print("Node._startService failed, aborting")
    print(failure)
    #reactor.stop() # for unknown reasons, reactor.stop() isn't working. [ ] TODO
    self.log('calling os.abort()')
    twlog.msg('calling os.abort()') # make sure it gets into twistd.log
    print("calling os.abort()")
    os.abort()
def stopService(self):
    """Stop the node once the Tub is ready.

    Waits for the tub-ready observer list to fire and only then stops the
    MultiService. Returns the Deferred for that chain, so callers such as
    shutdown() can wait for completion.
    """
    self.log("Node.stopService")
    d = self._tub_ready_observerlist.when_fired()
    def _really_stopService(ignored):
        self.log("Node._really_stopService")
        return service.MultiService.stopService(self)
    d.addCallback(_really_stopService)
    return d
def shutdown(self):
    """Shut down the node. Returns a Deferred that fires (with None) when
    it finally stops kicking."""
    self.log("Node.shutdown")
    return self.stopService()
def setup_logging(self):
    """Install Tahoe-style timestamps and configure the Tub's log options.

    Replaces formatTime() on any FileLogObserver found among the twisted
    log observers, then sets the Tub's logport and log-gatherer options
    from paths under self.basedir and the optional [node]log_gatherer.furl
    entry, enables bridging of twisted logs, and points foolscap's log
    directory at <basedir>/logs/incidents.
    """
    # we replace the formatTime() method of the log observer that twistd
    # set up for us, with a method that uses better timestamps.
    for o in twlog.theLogPublisher.observers:
        # o might be a FileLogObserver's .emit method
        if type(o) is type(self.setup_logging): # bound method
            # NOTE(review): the assignment of 'ob' is not visible in the
            # reviewed text; the bound method's instance (im_self) is
            # assumed -- confirm.
            ob = o.im_self
            if isinstance(ob, twlog.FileLogObserver):
                newmeth = types.UnboundMethodType(formatTimeTahoeStyle, ob, ob.__class__)
                ob.formatTime = newmeth
    # TODO: twisted >2.5.0 offers maxRotatedFiles=50

    self.tub.setOption("logport-furlfile",
                       os.path.join(self.basedir, "private","logport.furl"))
    lgfurl = self.get_config("node", "log_gatherer.furl", "")
    if lgfurl:
        # this is in addition to the contents of log-gatherer-furlfile
        self.tub.setOption("log-gatherer-furl", lgfurl)
    self.tub.setOption("log-gatherer-furlfile",
                       os.path.join(self.basedir, "log_gatherer.furl"))
    self.tub.setOption("bridge-twisted-logs", True)
    incident_dir = os.path.join(self.basedir, "logs", "incidents")
    # this doesn't quite work yet: unit tests fail
    foolscap.logging.log.setLogDir(incident_dir)
def log(self, *args, **kwargs):
    """Forward all arguments to log.msg() and return its result."""
    result = log.msg(*args, **kwargs)
    return result
def old_log(self, msg, src="", args=(), **kw):
    """Emit 'msg' through the twisted log, prefixed with the short node id.

    'src' overrides self.logSource as the log "system". When 'args' is
    non-empty, 'msg' is %-formatted with the humanreadable form of each
    argument; a formatting TypeError is reported in the message instead of
    being raised. Extra keyword arguments are passed to twlog.msg().
    """
    if src:
        logsrc = src
    else:
        logsrc = self.logSource
    if args:
        try:
            msg = msg % tuple(map(humanreadable.hr, args))
        except TypeError as e:
            msg = "ERROR: output string '%s' contained invalid %% expansion, error: %s, args: %s\n" % (repr(msg), e, repr(args))
    msg = self.short_nodeid + ": " + humanreadable.hr(msg)
    return twlog.callWithContext({"system":logsrc},
                                 twlog.msg, msg, **kw)
def _setup_tub(self, local_addresses):
    """Record the listening port and set the Tub's location.

    'local_addresses' is an iterable of address strings. Each is paired
    with the Tub's actual port, any comma-separated
    [node]advertised_ip_addresses entries are appended, and the joined
    list becomes the Tub location.
    """
    # we can't get a dynamically-assigned portnum until our Tub is
    # running, which means after startService.
    listener = self.tub.getListeners()[0]
    portnum = listener.getPortnum()
    # record which port we're listening on, so we can grab the same one next time
    with open(self._portnumfile, "w") as f:
        f.write("%d\n" % portnum)

    addresses = [ "%s:%d" % (addr, portnum,) for addr in local_addresses ]
    extra_addresses = self.get_config("node", "advertised_ip_addresses", "")
    # Skip the split when nothing is configured: "".split(",") would add
    # an empty entry to the location.
    if extra_addresses:
        extra_addresses = extra_addresses.split(",")
        addresses.extend(extra_addresses)

    location = ",".join(addresses)
    self.log("Tub location set to %s" % location)
    self.tub.setLocation(location)
    # NOTE(review): the return value is not visible in the reviewed text;
    # returning the Tub is assumed -- confirm.
    return self.tub
def when_tub_ready(self):
    """Return whatever the tub-ready observer list's when_fired() returns."""
    observers = self._tub_ready_observerlist
    return observers.when_fired()
def add_service(self, s):
    """Attach service 's' to this node and return it.

    Returning the service lets callers attach and keep a reference in a
    single expression.
    """
    s.setServiceParent(self)
    return s