]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/node.py
Refactor tahoe.cfg handling to configutil.
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / node.py
1 import datetime, os.path, re, types, ConfigParser, tempfile
2 from base64 import b32decode, b32encode
3
4 from twisted.python import log as twlog
5 from twisted.application import service
6 from twisted.internet import defer, reactor
7 from foolscap.api import Tub, eventually, app_versions
8 import foolscap.logging.log
9 from allmydata import get_package_versions, get_package_versions_string
10 from allmydata.util import log
11 from allmydata.util import fileutil, iputil, observer
12 from allmydata.util.assertutil import precondition, _assert
13 from allmydata.util.fileutil import abspath_expanduser_unicode
14 from allmydata.util.encodingutil import get_filesystem_encoding, quote_output
15 from allmydata.util import configutil
16
# Add our application versions to the data that Foolscap's LogPublisher
# reports. This runs once, at import time; every version is converted to a
# string before it is handed to Foolscap.
for thing, things_version in get_package_versions().iteritems():
    app_versions.add_version(thing, str(things_version))
21
# Matches a dotted-quad IPv4 address with an optional ":portnum" suffix.
# group 1 will be addr (dotted quad string), group 3 if any will be portnum (string)
# Each octet is either "0" or a decimal number without leading zeros, so
# addresses that contain a zero octet (127.0.0.1, 10.0.0.1) are accepted. The
# inner groups are non-capturing so that the group numbers above stay stable.
ADDR_RE = re.compile(
    r"^((?:0|[1-9][0-9]*)(?:\.(?:0|[1-9][0-9]*)){3})"
    r"(:([1-9][0-9]*))?$")
24
25
def formatTimeTahoeStyle(self, when):
    """Format the timestamp 'when' as a UTC string with millisecond precision.

    The result looks like the timestamp at the start of this log line:
      2007-10-12 00:26:28.566Z [Client] rnp752lz: 'client running'

    'self' is unused here; the function is installed as a replacement
    formatTime method on a log observer (see Node.setup_logging).
    """
    stamp = datetime.datetime.utcfromtimestamp(when).isoformat(" ")
    if "." in stamp:
        # isoformat() emitted six fractional digits; keep only the first three.
        return stamp[:-3] + "Z"
    # isoformat() leaves the fraction out entirely when it is zero.
    return stamp + ".000Z"
34
# Text written to the README file inside the node's "private" directory.
# It starts with a newline and has no trailing newline.
PRIV_README = (
    "\n"
    "This directory contains files which contain private data for the Tahoe node,\n"
    "such as private keys.  On Unix-like systems, the permissions on this directory\n"
    "are set to disallow users other than its owner from reading the contents of\n"
    "the files.   See the 'configuration.rst' documentation file for details."
)
40
# Sentinel class: used as the "no default was supplied" marker by
# get_config(), get_private_config() and get_or_create_private_config(). It is
# compared by identity and never instantiated in this file.
class _None: # used as a marker in get_config()
    pass
43
class MissingConfigEntry(Exception):
    """ A required config entry (or required private config file) was not
    found and no default value was supplied. """
46
class OldConfigError(Exception):
    """ An obsolete config file was found. See
    docs/historical/configuration.rst.

    The first constructor argument is an iterable of the offending file
    names; __str__ lists them one per line.
    """
    def __str__(self):
        quoted_names = [quote_output(fname) for fname in self.args[0]]
        listing = "\n".join(quoted_names)
        return ("Found pre-Tahoe-LAFS-v1.3 configuration file(s):\n"
                "%s\n"
                "See docs/historical/configuration.rst."
                % (listing,))
55
# Not raised anywhere in this part of the file; presumably used by code that
# rejects obsolete options in tahoe.cfg -- confirm against callers.
class OldConfigOptionError(Exception):
    pass
58
class UnescapedHashError(Exception):
    """ A configuration value contained a '#' that was not preceded by a
    backslash.

    Raised as UnescapedHashError(section, option, value); those three
    arguments are used to build the message.
    """
    def __str__(self):
        entry = "[%s]%s = %s" % self.args
        return ("The configuration entry %s contained an unescaped '#' character."
                % (quote_output(entry),))
63
64
class Node(service.MultiService):
    # this implements common functionality of both Client nodes and Introducer
    # nodes.

    # Node type name; appears in the "<NODETYPE> running" log message.
    NODETYPE = "unknown NODETYPE"
    # Name of the file (relative to basedir) that records the Tub's port
    # number. __init__ joins it onto basedir, so the None placeholder must be
    # replaced -- presumably by each subclass; confirm against subclasses.
    PORTNUMFILE = None
    # Name of the Tub certificate file inside the "private" subdirectory.
    CERTFILE = "node.pem"
    # File names that error_about_old_config_files() must not treat as
    # obsolete configuration files.
    GENERATED_FILES = []
72
73     def __init__(self, basedir=u"."):
74         service.MultiService.__init__(self)
75         self.basedir = abspath_expanduser_unicode(unicode(basedir))
76         self._portnumfile = os.path.join(self.basedir, self.PORTNUMFILE)
77         self._tub_ready_observerlist = observer.OneShotObserverList()
78         fileutil.make_dirs(os.path.join(self.basedir, "private"), 0700)
79         open(os.path.join(self.basedir, "private", "README"), "w").write(PRIV_README)
80
81         # creates self.config
82         self.read_config()
83         nickname_utf8 = self.get_config("node", "nickname", "<unspecified>")
84         self.nickname = nickname_utf8.decode("utf-8")
85         assert type(self.nickname) is unicode
86
87         self.init_tempdir()
88         self.create_tub()
89         self.logSource="Node"
90
91         self.setup_ssh()
92         self.setup_logging()
93         self.log("Node constructed. " + get_package_versions_string())
94         iputil.increase_rlimits()
95
96     def init_tempdir(self):
97         tempdir_config = self.get_config("node", "tempdir", "tmp").decode('utf-8')
98         tempdir = abspath_expanduser_unicode(tempdir_config, base=self.basedir)
99         if not os.path.exists(tempdir):
100             fileutil.make_dirs(tempdir)
101         tempfile.tempdir = tempdir
102         # this should cause twisted.web.http (which uses
103         # tempfile.TemporaryFile) to put large request bodies in the given
104         # directory. Without this, the default temp dir is usually /tmp/,
105         # which is frequently too small.
106         test_name = tempfile.mktemp()
107         _assert(os.path.dirname(test_name) == tempdir, test_name, tempdir)
108
109     @staticmethod
110     def _contains_unescaped_hash(item):
111         characters = iter(item)
112         for c in characters:
113             if c == '\\':
114                 characters.next()
115             elif c == '#':
116                 return True
117
118         return False
119
120     def get_config(self, section, option, default=_None, boolean=False):
121         try:
122             if boolean:
123                 return self.config.getboolean(section, option)
124
125             item = self.config.get(section, option)
126             if option.endswith(".furl") and self._contains_unescaped_hash(item):
127                 raise UnescapedHashError(section, option, item)
128
129             return item
130         except (ConfigParser.NoOptionError, ConfigParser.NoSectionError):
131             if default is _None:
132                 fn = os.path.join(self.basedir, u"tahoe.cfg")
133                 raise MissingConfigEntry("%s is missing the [%s]%s entry"
134                                          % (quote_output(fn), section, option))
135             return default
136
137     def read_config(self):
138         self.error_about_old_config_files()
139         self.config = ConfigParser.SafeConfigParser()
140
141         tahoe_cfg = os.path.join(self.basedir, "tahoe.cfg")
142         try:
143             self.config = configutil.get_config(tahoe_cfg)
144         except EnvironmentError:
145             if os.path.exists(tahoe_cfg):
146                 raise
147
148         cfg_tubport = self.get_config("node", "tub.port", "")
149         if not cfg_tubport:
150             # For 'tub.port', tahoe.cfg overrides the individual file on
151             # disk. So only read self._portnumfile if tahoe.cfg doesn't
152             # provide a value.
153             try:
154                 file_tubport = fileutil.read(self._portnumfile).strip()
155                 configutil.set_config(self.config, "node", "tub.port", file_tubport)
156             except EnvironmentError:
157                 if os.path.exists(self._portnumfile):
158                     raise
159
160     def error_about_old_config_files(self):
161         """ If any old configuration files are detected, raise OldConfigError. """
162
163         oldfnames = set()
164         for name in [
165             'nickname', 'webport', 'keepalive_timeout', 'log_gatherer.furl',
166             'disconnect_timeout', 'advertised_ip_addresses', 'introducer.furl',
167             'helper.furl', 'key_generator.furl', 'stats_gatherer.furl',
168             'no_storage', 'readonly_storage', 'sizelimit',
169             'debug_discard_storage', 'run_helper']:
170             if name not in self.GENERATED_FILES:
171                 fullfname = os.path.join(self.basedir, name)
172                 if os.path.exists(fullfname):
173                     oldfnames.add(fullfname)
174         if oldfnames:
175             e = OldConfigError(oldfnames)
176             twlog.msg(e)
177             raise e
178
179     def create_tub(self):
180         certfile = os.path.join(self.basedir, "private", self.CERTFILE)
181         self.tub = Tub(certFile=certfile)
182         self.tub.setOption("logLocalFailures", True)
183         self.tub.setOption("logRemoteFailures", True)
184         self.tub.setOption("expose-remote-exception-types", False)
185
186         # see #521 for a discussion of how to pick these timeout values.
187         keepalive_timeout_s = self.get_config("node", "timeout.keepalive", "")
188         if keepalive_timeout_s:
189             self.tub.setOption("keepaliveTimeout", int(keepalive_timeout_s))
190         disconnect_timeout_s = self.get_config("node", "timeout.disconnect", "")
191         if disconnect_timeout_s:
192             # N.B.: this is in seconds, so use "1800" to get 30min
193             self.tub.setOption("disconnectTimeout", int(disconnect_timeout_s))
194
195         self.nodeid = b32decode(self.tub.tubID.upper()) # binary format
196         self.write_config("my_nodeid", b32encode(self.nodeid).lower() + "\n")
197         self.short_nodeid = b32encode(self.nodeid).lower()[:8] # ready for printing
198
199         tubport = self.get_config("node", "tub.port", "tcp:0")
200         self.tub.listenOn(tubport)
201         # we must wait until our service has started before we can find out
202         # our IP address and thus do tub.setLocation, and we can't register
203         # any services with the Tub until after that point
204         self.tub.setServiceParent(self)
205
206     def setup_ssh(self):
207         ssh_port = self.get_config("node", "ssh.port", "")
208         if ssh_port:
209             ssh_keyfile_config = self.get_config("node", "ssh.authorized_keys_file").decode('utf-8')
210             ssh_keyfile = abspath_expanduser_unicode(ssh_keyfile_config, base=self.basedir)
211             from allmydata import manhole
212             m = manhole.AuthorizedKeysManhole(ssh_port, ssh_keyfile)
213             m.setServiceParent(self)
214             self.log("AuthorizedKeysManhole listening on %s" % (ssh_port,))
215
216     def get_app_versions(self):
217         # TODO: merge this with allmydata.get_package_versions
218         return dict(app_versions.versions)
219
220     def get_config_from_file(self, name, required=False):
221         """Get the (string) contents of a config file, or None if the file
222         did not exist. If required=True, raise an exception rather than
223         returning None. Any leading or trailing whitespace will be stripped
224         from the data."""
225         fn = os.path.join(self.basedir, name)
226         try:
227             return fileutil.read(fn).strip()
228         except EnvironmentError:
229             if not required:
230                 return None
231             raise
232
233     def write_private_config(self, name, value):
234         """Write the (string) contents of a private config file (which is a
235         config file that resides within the subdirectory named 'private'), and
236         return it.
237         """
238         privname = os.path.join(self.basedir, "private", name)
239         open(privname, "w").write(value)
240
241     def get_private_config(self, name, default=_None):
242         """Read the (string) contents of a private config file (which is a
243         config file that resides within the subdirectory named 'private'),
244         and return it. Return a default, or raise an error if one was not
245         given.
246         """
247         privname = os.path.join(self.basedir, "private", name)
248         try:
249             return fileutil.read(privname)
250         except EnvironmentError:
251             if os.path.exists(privname):
252                 raise
253             if default is _None:
254                 raise MissingConfigEntry("The required configuration file %s is missing."
255                                          % (quote_output(privname),))
256             return default
257
258     def get_or_create_private_config(self, name, default=_None):
259         """Try to get the (string) contents of a private config file (which
260         is a config file that resides within the subdirectory named
261         'private'), and return it. Any leading or trailing whitespace will be
262         stripped from the data.
263
264         If the file does not exist, and default is not given, report an error.
265         If the file does not exist and a default is specified, try to create
266         it using that default, and then return the value that was written.
267         If 'default' is a string, use it as a default value. If not, treat it
268         as a zero-argument callable that is expected to return a string.
269         """
270         privname = os.path.join(self.basedir, "private", name)
271         try:
272             value = fileutil.read(privname)
273         except EnvironmentError:
274             if os.path.exists(privname):
275                 raise
276             if default is _None:
277                 raise MissingConfigEntry("The required configuration file %s is missing."
278                                          % (quote_output(privname),))
279             if isinstance(default, basestring):
280                 value = default
281             else:
282                 value = default()
283             fileutil.write(privname, value)
284         return value.strip()
285
286     def write_config(self, name, value, mode="w"):
287         """Write a string to a config file."""
288         fn = os.path.join(self.basedir, name)
289         try:
290             fileutil.write(fn, value, mode)
291         except EnvironmentError, e:
292             self.log("Unable to write config file '%s'" % fn)
293             self.log(e)
294
295     def startService(self):
296         # Note: this class can be started and stopped at most once.
297         self.log("Node.startService")
298         # Record the process id in the twisted log, after startService()
299         # (__init__ is called before fork(), but startService is called
300         # after). Note that Foolscap logs handle pid-logging by itself, no
301         # need to send a pid to the foolscap log here.
302         twlog.msg("My pid: %s" % os.getpid())
303         try:
304             os.chmod("twistd.pid", 0644)
305         except EnvironmentError:
306             pass
307         # Delay until the reactor is running.
308         eventually(self._startService)
309
310     def _startService(self):
311         precondition(reactor.running)
312         self.log("Node._startService")
313
314         service.MultiService.startService(self)
315         d = defer.succeed(None)
316         d.addCallback(self._setup_tub)
317         def _ready(res):
318             self.log("%s running" % self.NODETYPE)
319             self._tub_ready_observerlist.fire(self)
320             return self
321         d.addCallback(_ready)
322         d.addErrback(self._service_startup_failed)
323
324     def _service_startup_failed(self, failure):
325         self.log('_startService() failed')
326         log.err(failure)
327         print "Node._startService failed, aborting"
328         print failure
329         #reactor.stop() # for unknown reasons, reactor.stop() isn't working.  [ ] TODO
330         self.log('calling os.abort()')
331         twlog.msg('calling os.abort()') # make sure it gets into twistd.log
332         print "calling os.abort()"
333         os.abort()
334
335     def stopService(self):
336         self.log("Node.stopService")
337         d = self._tub_ready_observerlist.when_fired()
338         def _really_stopService(ignored):
339             self.log("Node._really_stopService")
340             return service.MultiService.stopService(self)
341         d.addCallback(_really_stopService)
342         return d
343
344     def shutdown(self):
345         """Shut down the node. Returns a Deferred that fires (with None) when
346         it finally stops kicking."""
347         self.log("Node.shutdown")
348         return self.stopService()
349
350     def setup_logging(self):
351         # we replace the formatTime() method of the log observer that
352         # twistd set up for us, with a method that uses our preferred
353         # timestamp format.
354         for o in twlog.theLogPublisher.observers:
355             # o might be a FileLogObserver's .emit method
356             if type(o) is type(self.setup_logging): # bound method
357                 ob = o.im_self
358                 if isinstance(ob, twlog.FileLogObserver):
359                     newmeth = types.UnboundMethodType(formatTimeTahoeStyle, ob, ob.__class__)
360                     ob.formatTime = newmeth
361         # TODO: twisted >2.5.0 offers maxRotatedFiles=50
362
363         lgfurl_file = os.path.join(self.basedir, "private", "logport.furl").encode(get_filesystem_encoding())
364         self.tub.setOption("logport-furlfile", lgfurl_file)
365         lgfurl = self.get_config("node", "log_gatherer.furl", "")
366         if lgfurl:
367             # this is in addition to the contents of log-gatherer-furlfile
368             self.tub.setOption("log-gatherer-furl", lgfurl)
369         self.tub.setOption("log-gatherer-furlfile",
370                            os.path.join(self.basedir, "log_gatherer.furl"))
371
372         incident_dir = os.path.join(self.basedir, "logs", "incidents")
373         foolscap.logging.log.setLogDir(incident_dir.encode(get_filesystem_encoding()))
374
375     def log(self, *args, **kwargs):
376         return log.msg(*args, **kwargs)
377
378     def _setup_tub(self, ign):
379         # we can't get a dynamically-assigned portnum until our Tub is
380         # running, which means after startService.
381         l = self.tub.getListeners()[0]
382         portnum = l.getPortnum()
383         # record which port we're listening on, so we can grab the same one
384         # next time
385         fileutil.write_atomically(self._portnumfile, "%d\n" % portnum, mode="")
386
387         location = self.get_config("node", "tub.location", "AUTO")
388
389         # Replace the location "AUTO", if present, with the detected local addresses.
390         split_location = location.split(",")
391         if "AUTO" in split_location:
392             d = iputil.get_local_addresses_async()
393             def _add_local(local_addresses):
394                 while "AUTO" in split_location:
395                     split_location.remove("AUTO")
396
397                 split_location.extend([ "%s:%d" % (addr, portnum)
398                                         for addr in local_addresses ])
399                 return ",".join(split_location)
400             d.addCallback(_add_local)
401         else:
402             d = defer.succeed(location)
403
404         def _got_location(location):
405             self.log("Tub location set to %s" % (location,))
406             self.tub.setLocation(location)
407             return self.tub
408         d.addCallback(_got_location)
409         return d
410
411     def when_tub_ready(self):
412         return self._tub_ready_observerlist.when_fired()
413
414     def add_service(self, s):
415         s.setServiceParent(self)
416         return s