1 import datetime, os.path, re, types, ConfigParser, tempfile
2 from base64 import b32decode, b32encode
4 from twisted.python import log as twlog
5 from twisted.application import service
6 from twisted.internet import defer, reactor
7 from foolscap.api import Tub, eventually, app_versions
8 import foolscap.logging.log
9 from allmydata import get_package_versions, get_package_versions_string
10 from allmydata.util import log
11 from allmydata.util import fileutil, iputil, observer
12 from allmydata.util.assertutil import precondition, _assert
13 from allmydata.util.fileutil import abspath_expanduser_unicode
14 from allmydata.util.encodingutil import get_filesystem_encoding, quote_output
# Add our application versions to the data that Foolscap's LogPublisher
# reports (they are registered through foolscap's app_versions object).
# Versions are stringified because get_package_versions() is not guaranteed
# by anything visible here to return plain strings.
for thing, things_version in get_package_versions().iteritems():
    app_versions.add_version(thing, str(things_version))
# group 1 will be addr (dotted quad string), group 3 if any will be portnum (string)
# The pattern is a raw string so that "\." reaches the regex engine as an
# escaped dot instead of relying on Python passing unknown escapes through.
# NOTE(review): every octet (and the port) must start with 1-9, so an address
# containing a "0" octet such as "127.0.0.1" does NOT match -- confirm that
# this is intended before relying on ADDR_RE for general IPv4 parsing.
ADDR_RE = re.compile(r"^([1-9][0-9]*\.[1-9][0-9]*\.[1-9][0-9]*\.[1-9][0-9]*)(:([1-9][0-9]*))?$")
def formatTimeTahoeStyle(self, when):
    """Format the POSIX timestamp `when` as a UTC string with milliseconds.

    `self` is unused by the body; the parameter exists so the function can be
    installed as a method on a log observer.

    Returns a string such as "2007-10-12 00:26:28.566Z".
    """
    # we want UTC timestamps that look like:
    #  2007-10-12 00:26:28.566Z [Client] rnp752lz: 'client running'
    d = datetime.datetime.utcfromtimestamp(when)
    if d.microsecond:
        # isoformat() emits six fractional digits; drop the last three to
        # keep millisecond precision.
        return d.isoformat(" ")[:-3] + "Z"
    else:
        # isoformat() omits the fractional part entirely when it is zero, so
        # add it back to keep the width constant.
        return d.isoformat(" ") + ".000Z"
# Text written to the README file inside the node's "private" directory.
# NOTE(review): the opening line of this literal is not visible in the
# listing; the name is taken from its use in Node.__init__ and the leading
# newline is an assumption -- confirm against the original file.
PRIV_README = """
This directory contains files which contain private data for the Tahoe node,
such as private keys. On Unix-like systems, the permissions on this directory
are set to disallow users other than its owner from reading the contents of
the files. See the 'configuration.rst' documentation file for details."""
40 class _None: # used as a marker in get_config()
class MissingConfigEntry(Exception):
    """ A required config entry was not found. """
class OldConfigError(Exception):
    """ An obsolete config file was found. See
    docs/historical/configuration.rst. """

    def __str__(self):
        # self.args[0] is iterated as a collection of file names; each one is
        # passed through quote_output() and listed on its own line.
        # NOTE(review): the "%s\n" fragment is restored -- the listing dropped
        # it, but the % operator below needs exactly one placeholder.
        return ("Found pre-Tahoe-LAFS-v1.3 configuration file(s):\n"
                "%s\n"
                "See docs/historical/configuration.rst."
                % "\n".join([quote_output(fname) for fname in self.args[0]]))
class OldConfigOptionError(Exception):
    # Not raised anywhere in this file; presumably raised by code elsewhere
    # when an obsolete option is found -- verify against callers.
    pass
class UnescapedHashError(Exception):
    # Raised by Node.get_config() with exactly three arguments:
    # (section, option, item). __str__ relies on that arity.

    def __str__(self):
        return ("The configuration entry %s contained an unescaped '#' character."
                % quote_output("[%s]%s = %s" % self.args))
class Node(service.MultiService):
    # This implements common functionality of both Client nodes and
    # Introducer nodes.
    #
    # NOTE(review): the methods below read self.PORTNUMFILE, self.CERTFILE and
    # self.GENERATED_FILES, but their definitions are not visible in the
    # listing this class was recovered from. The three values below are
    # assumptions -- confirm them against the original file. PORTNUMFILE is
    # joined onto basedir in __init__, so it presumably has to be overridden
    # with a real file name by subclasses.
    NODETYPE = "unknown NODETYPE"
    PORTNUMFILE = None
    CERTFILE = "node.pem"
    GENERATED_FILES = []

    def __init__(self, basedir=u"."):
        """Prepare the node rooted at `basedir`.

        Creates the "private" subdirectory (mode 0700) with its README, loads
        the configuration, and then sets up the temp dir, the Tub, the
        optional SSH manhole and logging.
        """
        service.MultiService.__init__(self)
        self.basedir = abspath_expanduser_unicode(unicode(basedir))
        self._portnumfile = os.path.join(self.basedir, self.PORTNUMFILE)
        self._tub_ready_observerlist = observer.OneShotObserverList()
        fileutil.make_dirs(os.path.join(self.basedir, "private"), 0o700)
        # Use a context manager so the README handle is closed deterministically.
        readme_path = os.path.join(self.basedir, "private", "README")
        with open(readme_path, "w") as f:
            f.write(PRIV_README)

        # creates self.config
        # NOTE(review): this call and the init_tempdir/create_tub/logSource/
        # setup_ssh/setup_logging steps below were missing from the listing.
        # read_config() must run before the first get_config(), and
        # create_tub() before setup_logging() (which uses self.tub); the rest
        # of the ordering is an assumption -- confirm.
        self.read_config()
        nickname_utf8 = self.get_config("node", "nickname", "<unspecified>")
        self.nickname = nickname_utf8.decode("utf-8")
        assert type(self.nickname) is unicode

        self.init_tempdir()
        self.create_tub()
        self.logSource = "Node"

        self.setup_ssh()
        self.setup_logging()
        self.log("Node constructed. " + get_package_versions_string())
        iputil.increase_rlimits()

    def init_tempdir(self):
        """Point the process-wide tempfile directory at the configured dir.

        The directory comes from [node]tempdir (default "tmp"), resolved
        relative to basedir, and is created if it does not exist.
        """
        tempdir_config = self.get_config("node", "tempdir", "tmp").decode('utf-8')
        tempdir = abspath_expanduser_unicode(tempdir_config, base=self.basedir)
        if not os.path.exists(tempdir):
            fileutil.make_dirs(tempdir)
        tempfile.tempdir = tempdir
        # this should cause twisted.web.http (which uses
        # tempfile.TemporaryFile) to put large request bodies in the given
        # directory. Without this, the default temp dir is usually /tmp/,
        # which is frequently too small.
        # mktemp() is only used to learn which directory tempfile would pick;
        # no file is created from the returned name.
        test_name = tempfile.mktemp()
        _assert(os.path.dirname(test_name) == tempdir, test_name, tempdir)

    @staticmethod
    def _contains_unescaped_hash(item):
        """Return True if `item` contains a '#' not preceded by an escape.

        NOTE(review): only the first statement of this body was visible in
        the listing. The scan below, and the use of backslash as the escape
        character, are reconstructed from the method name -- confirm.
        """
        characters = iter(item)
        for c in characters:
            if c == '\\':
                # Skip the escaped character. The default argument keeps a
                # trailing backslash from leaking StopIteration to the caller.
                next(characters, None)
            elif c == '#':
                return True

        return False

    def get_config(self, section, option, default=_None, boolean=False):
        """Return the value of [section]option from the parsed config.

        With boolean=True the value is parsed by ConfigParser.getboolean().
        If the section or option is absent, `default` is returned; when no
        default was supplied, MissingConfigEntry is raised instead.

        Raises UnescapedHashError when an option whose name ends in ".furl"
        has a value containing an unescaped '#'.
        """
        try:
            if boolean:
                return self.config.getboolean(section, option)

            item = self.config.get(section, option)
            if option.endswith(".furl") and self._contains_unescaped_hash(item):
                raise UnescapedHashError(section, option, item)

            return item
        except (ConfigParser.NoOptionError, ConfigParser.NoSectionError):
            if default is _None:
                fn = os.path.join(self.basedir, u"tahoe.cfg")
                raise MissingConfigEntry("%s is missing the [%s]%s entry"
                                         % (quote_output(fn), section, option))
            return default

    def set_config(self, section, option, value):
        """Set [section]option in the in-memory config, creating the section
        if needed. Nothing is written to disk here."""
        if not self.config.has_section(section):
            self.config.add_section(section)
        self.config.set(section, option, value)
        assert self.config.get(section, option) == value

    def read_config(self):
        """Create self.config and load basedir/tahoe.cfg into it.

        A missing tahoe.cfg is tolerated; any other I/O error is re-raised.
        If tahoe.cfg gives no [node]tub.port, the value is taken from the
        port-number file when that file exists.
        """
        self.error_about_old_config_files()
        self.config = ConfigParser.SafeConfigParser()

        tahoe_cfg = os.path.join(self.basedir, "tahoe.cfg")
        try:
            with open(tahoe_cfg, "rb") as f:
                # Skip any initial Byte Order Mark. Since this is an ordinary file, we
                # don't need to handle incomplete reads, and can assume seekability.
                if f.read(3) != '\xEF\xBB\xBF':
                    f.seek(0)
                self.config.readfp(f)
        except EnvironmentError:
            # Only "file does not exist" is acceptable.
            if os.path.exists(tahoe_cfg):
                raise

        cfg_tubport = self.get_config("node", "tub.port", "")
        if not cfg_tubport:
            # For 'tub.port', tahoe.cfg overrides the individual file on
            # disk. So only read self._portnumfile if tahoe.cfg doesn't
            # provide a value.
            try:
                file_tubport = fileutil.read(self._portnumfile).strip()
                self.set_config("node", "tub.port", file_tubport)
            except EnvironmentError:
                if os.path.exists(self._portnumfile):
                    raise

    def error_about_old_config_files(self):
        """ If any old configuration files are detected, raise OldConfigError. """

        oldfnames = set()
        for name in [
            'nickname', 'webport', 'keepalive_timeout', 'log_gatherer.furl',
            'disconnect_timeout', 'advertised_ip_addresses', 'introducer.furl',
            'helper.furl', 'key_generator.furl', 'stats_gatherer.furl',
            'no_storage', 'readonly_storage', 'sizelimit',
            'debug_discard_storage', 'run_helper']:
            # Names a subclass lists in GENERATED_FILES are not treated as
            # obsolete config files.
            if name not in self.GENERATED_FILES:
                fullfname = os.path.join(self.basedir, name)
                if os.path.exists(fullfname):
                    oldfnames.add(fullfname)
        if oldfnames:
            e = OldConfigError(oldfnames)
            # NOTE(review): logging the error to the twisted log before
            # raising is restored from a dropped line -- confirm.
            twlog.msg(e)
            raise e

    def create_tub(self):
        """Create self.tub, apply timeout options, record the node id and
        start listening on the configured port."""
        certfile = os.path.join(self.basedir, "private", self.CERTFILE)
        self.tub = Tub(certFile=certfile)
        self.tub.setOption("logLocalFailures", True)
        self.tub.setOption("logRemoteFailures", True)
        self.tub.setOption("expose-remote-exception-types", False)

        # see #521 for a discussion of how to pick these timeout values.
        keepalive_timeout_s = self.get_config("node", "timeout.keepalive", "")
        if keepalive_timeout_s:
            self.tub.setOption("keepaliveTimeout", int(keepalive_timeout_s))
        disconnect_timeout_s = self.get_config("node", "timeout.disconnect", "")
        if disconnect_timeout_s:
            # N.B.: this is in seconds, so use "1800" to get 30min
            self.tub.setOption("disconnectTimeout", int(disconnect_timeout_s))

        self.nodeid = b32decode(self.tub.tubID.upper())  # binary format
        self.write_config("my_nodeid", b32encode(self.nodeid).lower() + "\n")
        self.short_nodeid = b32encode(self.nodeid).lower()[:8]  # ready for printing

        tubport = self.get_config("node", "tub.port", "tcp:0")
        self.tub.listenOn(tubport)
        # we must wait until our service has started before we can find out
        # our IP address and thus do tub.setLocation, and we can't register
        # any services with the Tub until after that point
        self.tub.setServiceParent(self)

    def setup_ssh(self):
        """Start an AuthorizedKeysManhole if [node]ssh.port is configured.

        NOTE(review): the 'def' line of this method was missing from the
        listing; the name setup_ssh is an assumption -- confirm.
        """
        ssh_port = self.get_config("node", "ssh.port", "")
        if ssh_port:
            # No default here: ssh.authorized_keys_file is required once
            # ssh.port is set, so get_config raises MissingConfigEntry.
            ssh_keyfile_config = self.get_config("node", "ssh.authorized_keys_file").decode('utf-8')
            ssh_keyfile = abspath_expanduser_unicode(ssh_keyfile_config, base=self.basedir)
            from allmydata import manhole
            m = manhole.AuthorizedKeysManhole(ssh_port, ssh_keyfile)
            m.setServiceParent(self)
            self.log("AuthorizedKeysManhole listening on %s" % (ssh_port,))

    def get_app_versions(self):
        """Return a copy of the versions dict held by foolscap's app_versions."""
        # TODO: merge this with allmydata.get_package_versions
        return dict(app_versions.versions)

    def get_config_from_file(self, name, required=False):
        """Get the (string) contents of a config file, or None if the file
        did not exist. If required=True, raise an exception rather than
        returning None. Any leading or trailing whitespace will be stripped
        from the data."""
        fn = os.path.join(self.basedir, name)
        try:
            return fileutil.read(fn).strip()
        except EnvironmentError:
            if not required:
                return None
            raise

    def write_private_config(self, name, value):
        """Write the (string) contents of a private config file (which is a
        config file that resides within the subdirectory named 'private').
        Nothing is returned.
        """
        privname = os.path.join(self.basedir, "private", name)
        # Context manager so the handle is flushed and closed before return.
        with open(privname, "w") as f:
            f.write(value)

    def get_private_config(self, name, default=_None):
        """Read the (string) contents of a private config file (which is a
        config file that resides within the subdirectory named 'private'),
        and return it. Return a default, or raise an error if one was not
        given.
        """
        privname = os.path.join(self.basedir, "private", name)
        try:
            return fileutil.read(privname)
        except EnvironmentError:
            # A file that exists but cannot be read is a real error.
            if os.path.exists(privname):
                raise
            if default is _None:
                raise MissingConfigEntry("The required configuration file %s is missing."
                                         % (quote_output(privname),))
            return default

    def get_or_create_private_config(self, name, default=_None):
        """Try to get the (string) contents of a private config file (which
        is a config file that resides within the subdirectory named
        'private'), and return it. Any leading or trailing whitespace will be
        stripped from the data.

        If the file does not exist, and default is not given, report an error.
        If the file does not exist and a default is specified, try to create
        it using that default, and then return the value that was written.
        If 'default' is a string, use it as a default value. If not, treat it
        as a zero-argument callable that is expected to return a string.
        """
        privname = os.path.join(self.basedir, "private", name)
        try:
            value = fileutil.read(privname)
        except EnvironmentError:
            if os.path.exists(privname):
                raise
            if default is _None:
                raise MissingConfigEntry("The required configuration file %s is missing."
                                         % (quote_output(privname),))
            if isinstance(default, basestring):
                value = default
            else:
                value = default()
            fileutil.write(privname, value)
        return value.strip()

    def write_config(self, name, value, mode="w"):
        """Write a string to a config file."""
        fn = os.path.join(self.basedir, name)
        try:
            fileutil.write(fn, value, mode)
        except EnvironmentError as e:
            # Best effort: a failed write is logged, not raised.
            self.log("Unable to write config file '%s'" % fn)
            self.log(e)

    def startService(self):
        """Log startup and schedule the real start for a later reactor turn."""
        # Note: this class can be started and stopped at most once.
        self.log("Node.startService")
        # Record the process id in the twisted log, after startService()
        # (__init__ is called before fork(), but startService is called
        # after). Note that Foolscap logs handle pid-logging by itself, no
        # need to send a pid to the foolscap log here.
        twlog.msg("My pid: %s" % os.getpid())
        try:
            os.chmod("twistd.pid", 0o644)
        except EnvironmentError:
            # The pid file may not exist; ignoring the failure is deliberate.
            pass
        # Delay until the reactor is running.
        eventually(self._startService)

    def _startService(self):
        """Start child services, set up the Tub and fire the ready observers."""
        precondition(reactor.running)
        self.log("Node._startService")

        service.MultiService.startService(self)
        d = defer.succeed(None)
        d.addCallback(lambda res: iputil.get_local_addresses_async())
        d.addCallback(self._setup_tub)

        def _ready(res):
            self.log("%s running" % self.NODETYPE)
            self._tub_ready_observerlist.fire(self)
            # NOTE(review): return value restored from a dropped line -- confirm.
            return self
        d.addCallback(_ready)
        d.addErrback(self._service_startup_failed)

    def _service_startup_failed(self, failure):
        """Report a startup failure and abort the process."""
        self.log('_startService() failed')
        # NOTE(review): the log.err(failure) and print(failure) lines are
        # restored from dropped lines -- confirm.
        log.err(failure)
        print("Node._startService failed, aborting")
        print(failure)
        #reactor.stop() # for unknown reasons, reactor.stop() isn't working. [ ] TODO
        self.log('calling os.abort()')
        twlog.msg('calling os.abort()')  # make sure it gets into twistd.log
        print("calling os.abort()")
        os.abort()

    def stopService(self):
        """Stop child services once the Tub-ready observers have fired.

        Returns the Deferred from the observer list, chained with the
        MultiService stop.
        """
        self.log("Node.stopService")
        d = self._tub_ready_observerlist.when_fired()

        def _really_stopService(ignored):
            self.log("Node._really_stopService")
            return service.MultiService.stopService(self)
        d.addCallback(_really_stopService)
        return d

    def shutdown(self):
        """Shut down the node. Returns a Deferred that fires (with None) when
        it finally stops kicking."""
        self.log("Node.shutdown")
        return self.stopService()

    def setup_logging(self):
        """Install the Tahoe timestamp format and configure foolscap logging."""
        # we replace the formatTime() method of the log observer that
        # twistd set up for us, with a method that uses our preferred
        # timestamp format.
        for o in twlog.theLogPublisher.observers:
            # o might be a FileLogObserver's .emit method
            if isinstance(o, types.MethodType):  # bound method
                ob = o.im_self
                if isinstance(ob, twlog.FileLogObserver):
                    newmeth = types.UnboundMethodType(formatTimeTahoeStyle, ob, ob.__class__)
                    ob.formatTime = newmeth
        # TODO: twisted >2.5.0 offers maxRotatedFiles=50

        lgfurl_file = os.path.join(self.basedir, "private", "logport.furl").encode(get_filesystem_encoding())
        self.tub.setOption("logport-furlfile", lgfurl_file)
        lgfurl = self.get_config("node", "log_gatherer.furl", "")
        if lgfurl:
            # this is in addition to the contents of log-gatherer-furlfile
            self.tub.setOption("log-gatherer-furl", lgfurl)
        self.tub.setOption("log-gatherer-furlfile",
                           os.path.join(self.basedir, "log_gatherer.furl"))
        self.tub.setOption("bridge-twisted-logs", True)
        incident_dir = os.path.join(self.basedir, "logs", "incidents")
        foolscap.logging.log.setLogDir(incident_dir.encode(get_filesystem_encoding()))

    def log(self, *args, **kwargs):
        """Forward to allmydata.util.log.msg and return its result."""
        return log.msg(*args, **kwargs)

    def _setup_tub(self, local_addresses):
        """Record the listening port and set the Tub's advertised location.

        `local_addresses` is iterated and each entry is formatted with "%s";
        the location is [node]tub.location if configured, otherwise every
        local address paired with the listening port.
        """
        # we can't get a dynamically-assigned portnum until our Tub is
        # running, which means after startService.
        l = self.tub.getListeners()[0]
        portnum = l.getPortnum()
        # record which port we're listening on, so we can grab the same one
        # next time
        fileutil.write_atomically(self._portnumfile, "%d\n" % portnum, mode="")

        base_location = ",".join(["%s:%d" % (addr, portnum)
                                  for addr in local_addresses])
        location = self.get_config("node", "tub.location", base_location)
        self.log("Tub location set to %s" % location)
        self.tub.setLocation(location)

        # NOTE(review): return value restored from a dropped line -- confirm.
        return self.tub

    def when_tub_ready(self):
        """Return a Deferred from the one-shot observer list that
        _startService fires (with this node) after the Tub is set up."""
        return self._tub_ready_observerlist.when_fired()

    def add_service(self, s):
        """Attach service `s` as a child of this node."""
        s.setServiceParent(self)
        # NOTE(review): the listing ends at the line above; returning `s` is
        # an assumption that is harmless to callers ignoring the result.
        return s