1 import datetime, os.path, re, types, ConfigParser, tempfile
2 from base64 import b32decode, b32encode
4 from twisted.python import log as twlog
5 from twisted.application import service
6 from twisted.internet import defer, reactor
7 from foolscap.api import Tub, eventually, app_versions
8 import foolscap.logging.log
9 from allmydata import get_package_versions, get_package_versions_string
10 from allmydata.util import log
11 from allmydata.util import fileutil, iputil, observer
12 from allmydata.util.assertutil import precondition, _assert
13 from allmydata.util.fileutil import abspath_expanduser_unicode
14 from allmydata.util.encodingutil import get_filesystem_encoding, quote_output
15 from allmydata.util import configutil
# Register the version of every package reported by get_package_versions()
# with Foolscap's app_versions table.
for thing, things_version in get_package_versions().iteritems():
    version_string = str(things_version)
    app_versions.add_version(thing, version_string)
# Matches "a.b.c.d" or "a.b.c.d:port".
# group 1 will be addr (dotted quad string), group 3 if any will be portnum (string)
# Each octet is either "0" or a decimal number without a leading zero, so
# addresses containing a zero octet (e.g. "127.0.0.1", "10.0.0.1") match.
# The octet alternations are non-capturing so the group numbers stay the same.
ADDR_RE = re.compile(
    r"^((?:0|[1-9][0-9]*)\.(?:0|[1-9][0-9]*)\.(?:0|[1-9][0-9]*)\.(?:0|[1-9][0-9]*))"
    r"(:([1-9][0-9]*))?$")
def formatTimeTahoeStyle(self, when):
    """Format a POSIX timestamp as a UTC string with millisecond precision.

    self: unused; present so this function can be installed as a method on
          a log observer.
    when: seconds since the epoch (int or float).

    Returns a string such as '2007-10-12 00:26:28.566Z'.
    """
    # we want UTC timestamps that look like:
    # 2007-10-12 00:26:28.566Z [Client] rnp752lz: 'client running'
    d = datetime.datetime.utcfromtimestamp(when)
    if d.microsecond:
        # isoformat() emits six fractional digits; keep only the first three.
        return d.isoformat(" ")[:-3] + "Z"
    # isoformat() omits the fractional part entirely when microsecond == 0,
    # so slicing would eat the seconds field; append the milliseconds instead.
    return d.isoformat(" ") + ".000Z"
36 This directory contains files which contain private data for the Tahoe node,
37 such as private keys. On Unix-like systems, the permissions on this directory
38 are set to disallow users other than its owner from reading the contents of
39 the files. See the 'configuration.rst' documentation file for details."""
41 class _None: # used as a marker in get_config()
class MissingConfigEntry(Exception):
    """Raised when a required configuration entry could not be found."""
47 class OldConfigError(Exception):
48 """ An obsolete config file was found. See
49 docs/historical/configuration.rst. """
51 return ("Found pre-Tahoe-LAFS-v1.3 configuration file(s):\n"
53 "See docs/historical/configuration.rst."
54 % "\n".join([quote_output(fname) for fname in self.args[0]]))
56 class OldConfigOptionError(Exception):
59 class UnescapedHashError(Exception):
61 return ("The configuration entry %s contained an unescaped '#' character."
62 % quote_output("[%s]%s = %s" % self.args))
65 class Node(service.MultiService):
66 # this implements common functionality of both Client nodes and Introducer
68 NODETYPE = "unknown NODETYPE"
73 def __init__(self, basedir=u"."):
74 service.MultiService.__init__(self)
75 self.basedir = abspath_expanduser_unicode(unicode(basedir))
76 self._portnumfile = os.path.join(self.basedir, self.PORTNUMFILE)
77 self._tub_ready_observerlist = observer.OneShotObserverList()
78 fileutil.make_dirs(os.path.join(self.basedir, "private"), 0700)
79 open(os.path.join(self.basedir, "private", "README"), "w").write(PRIV_README)
83 nickname_utf8 = self.get_config("node", "nickname", "<unspecified>")
84 self.nickname = nickname_utf8.decode("utf-8")
85 assert type(self.nickname) is unicode
93 self.log("Node constructed. " + get_package_versions_string())
94 iputil.increase_rlimits()
def init_tempdir(self):
    """Point the tempfile module at this node's temporary directory.

    The directory name is read from the [node]tempdir config entry
    (default "tmp"), decoded as UTF-8, and expanded with
    abspath_expanduser_unicode() using self.basedir as the base. The
    directory is created if it does not exist yet.
    """
    configured = self.get_config("node", "tempdir", "tmp").decode('utf-8')
    node_tempdir = abspath_expanduser_unicode(configured, base=self.basedir)
    if not os.path.exists(node_tempdir):
        fileutil.make_dirs(node_tempdir)
    # Setting tempfile.tempdir should cause twisted.web.http (which uses
    # tempfile.TemporaryFile) to put large request bodies in the given
    # directory. Without this, the default temp dir is usually /tmp/,
    # which is frequently too small.
    tempfile.tempdir = node_tempdir
    # Check that tempfile now really generates names inside our directory.
    probe_name = tempfile.mktemp()
    probe_dir = os.path.dirname(probe_name)
    _assert(probe_dir == node_tempdir, probe_name, node_tempdir)
110 def _contains_unescaped_hash(item):
111 characters = iter(item)
120 def get_config(self, section, option, default=_None, boolean=False):
123 return self.config.getboolean(section, option)
125 item = self.config.get(section, option)
126 if option.endswith(".furl") and self._contains_unescaped_hash(item):
127 raise UnescapedHashError(section, option, item)
130 except (ConfigParser.NoOptionError, ConfigParser.NoSectionError):
132 fn = os.path.join(self.basedir, u"tahoe.cfg")
133 raise MissingConfigEntry("%s is missing the [%s]%s entry"
134 % (quote_output(fn), section, option))
137 def read_config(self):
138 self.error_about_old_config_files()
139 self.config = ConfigParser.SafeConfigParser()
141 tahoe_cfg = os.path.join(self.basedir, "tahoe.cfg")
143 self.config = configutil.get_config(tahoe_cfg)
144 except EnvironmentError:
145 if os.path.exists(tahoe_cfg):
148 cfg_tubport = self.get_config("node", "tub.port", "")
150 # For 'tub.port', tahoe.cfg overrides the individual file on
151 # disk. So only read self._portnumfile if tahoe.cfg doesn't
154 file_tubport = fileutil.read(self._portnumfile).strip()
155 configutil.set_config(self.config, "node", "tub.port", file_tubport)
156 except EnvironmentError:
157 if os.path.exists(self._portnumfile):
160 def error_about_old_config_files(self):
161 """ If any old configuration files are detected, raise OldConfigError. """
165 'nickname', 'webport', 'keepalive_timeout', 'log_gatherer.furl',
166 'disconnect_timeout', 'advertised_ip_addresses', 'introducer.furl',
167 'helper.furl', 'key_generator.furl', 'stats_gatherer.furl',
168 'no_storage', 'readonly_storage', 'sizelimit',
169 'debug_discard_storage', 'run_helper']:
170 if name not in self.GENERATED_FILES:
171 fullfname = os.path.join(self.basedir, name)
172 if os.path.exists(fullfname):
173 oldfnames.add(fullfname)
175 e = OldConfigError(oldfnames)
def create_tub(self):
    """Create self.tub, configure it from tahoe.cfg, and start it listening.

    Sets self.tub, self.nodeid (binary) and self.short_nodeid (first eight
    characters of the lowercase base32 node id), writes the base32 node id
    to the "my_nodeid" config file, and attaches the Tub to this service.
    """
    cert_path = os.path.join(self.basedir, "private", self.CERTFILE)
    self.tub = Tub(certFile=cert_path)
    for option_name, option_value in (("logLocalFailures", True),
                                      ("logRemoteFailures", True),
                                      ("expose-remote-exception-types", False)):
        self.tub.setOption(option_name, option_value)

    # see #521 for a discussion of how to pick these timeout values.
    # N.B.: timeout.disconnect is in seconds, so use "1800" to get 30min
    for config_key, tub_option in (("timeout.keepalive", "keepaliveTimeout"),
                                   ("timeout.disconnect", "disconnectTimeout")):
        timeout_s = self.get_config("node", config_key, "")
        if timeout_s:
            self.tub.setOption(tub_option, int(timeout_s))

    self.nodeid = b32decode(self.tub.tubID.upper())  # binary format
    nodeid_b32 = b32encode(self.nodeid).lower()
    self.write_config("my_nodeid", nodeid_b32 + "\n")
    self.short_nodeid = nodeid_b32[:8]  # ready for printing

    listen_port = self.get_config("node", "tub.port", "tcp:0")
    self.tub.listenOn(listen_port)
    # we must wait until our service has started before we can find out
    # our IP address and thus do tub.setLocation, and we can't register
    # any services with the Tub until after that point
    self.tub.setServiceParent(self)
207 ssh_port = self.get_config("node", "ssh.port", "")
209 ssh_keyfile_config = self.get_config("node", "ssh.authorized_keys_file").decode('utf-8')
210 ssh_keyfile = abspath_expanduser_unicode(ssh_keyfile_config, base=self.basedir)
211 from allmydata import manhole
212 m = manhole.AuthorizedKeysManhole(ssh_port, ssh_keyfile)
213 m.setServiceParent(self)
214 self.log("AuthorizedKeysManhole listening on %s" % (ssh_port,))
def get_app_versions(self):
    """Return a new dict built from app_versions.versions."""
    # TODO: merge this with allmydata.get_package_versions
    versions_copy = dict(app_versions.versions)
    return versions_copy
220 def get_config_from_file(self, name, required=False):
221 """Get the (string) contents of a config file, or None if the file
222 did not exist. If required=True, raise an exception rather than
223 returning None. Any leading or trailing whitespace will be stripped
225 fn = os.path.join(self.basedir, name)
227 return fileutil.read(fn).strip()
228 except EnvironmentError:
def write_private_config(self, name, value):
    """Write the (string) contents of a private config file (which is a
    config file that resides within the subdirectory named 'private').

    name: file name, joined onto <self.basedir>/private.
    value: string written as the complete contents of the file.

    The file is opened in "w" mode, so any existing contents are replaced.
    Returns None. Errors raised by open() or write() propagate to the
    caller.
    """
    privname = os.path.join(self.basedir, "private", name)
    # Use a context manager so the file is flushed and closed as soon as
    # the write finishes, rather than whenever the file object happens to
    # be garbage-collected.
    with open(privname, "w") as f:
        f.write(value)
241 def get_private_config(self, name, default=_None):
242 """Read the (string) contents of a private config file (which is a
243 config file that resides within the subdirectory named 'private'),
244 and return it. Return a default, or raise an error if one was not
247 privname = os.path.join(self.basedir, "private", name)
249 return fileutil.read(privname)
250 except EnvironmentError:
251 if os.path.exists(privname):
254 raise MissingConfigEntry("The required configuration file %s is missing."
255 % (quote_output(privname),))
258 def get_or_create_private_config(self, name, default=_None):
259 """Try to get the (string) contents of a private config file (which
260 is a config file that resides within the subdirectory named
261 'private'), and return it. Any leading or trailing whitespace will be
262 stripped from the data.
264 If the file does not exist, and default is not given, report an error.
265 If the file does not exist and a default is specified, try to create
266 it using that default, and then return the value that was written.
267 If 'default' is a string, use it as a default value. If not, treat it
268 as a zero-argument callable that is expected to return a string.
270 privname = os.path.join(self.basedir, "private", name)
272 value = fileutil.read(privname)
273 except EnvironmentError:
274 if os.path.exists(privname):
277 raise MissingConfigEntry("The required configuration file %s is missing."
278 % (quote_output(privname),))
279 if isinstance(default, basestring):
283 fileutil.write(privname, value)
286 def write_config(self, name, value, mode="w"):
287 """Write a string to a config file."""
288 fn = os.path.join(self.basedir, name)
290 fileutil.write(fn, value, mode)
291 except EnvironmentError, e:
292 self.log("Unable to write config file '%s'" % fn)
295 def startService(self):
296 # Note: this class can be started and stopped at most once.
297 self.log("Node.startService")
298 # Record the process id in the twisted log, after startService()
299 # (__init__ is called before fork(), but startService is called
300 # after). Note that Foolscap logs handle pid-logging by itself, no
301 # need to send a pid to the foolscap log here.
302 twlog.msg("My pid: %s" % os.getpid())
304 os.chmod("twistd.pid", 0644)
305 except EnvironmentError:
307 # Delay until the reactor is running.
308 eventually(self._startService)
310 def _startService(self):
311 precondition(reactor.running)
312 self.log("Node._startService")
314 service.MultiService.startService(self)
315 d = defer.succeed(None)
316 d.addCallback(self._setup_tub)
318 self.log("%s running" % self.NODETYPE)
319 self._tub_ready_observerlist.fire(self)
321 d.addCallback(_ready)
322 d.addErrback(self._service_startup_failed)
324 def _service_startup_failed(self, failure):
325 self.log('_startService() failed')
327 print "Node._startService failed, aborting"
329 #reactor.stop() # for unknown reasons, reactor.stop() isn't working. [ ] TODO
330 self.log('calling os.abort()')
331 twlog.msg('calling os.abort()') # make sure it gets into twistd.log
332 print "calling os.abort()"
def stopService(self):
    """Stop this node's services after the tub-ready observer list fires.

    Returns the Deferred obtained from
    self._tub_ready_observerlist.when_fired(), with a callback attached
    that calls service.MultiService.stopService(self); it therefore fires
    with that call's result.
    """
    self.log("Node.stopService")
    d = self._tub_ready_observerlist.when_fired()
    def _really_stopService(ignored):
        self.log("Node._really_stopService")
        return service.MultiService.stopService(self)
    d.addCallback(_really_stopService)
    # Hand the Deferred back so callers can wait for the stop to complete
    # instead of receiving None.
    return d
345 """Shut down the node. Returns a Deferred that fires (with None) when
346 it finally stops kicking."""
347 self.log("Node.shutdown")
348 return self.stopService()
350 def setup_logging(self):
351 # we replace the formatTime() method of the log observer that
352 # twistd set up for us, with a method that uses our preferred
354 for o in twlog.theLogPublisher.observers:
355 # o might be a FileLogObserver's .emit method
356 if type(o) is type(self.setup_logging): # bound method
358 if isinstance(ob, twlog.FileLogObserver):
359 newmeth = types.UnboundMethodType(formatTimeTahoeStyle, ob, ob.__class__)
360 ob.formatTime = newmeth
361 # TODO: twisted >2.5.0 offers maxRotatedFiles=50
363 lgfurl_file = os.path.join(self.basedir, "private", "logport.furl").encode(get_filesystem_encoding())
364 self.tub.setOption("logport-furlfile", lgfurl_file)
365 lgfurl = self.get_config("node", "log_gatherer.furl", "")
367 # this is in addition to the contents of log-gatherer-furlfile
368 self.tub.setOption("log-gatherer-furl", lgfurl)
369 self.tub.setOption("log-gatherer-furlfile",
370 os.path.join(self.basedir, "log_gatherer.furl"))
372 incident_dir = os.path.join(self.basedir, "logs", "incidents")
373 foolscap.logging.log.setLogDir(incident_dir.encode(get_filesystem_encoding()))
def log(self, *args, **kwargs):
    """Pass all arguments through to log.msg() and return its result."""
    result = log.msg(*args, **kwargs)
    return result
378 def _setup_tub(self, ign):
379 # we can't get a dynamically-assigned portnum until our Tub is
380 # running, which means after startService.
381 l = self.tub.getListeners()[0]
382 portnum = l.getPortnum()
383 # record which port we're listening on, so we can grab the same one
385 fileutil.write_atomically(self._portnumfile, "%d\n" % portnum, mode="")
387 location = self.get_config("node", "tub.location", "AUTO")
389 # Replace the location "AUTO", if present, with the detected local addresses.
390 split_location = location.split(",")
391 if "AUTO" in split_location:
392 d = iputil.get_local_addresses_async()
393 def _add_local(local_addresses):
394 while "AUTO" in split_location:
395 split_location.remove("AUTO")
397 split_location.extend([ "%s:%d" % (addr, portnum)
398 for addr in local_addresses ])
399 return ",".join(split_location)
400 d.addCallback(_add_local)
402 d = defer.succeed(location)
404 def _got_location(location):
405 self.log("Tub location set to %s" % (location,))
406 self.tub.setLocation(location)
408 d.addCallback(_got_location)
def when_tub_ready(self):
    """Return the result of when_fired() on the tub-ready observer list."""
    tub_ready_observers = self._tub_ready_observerlist
    return tub_ready_observers.when_fired()
414 def add_service(self, s):
415 s.setServiceParent(self)