1 import datetime, os.path, re, types, ConfigParser, tempfile
2 from base64 import b32decode, b32encode
4 from twisted.python import log as twlog
5 from twisted.application import service
6 from twisted.internet import defer, reactor
7 from foolscap.api import Tub, eventually, app_versions
8 import foolscap.logging.log
9 from allmydata import get_package_versions, get_package_versions_string
10 from allmydata.util import log
11 from allmydata.util import fileutil, iputil, observer
12 from allmydata.util.assertutil import precondition, _assert
13 from allmydata.util.fileutil import abspath_expanduser_unicode
14 from allmydata.util.encodingutil import get_filesystem_encoding, quote_output
16 # Add our application versions to the data that Foolscap's LogPublisher
18 for thing, things_version in get_package_versions().iteritems():
19 app_versions.add_version(thing, str(things_version))
21 # group 1 will be addr (dotted quad string), group 3 if any will be portnum (string)
22 ADDR_RE=re.compile("^([1-9][0-9]*\.[1-9][0-9]*\.[1-9][0-9]*\.[1-9][0-9]*)(:([1-9][0-9]*))?$")
25 def formatTimeTahoeStyle(self, when):
26 # we want UTC timestamps that look like:
27 # 2007-10-12 00:26:28.566Z [Client] rnp752lz: 'client running'
28 d = datetime.datetime.utcfromtimestamp(when)
30 return d.isoformat(" ")[:-3]+"Z"
32 return d.isoformat(" ") + ".000Z"
35 This directory contains files which contain private data for the Tahoe node,
36 such as private keys. On Unix-like systems, the permissions on this directory
37 are set to disallow users other than its owner from reading the contents of
38 the files. See the 'configuration.rst' documentation file for details."""
40 class _None: # used as a marker in get_config()
43 class MissingConfigEntry(Exception):
44 """ A required config entry was not found. """
46 class OldConfigError(Exception):
47 """ An obsolete config file was found. See
48 docs/historical/configuration.rst. """
50 return ("Found pre-Tahoe-LAFS-v1.3 configuration file(s):\n"
52 "See docs/historical/configuration.rst."
53 % "\n".join([quote_output(fname) for fname in self.args[0]]))
55 class OldConfigOptionError(Exception):
58 class UnescapedHashError(Exception):
60 return ("The configuration entry %s contained an unescaped '#' character."
61 % quote_output("[%s]%s = %s" % self.args))
64 class Node(service.MultiService):
65 # this implements common functionality of both Client nodes and Introducer
67 NODETYPE = "unknown NODETYPE"
72 def __init__(self, basedir=u"."):
73 service.MultiService.__init__(self)
74 self.basedir = abspath_expanduser_unicode(unicode(basedir))
75 self._portnumfile = os.path.join(self.basedir, self.PORTNUMFILE)
76 self._tub_ready_observerlist = observer.OneShotObserverList()
77 fileutil.make_dirs(os.path.join(self.basedir, "private"), 0700)
78 open(os.path.join(self.basedir, "private", "README"), "w").write(PRIV_README)
82 nickname_utf8 = self.get_config("node", "nickname", "<unspecified>")
83 self.nickname = nickname_utf8.decode("utf-8")
84 assert type(self.nickname) is unicode
92 self.log("Node constructed. " + get_package_versions_string())
93 iputil.increase_rlimits()
95 def init_tempdir(self):
96 tempdir_config = self.get_config("node", "tempdir", "tmp").decode('utf-8')
97 tempdir = abspath_expanduser_unicode(tempdir_config, base=self.basedir)
98 if not os.path.exists(tempdir):
99 fileutil.make_dirs(tempdir)
100 tempfile.tempdir = tempdir
101 # this should cause twisted.web.http (which uses
102 # tempfile.TemporaryFile) to put large request bodies in the given
103 # directory. Without this, the default temp dir is usually /tmp/,
104 # which is frequently too small.
105 test_name = tempfile.mktemp()
106 _assert(os.path.dirname(test_name) == tempdir, test_name, tempdir)
109 def _contains_unescaped_hash(item):
110 characters = iter(item)
119 def get_config(self, section, option, default=_None, boolean=False):
122 return self.config.getboolean(section, option)
124 item = self.config.get(section, option)
125 if option.endswith(".furl") and self._contains_unescaped_hash(item):
126 raise UnescapedHashError(section, option, item)
129 except (ConfigParser.NoOptionError, ConfigParser.NoSectionError):
131 fn = os.path.join(self.basedir, u"tahoe.cfg")
132 raise MissingConfigEntry("%s is missing the [%s]%s entry"
133 % (quote_output(fn), section, option))
136 def set_config(self, section, option, value):
137 if not self.config.has_section(section):
138 self.config.add_section(section)
139 self.config.set(section, option, value)
140 assert self.config.get(section, option) == value
142 def read_config(self):
143 self.error_about_old_config_files()
144 self.config = ConfigParser.SafeConfigParser()
146 tahoe_cfg = os.path.join(self.basedir, "tahoe.cfg")
148 f = open(tahoe_cfg, "rb")
150 # Skip any initial Byte Order Mark. Since this is an ordinary file, we
151 # don't need to handle incomplete reads, and can assume seekability.
152 if f.read(3) != '\xEF\xBB\xBF':
154 self.config.readfp(f)
157 except EnvironmentError:
158 if os.path.exists(tahoe_cfg):
161 cfg_tubport = self.get_config("node", "tub.port", "")
163 # For 'tub.port', tahoe.cfg overrides the individual file on
164 # disk. So only read self._portnumfile if tahoe.cfg doesn't
167 file_tubport = fileutil.read(self._portnumfile).strip()
168 self.set_config("node", "tub.port", file_tubport)
169 except EnvironmentError:
170 if os.path.exists(self._portnumfile):
173 def error_about_old_config_files(self):
174 """ If any old configuration files are detected, raise OldConfigError. """
178 'nickname', 'webport', 'keepalive_timeout', 'log_gatherer.furl',
179 'disconnect_timeout', 'advertised_ip_addresses', 'introducer.furl',
180 'helper.furl', 'key_generator.furl', 'stats_gatherer.furl',
181 'no_storage', 'readonly_storage', 'sizelimit',
182 'debug_discard_storage', 'run_helper']:
183 if name not in self.GENERATED_FILES:
184 fullfname = os.path.join(self.basedir, name)
185 if os.path.exists(fullfname):
186 oldfnames.add(fullfname)
188 e = OldConfigError(oldfnames)
192 def create_tub(self):
193 certfile = os.path.join(self.basedir, "private", self.CERTFILE)
194 self.tub = Tub(certFile=certfile)
195 self.tub.setOption("logLocalFailures", True)
196 self.tub.setOption("logRemoteFailures", True)
197 self.tub.setOption("expose-remote-exception-types", False)
199 # see #521 for a discussion of how to pick these timeout values.
200 keepalive_timeout_s = self.get_config("node", "timeout.keepalive", "")
201 if keepalive_timeout_s:
202 self.tub.setOption("keepaliveTimeout", int(keepalive_timeout_s))
203 disconnect_timeout_s = self.get_config("node", "timeout.disconnect", "")
204 if disconnect_timeout_s:
205 # N.B.: this is in seconds, so use "1800" to get 30min
206 self.tub.setOption("disconnectTimeout", int(disconnect_timeout_s))
208 self.nodeid = b32decode(self.tub.tubID.upper()) # binary format
209 self.write_config("my_nodeid", b32encode(self.nodeid).lower() + "\n")
210 self.short_nodeid = b32encode(self.nodeid).lower()[:8] # ready for printing
212 tubport = self.get_config("node", "tub.port", "tcp:0")
213 self.tub.listenOn(tubport)
214 # we must wait until our service has started before we can find out
215 # our IP address and thus do tub.setLocation, and we can't register
216 # any services with the Tub until after that point
217 self.tub.setServiceParent(self)
220 ssh_port = self.get_config("node", "ssh.port", "")
222 ssh_keyfile_config = self.get_config("node", "ssh.authorized_keys_file").decode('utf-8')
223 ssh_keyfile = abspath_expanduser_unicode(ssh_keyfile_config, base=self.basedir)
224 from allmydata import manhole
225 m = manhole.AuthorizedKeysManhole(ssh_port, ssh_keyfile)
226 m.setServiceParent(self)
227 self.log("AuthorizedKeysManhole listening on %s" % (ssh_port,))
229 def get_app_versions(self):
230 # TODO: merge this with allmydata.get_package_versions
231 return dict(app_versions.versions)
233 def get_config_from_file(self, name, required=False):
234 """Get the (string) contents of a config file, or None if the file
235 did not exist. If required=True, raise an exception rather than
236 returning None. Any leading or trailing whitespace will be stripped
238 fn = os.path.join(self.basedir, name)
240 return fileutil.read(fn).strip()
241 except EnvironmentError:
246 def write_private_config(self, name, value):
247 """Write the (string) contents of a private config file (which is a
248 config file that resides within the subdirectory named 'private'), and
251 privname = os.path.join(self.basedir, "private", name)
252 open(privname, "w").write(value)
254 def get_private_config(self, name, default=_None):
255 """Read the (string) contents of a private config file (which is a
256 config file that resides within the subdirectory named 'private'),
257 and return it. Return a default, or raise an error if one was not
260 privname = os.path.join(self.basedir, "private", name)
262 return fileutil.read(privname)
263 except EnvironmentError:
264 if os.path.exists(privname):
267 raise MissingConfigEntry("The required configuration file %s is missing."
268 % (quote_output(privname),))
271 def get_or_create_private_config(self, name, default=_None):
272 """Try to get the (string) contents of a private config file (which
273 is a config file that resides within the subdirectory named
274 'private'), and return it. Any leading or trailing whitespace will be
275 stripped from the data.
277 If the file does not exist, and default is not given, report an error.
278 If the file does not exist and a default is specified, try to create
279 it using that default, and then return the value that was written.
280 If 'default' is a string, use it as a default value. If not, treat it
281 as a zero-argument callable that is expected to return a string.
283 privname = os.path.join(self.basedir, "private", name)
285 value = fileutil.read(privname)
286 except EnvironmentError:
287 if os.path.exists(privname):
290 raise MissingConfigEntry("The required configuration file %s is missing."
291 % (quote_output(privname),))
292 if isinstance(default, basestring):
296 fileutil.write(privname, value)
299 def write_config(self, name, value, mode="w"):
300 """Write a string to a config file."""
301 fn = os.path.join(self.basedir, name)
303 fileutil.write(fn, value, mode)
304 except EnvironmentError, e:
305 self.log("Unable to write config file '%s'" % fn)
308 def startService(self):
309 # Note: this class can be started and stopped at most once.
310 self.log("Node.startService")
311 # Record the process id in the twisted log, after startService()
312 # (__init__ is called before fork(), but startService is called
313 # after). Note that Foolscap logs handle pid-logging by itself, no
314 # need to send a pid to the foolscap log here.
315 twlog.msg("My pid: %s" % os.getpid())
317 os.chmod("twistd.pid", 0644)
318 except EnvironmentError:
320 # Delay until the reactor is running.
321 eventually(self._startService)
323 def _startService(self):
324 precondition(reactor.running)
325 self.log("Node._startService")
327 service.MultiService.startService(self)
328 d = defer.succeed(None)
329 d.addCallback(self._setup_tub)
331 self.log("%s running" % self.NODETYPE)
332 self._tub_ready_observerlist.fire(self)
334 d.addCallback(_ready)
335 d.addErrback(self._service_startup_failed)
337 def _service_startup_failed(self, failure):
338 self.log('_startService() failed')
340 print "Node._startService failed, aborting"
342 #reactor.stop() # for unknown reasons, reactor.stop() isn't working. [ ] TODO
343 self.log('calling os.abort()')
344 twlog.msg('calling os.abort()') # make sure it gets into twistd.log
345 print "calling os.abort()"
348 def stopService(self):
349 self.log("Node.stopService")
350 d = self._tub_ready_observerlist.when_fired()
351 def _really_stopService(ignored):
352 self.log("Node._really_stopService")
353 return service.MultiService.stopService(self)
354 d.addCallback(_really_stopService)
358 """Shut down the node. Returns a Deferred that fires (with None) when
359 it finally stops kicking."""
360 self.log("Node.shutdown")
361 return self.stopService()
363 def setup_logging(self):
364 # we replace the formatTime() method of the log observer that
365 # twistd set up for us, with a method that uses our preferred
367 for o in twlog.theLogPublisher.observers:
368 # o might be a FileLogObserver's .emit method
369 if type(o) is type(self.setup_logging): # bound method
371 if isinstance(ob, twlog.FileLogObserver):
372 newmeth = types.UnboundMethodType(formatTimeTahoeStyle, ob, ob.__class__)
373 ob.formatTime = newmeth
374 # TODO: twisted >2.5.0 offers maxRotatedFiles=50
376 lgfurl_file = os.path.join(self.basedir, "private", "logport.furl").encode(get_filesystem_encoding())
377 self.tub.setOption("logport-furlfile", lgfurl_file)
378 lgfurl = self.get_config("node", "log_gatherer.furl", "")
380 # this is in addition to the contents of log-gatherer-furlfile
381 self.tub.setOption("log-gatherer-furl", lgfurl)
382 self.tub.setOption("log-gatherer-furlfile",
383 os.path.join(self.basedir, "log_gatherer.furl"))
385 incident_dir = os.path.join(self.basedir, "logs", "incidents")
386 foolscap.logging.log.setLogDir(incident_dir.encode(get_filesystem_encoding()))
388 def log(self, *args, **kwargs):
389 return log.msg(*args, **kwargs)
391 def _setup_tub(self, ign):
392 # we can't get a dynamically-assigned portnum until our Tub is
393 # running, which means after startService.
394 l = self.tub.getListeners()[0]
395 portnum = l.getPortnum()
396 # record which port we're listening on, so we can grab the same one
398 fileutil.write_atomically(self._portnumfile, "%d\n" % portnum, mode="")
400 location = self.get_config("node", "tub.location", "AUTO")
402 # Replace the location "AUTO", if present, with the detected local addresses.
403 split_location = location.split(",")
404 if "AUTO" in split_location:
405 d = iputil.get_local_addresses_async()
406 def _add_local(local_addresses):
407 while "AUTO" in split_location:
408 split_location.remove("AUTO")
410 split_location.extend([ "%s:%d" % (addr, portnum)
411 for addr in local_addresses ])
412 return ",".join(split_location)
413 d.addCallback(_add_local)
415 d = defer.succeed(location)
417 def _got_location(location):
418 self.log("Tub location set to %s" % (location,))
419 self.tub.setLocation(location)
421 d.addCallback(_got_location)
424 def when_tub_ready(self):
425 return self._tub_ready_observerlist.when_fired()
427 def add_service(self, s):
428 s.setServiceParent(self)