]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/node.py
98ce3cb7c539aacec662488446ade79ea599747b
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / node.py
1 import datetime, os.path, re, types, ConfigParser, tempfile
2 from base64 import b32decode, b32encode
3
4 from twisted.python import log as twlog
5 from twisted.application import service
6 from twisted.internet import defer, reactor
7 from foolscap.api import Tub, eventually, app_versions
8 import foolscap.logging.log
9 from allmydata import get_package_versions, get_package_versions_string
10 from allmydata.util import log
11 from allmydata.util import fileutil, iputil, observer
12 from allmydata.util.assertutil import precondition, _assert
13 from allmydata.util.fileutil import abspath_expanduser_unicode
14 from allmydata.util.encodingutil import get_filesystem_encoding, quote_output
15
# Register each of our package versions with Foolscap, so that the version
# data its LogPublisher reports includes them.
for pkg_name, pkg_version in get_package_versions().items():
    app_versions.add_version(pkg_name, str(pkg_version))
20
# Matches "A.B.C.D" or "A.B.C.D:PORT".
# Group 1 is the address (dotted-quad string); group 3, if present, is the
# port number (string). Each octet is either "0" or a decimal number without
# a leading zero, so addresses such as 10.0.0.1 are accepted. The port must
# be a positive number without a leading zero.
ADDR_RE = re.compile(
    r"^((?:0|[1-9][0-9]*)\.(?:0|[1-9][0-9]*)\.(?:0|[1-9][0-9]*)\.(?:0|[1-9][0-9]*))"
    r"(:([1-9][0-9]*))?$")
23
24
def formatTimeTahoeStyle(self, when):
    """Render the POSIX timestamp *when* as a UTC string with millisecond
    precision and a trailing 'Z', e.g.:

        2007-10-12 00:26:28.566Z

    Sub-millisecond digits are truncated, not rounded. *self* is unused: this
    function is bound as a replacement formatTime() method on twisted
    FileLogObserver instances (see Node.setup_logging).
    """
    stamp = datetime.datetime.utcfromtimestamp(when)
    whole_seconds = stamp.replace(microsecond=0).isoformat(" ")
    millis = stamp.microsecond // 1000
    return "%s.%03dZ" % (whole_seconds, millis)
33
# Contents written to <basedir>/private/README by Node.__init__, explaining
# to anyone browsing the node directory what 'private/' holds.
PRIV_README="""
This directory contains files which contain private data for the Tahoe node,
such as private keys.  On Unix-like systems, the permissions on this directory
are set to disallow users other than its owner from reading the contents of
the files.   See the 'configuration.rst' documentation file for details."""
39
40 class _None: # used as a marker in get_config()
41     pass
42
class MissingConfigEntry(Exception):
    """A required config entry was not found.

    Raised when a tahoe.cfg entry, or a file under private/, is absent and
    the caller supplied no default.
    """
45
class OldConfigError(Exception):
    """ An obsolete config file was found. See
    docs/historical/configuration.rst.

    args[0] is an iterable of the offending file names.
    """
    def __str__(self):
        # Sort the names so the message is deterministic. The caller in this
        # module passes a set, whose iteration order is arbitrary.
        fnames = sorted(self.args[0])
        return ("Found pre-Tahoe-LAFS-v1.3 configuration file(s):\n"
                "%s\n"
                "See docs/historical/configuration.rst."
                % "\n".join([quote_output(fname) for fname in fnames]))
54
class OldConfigOptionError(Exception):
    """Error for an obsolete configuration option.

    NOTE(review): not raised anywhere in this module; presumably raised
    elsewhere for obsolete tahoe.cfg options -- confirm with callers.
    """
57
class UnescapedHashError(Exception):
    """A '.furl' config value contains a '#' not escaped by a backslash.

    Constructed as UnescapedHashError(section, option, value).
    """
    def __str__(self):
        entry = "[%s]%s = %s" % self.args
        template = "The configuration entry %s contained an unescaped '#' character."
        return template % quote_output(entry)
62
63
class Node(service.MultiService):
    """Common functionality of both Client nodes and Introducer nodes.

    A Node is a twisted MultiService rooted at a base directory that holds
    tahoe.cfg and a 'private/' subdirectory. Its constructor:
      - reads the configuration;
      - creates a Foolscap Tub (and optionally an SSH manhole) as child
        services;
      - configures logging.

    Subclasses override the class attributes below.
    """

    # Name used in the "<NODETYPE> running" log message.
    NODETYPE = "unknown NODETYPE"
    # File name (relative to basedir) recording the Tub's listening port.
    # Subclasses must set this: __init__ joins it onto basedir.
    PORTNUMFILE = None
    # Tub certificate file, stored in basedir/private/.
    CERTFILE = "node.pem"
    # File names exempted from the obsolete-config-file check.
    GENERATED_FILES = []

    def __init__(self, basedir=u"."):
        """Set up the node rooted at *basedir*.

        Steps, in order:
          - create basedir/private (mode 0700) and write its README;
          - read tahoe.cfg and set self.nickname from [node]nickname;
          - configure the temp dir;
          - create the Tub;
          - set up the optional SSH manhole and logging.

        May raise OldConfigError or MissingConfigEntry from the
        config-reading helpers.
        """
        service.MultiService.__init__(self)
        self.basedir = abspath_expanduser_unicode(unicode(basedir))
        self._portnumfile = os.path.join(self.basedir, self.PORTNUMFILE)
        self._tub_ready_observerlist = observer.OneShotObserverList()
        fileutil.make_dirs(os.path.join(self.basedir, "private"), 0o700)
        readme = os.path.join(self.basedir, "private", "README")
        with open(readme, "w") as f:
            f.write(PRIV_README)

        # creates self.config
        self.read_config()
        nickname_utf8 = self.get_config("node", "nickname", "<unspecified>")
        self.nickname = nickname_utf8.decode("utf-8")
        assert type(self.nickname) is unicode

        self.init_tempdir()
        self.create_tub()
        self.logSource="Node"

        self.setup_ssh()
        self.setup_logging()
        self.log("Node constructed. " + get_package_versions_string())
        iputil.increase_rlimits()

    def init_tempdir(self):
        """Point the tempfile module at [node]tempdir, creating the
        directory if needed.

        The default is "tmp", and relative paths are resolved against
        basedir. This should cause twisted.web.http (which uses
        tempfile.TemporaryFile) to put large request bodies in the given
        directory. Without this, the default temp dir is usually /tmp/,
        which is frequently too small.
        """
        tempdir_config = self.get_config("node", "tempdir", "tmp").decode('utf-8')
        tempdir = abspath_expanduser_unicode(tempdir_config, base=self.basedir)
        if not os.path.exists(tempdir):
            fileutil.make_dirs(tempdir)
        tempfile.tempdir = tempdir
        # Sanity check that tempfile now uses our directory. mktemp() only
        # generates a name here; no file is created.
        test_name = tempfile.mktemp()
        _assert(os.path.dirname(test_name) == tempdir, test_name, tempdir)

    @staticmethod
    def _contains_unescaped_hash(item):
        """Return True if *item* contains a '#' not preceded by a backslash.

        A backslash escapes whichever character follows it. A trailing lone
        backslash has nothing to escape and is ignored.
        """
        characters = iter(item)
        for c in characters:
            if c == '\\':
                # Skip the escaped character. The default argument makes a
                # trailing backslash simply end the scan, instead of letting
                # StopIteration escape from this (non-generator) function.
                next(characters, None)
            elif c == '#':
                return True

        return False

    def get_config(self, section, option, default=_None, boolean=False):
        """Return the value of [section]option from tahoe.cfg.

        If *boolean* is true, the value is parsed with ConfigParser's
        getboolean(). Otherwise the raw string is returned. In that case,
        options whose name ends in '.furl' are rejected with
        UnescapedHashError if they contain an unescaped '#'.

        If the section or option is absent, return *default*. If no default
        was given, raise MissingConfigEntry instead.
        """
        try:
            if boolean:
                return self.config.getboolean(section, option)

            item = self.config.get(section, option)
            if option.endswith(".furl") and self._contains_unescaped_hash(item):
                raise UnescapedHashError(section, option, item)

            return item
        except (ConfigParser.NoOptionError, ConfigParser.NoSectionError):
            if default is _None:
                fn = os.path.join(self.basedir, u"tahoe.cfg")
                raise MissingConfigEntry("%s is missing the [%s]%s entry"
                                         % (quote_output(fn), section, option))
            return default

    def set_config(self, section, option, value):
        """Set [section]option in the in-memory config.

        Creates the section if needed. Does not write tahoe.cfg to disk.
        """
        if not self.config.has_section(section):
            self.config.add_section(section)
        self.config.set(section, option, value)
        assert self.config.get(section, option) == value

    def read_config(self):
        """Load tahoe.cfg into self.config (a SafeConfigParser).

        Raises OldConfigError if obsolete config files are present. A
        missing tahoe.cfg is tolerated.

        If tahoe.cfg gives no [node]tub.port, the port recorded in
        PORTNUMFILE (written by _setup_tub on a previous run) is used
        instead, when that file exists.
        """
        self.error_about_old_config_files()
        self.config = ConfigParser.SafeConfigParser()

        tahoe_cfg = os.path.join(self.basedir, "tahoe.cfg")
        try:
            with open(tahoe_cfg, "rb") as f:
                # Skip any initial UTF-8 Byte Order Mark. Since this is an
                # ordinary file, we don't need to handle incomplete reads,
                # and can assume seekability.
                if f.read(3) != '\xEF\xBB\xBF':
                    f.seek(0)
                self.config.readfp(f)
        except EnvironmentError:
            # Only a nonexistent tahoe.cfg is acceptable; any other I/O
            # error is re-raised.
            if os.path.exists(tahoe_cfg):
                raise

        cfg_tubport = self.get_config("node", "tub.port", "")
        if not cfg_tubport:
            # For 'tub.port', tahoe.cfg overrides the individual file on
            # disk. So only read self._portnumfile if tahoe.cfg doesn't
            # provide a value.
            try:
                file_tubport = fileutil.read(self._portnumfile).strip()
                self.set_config("node", "tub.port", file_tubport)
            except EnvironmentError:
                if os.path.exists(self._portnumfile):
                    raise

    def error_about_old_config_files(self):
        """ If any old configuration files are detected, raise OldConfigError.

        These are pre-v1.3 per-setting files in basedir. Names listed in
        GENERATED_FILES are skipped.
        """

        oldfnames = set()
        for name in [
            'nickname', 'webport', 'keepalive_timeout', 'log_gatherer.furl',
            'disconnect_timeout', 'advertised_ip_addresses', 'introducer.furl',
            'helper.furl', 'key_generator.furl', 'stats_gatherer.furl',
            'no_storage', 'readonly_storage', 'sizelimit',
            'debug_discard_storage', 'run_helper']:
            if name not in self.GENERATED_FILES:
                fullfname = os.path.join(self.basedir, name)
                if os.path.exists(fullfname):
                    oldfnames.add(fullfname)
        if oldfnames:
            e = OldConfigError(oldfnames)
            twlog.msg(e)
            raise e

    def create_tub(self):
        """Create self.tub and start it listening.

        - The Tub's certificate comes from basedir/private/CERTFILE.
        - The optional timeout.keepalive / timeout.disconnect settings
          (in seconds) are applied.
        - self.nodeid (binary) and self.short_nodeid (first 8 base32
          characters) are derived from the Tub ID; the full base32 ID is
          also written to basedir/my_nodeid.
        - The Tub listens on [node]tub.port (default "tcp:0") and is
          attached as a child service.
        """
        certfile = os.path.join(self.basedir, "private", self.CERTFILE)
        self.tub = Tub(certFile=certfile)
        self.tub.setOption("logLocalFailures", True)
        self.tub.setOption("logRemoteFailures", True)
        self.tub.setOption("expose-remote-exception-types", False)

        # see #521 for a discussion of how to pick these timeout values.
        keepalive_timeout_s = self.get_config("node", "timeout.keepalive", "")
        if keepalive_timeout_s:
            self.tub.setOption("keepaliveTimeout", int(keepalive_timeout_s))
        disconnect_timeout_s = self.get_config("node", "timeout.disconnect", "")
        if disconnect_timeout_s:
            # N.B.: this is in seconds, so use "1800" to get 30min
            self.tub.setOption("disconnectTimeout", int(disconnect_timeout_s))

        self.nodeid = b32decode(self.tub.tubID.upper()) # binary format
        self.write_config("my_nodeid", b32encode(self.nodeid).lower() + "\n")
        self.short_nodeid = b32encode(self.nodeid).lower()[:8] # ready for printing

        tubport = self.get_config("node", "tub.port", "tcp:0")
        self.tub.listenOn(tubport)
        # we must wait until our service has started before we can find out
        # our IP address and thus do tub.setLocation, and we can't register
        # any services with the Tub until after that point
        self.tub.setServiceParent(self)

    def setup_ssh(self):
        """Start an AuthorizedKeysManhole child service, if configured.

        Only does anything when [node]ssh.port is set. In that case
        [node]ssh.authorized_keys_file is required (MissingConfigEntry
        otherwise), and a relative path is resolved against basedir.
        """
        ssh_port = self.get_config("node", "ssh.port", "")
        if ssh_port:
            ssh_keyfile_config = self.get_config("node", "ssh.authorized_keys_file").decode('utf-8')
            ssh_keyfile = abspath_expanduser_unicode(ssh_keyfile_config, base=self.basedir)
            from allmydata import manhole
            m = manhole.AuthorizedKeysManhole(ssh_port, ssh_keyfile)
            m.setServiceParent(self)
            self.log("AuthorizedKeysManhole listening on %s" % (ssh_port,))

    def get_app_versions(self):
        """Return a copy of Foolscap's app_versions.versions dict."""
        # TODO: merge this with allmydata.get_package_versions
        return dict(app_versions.versions)

    def get_config_from_file(self, name, required=False):
        """Get the (string) contents of a config file, or None if the file
        did not exist. If required=True, raise an exception rather than
        returning None. Any leading or trailing whitespace will be stripped
        from the data. *name* is relative to basedir."""
        fn = os.path.join(self.basedir, name)
        try:
            return fileutil.read(fn).strip()
        except EnvironmentError:
            if not required:
                return None
            raise

    def write_private_config(self, name, value):
        """Write the (string) *value* to a private config file.

        A private config file is a file named *name* in the 'private'
        subdirectory of basedir. Any previous contents are replaced.
        Returns None.
        """
        privname = os.path.join(self.basedir, "private", name)
        with open(privname, "w") as f:
            f.write(value)

    def get_private_config(self, name, default=_None):
        """Read the (string) contents of a private config file.

        A private config file is a file in the 'private' subdirectory of
        basedir. Unlike get_or_create_private_config, the contents are
        returned unstripped. If the file does not exist, return *default*,
        or raise MissingConfigEntry if no default was given. Other I/O
        errors on an existing file are re-raised.
        """
        privname = os.path.join(self.basedir, "private", name)
        try:
            return fileutil.read(privname)
        except EnvironmentError:
            if os.path.exists(privname):
                raise
            if default is _None:
                raise MissingConfigEntry("The required configuration file %s is missing."
                                         % (quote_output(privname),))
            return default

    def get_or_create_private_config(self, name, default=_None):
        """Try to get the (string) contents of a private config file (which
        is a config file that resides within the subdirectory named
        'private'), and return it. Any leading or trailing whitespace will be
        stripped from the data.

        If the file does not exist, and default is not given, report an error.
        If the file does not exist and a default is specified, try to create
        it using that default, and then return the value that was written.
        If 'default' is a string, use it as a default value. If not, treat it
        as a zero-argument callable that is expected to return a string.
        """
        privname = os.path.join(self.basedir, "private", name)
        try:
            value = fileutil.read(privname)
        except EnvironmentError:
            if os.path.exists(privname):
                raise
            if default is _None:
                raise MissingConfigEntry("The required configuration file %s is missing."
                                         % (quote_output(privname),))
            if isinstance(default, basestring):
                value = default
            else:
                value = default()
            fileutil.write(privname, value)
        return value.strip()

    def write_config(self, name, value, mode="w"):
        """Write a string to the config file basedir/*name*.

        *mode* is passed through to fileutil.write. I/O errors are logged
        rather than raised.
        """
        fn = os.path.join(self.basedir, name)
        try:
            fileutil.write(fn, value, mode)
        except EnvironmentError as e:
            self.log("Unable to write config file '%s'" % fn)
            self.log(e)

    def startService(self):
        """Begin starting the node.

        The real work (_startService) is deferred with eventually(), so it
        runs once the reactor is running.
        """
        # Note: this class can be started and stopped at most once.
        self.log("Node.startService")
        # Record the process id in the twisted log, after startService()
        # (__init__ is called before fork(), but startService is called
        # after). Note that Foolscap logs handle pid-logging by itself, no
        # need to send a pid to the foolscap log here.
        twlog.msg("My pid: %s" % os.getpid())
        try:
            os.chmod("twistd.pid", 0o644)
        except EnvironmentError:
            pass
        # Delay until the reactor is running.
        eventually(self._startService)

    def _startService(self):
        """Start child services, then configure the Tub's location.

        When the Tub is ready, fire the tub-ready observers. Any failure
        aborts the process via _service_startup_failed.
        """
        precondition(reactor.running)
        self.log("Node._startService")

        service.MultiService.startService(self)
        d = defer.succeed(None)
        d.addCallback(self._setup_tub)
        def _ready(res):
            self.log("%s running" % self.NODETYPE)
            self._tub_ready_observerlist.fire(self)
            return self
        d.addCallback(_ready)
        d.addErrback(self._service_startup_failed)

    def _service_startup_failed(self, failure):
        """Log a startup *failure* everywhere we can, then os.abort()."""
        self.log('_startService() failed')
        log.err(failure)
        print("Node._startService failed, aborting")
        print(failure)
        # TODO: reactor.stop() would be preferable, but for unknown reasons
        # it isn't working here, so abort the process instead.
        self.log('calling os.abort()')
        twlog.msg('calling os.abort()') # make sure it gets into twistd.log
        print("calling os.abort()")
        os.abort()

    def stopService(self):
        """Stop the node.

        Waits until the Tub-ready observer list has fired (i.e. startup has
        completed), then stops the child services. Returns a Deferred.
        """
        self.log("Node.stopService")
        d = self._tub_ready_observerlist.when_fired()
        def _really_stopService(ignored):
            self.log("Node._really_stopService")
            return service.MultiService.stopService(self)
        d.addCallback(_really_stopService)
        return d

    def shutdown(self):
        """Shut down the node. Returns a Deferred that fires (with None) when
        it finally stops kicking."""
        self.log("Node.shutdown")
        return self.stopService()

    def setup_logging(self):
        """Configure twisted and Foolscap logging for this node.

        - Install formatTimeTahoeStyle as the timestamp formatter on twisted
          FileLogObservers.
        - Set the Foolscap logport furl file under private/.
        - Configure the log gatherer(s).
        - Set the incident-log directory to basedir/logs/incidents.
        """
        # we replace the formatTime() method of the log observer that
        # twistd set up for us, with a method that uses our preferred
        # timestamp format.
        for o in twlog.theLogPublisher.observers:
            # o might be a FileLogObserver's .emit method
            if type(o) is type(self.setup_logging): # bound method
                ob = o.im_self
                if isinstance(ob, twlog.FileLogObserver):
                    newmeth = types.UnboundMethodType(formatTimeTahoeStyle, ob, ob.__class__)
                    ob.formatTime = newmeth
        # TODO: twisted >2.5.0 offers maxRotatedFiles=50

        lgfurl_file = os.path.join(self.basedir, "private", "logport.furl").encode(get_filesystem_encoding())
        self.tub.setOption("logport-furlfile", lgfurl_file)
        lgfurl = self.get_config("node", "log_gatherer.furl", "")
        if lgfurl:
            # this is in addition to the contents of log-gatherer-furlfile
            self.tub.setOption("log-gatherer-furl", lgfurl)
        self.tub.setOption("log-gatherer-furlfile",
                           os.path.join(self.basedir, "log_gatherer.furl"))

        incident_dir = os.path.join(self.basedir, "logs", "incidents")
        foolscap.logging.log.setLogDir(incident_dir.encode(get_filesystem_encoding()))

    def log(self, *args, **kwargs):
        """Forward to allmydata.util.log.msg."""
        return log.msg(*args, **kwargs)

    def _setup_tub(self, ign):
        """Record the Tub's port and set the Tub's location.

        - Write the Tub's listening port to PORTNUMFILE, so the next run can
          reuse it.
        - Compute the location from [node]tub.location (default "AUTO"),
          replacing each "AUTO" entry with "addr:port" for every detected
          local address.
        - Apply the location to the Tub.

        Returns a Deferred that fires with the Tub.
        """
        # we can't get a dynamically-assigned portnum until our Tub is
        # running, which means after startService.
        l = self.tub.getListeners()[0]
        portnum = l.getPortnum()
        # record which port we're listening on, so we can grab the same one
        # next time
        fileutil.write_atomically(self._portnumfile, "%d\n" % portnum, mode="")

        location = self.get_config("node", "tub.location", "AUTO")

        # Replace the location "AUTO", if present, with the detected local addresses.
        split_location = location.split(",")
        if "AUTO" in split_location:
            d = iputil.get_local_addresses_async()
            def _add_local(local_addresses):
                while "AUTO" in split_location:
                    split_location.remove("AUTO")

                split_location.extend([ "%s:%d" % (addr, portnum)
                                        for addr in local_addresses ])
                return ",".join(split_location)
            d.addCallback(_add_local)
        else:
            d = defer.succeed(location)

        def _got_location(location):
            self.log("Tub location set to %s" % (location,))
            self.tub.setLocation(location)
            return self.tub
        d.addCallback(_got_location)
        return d

    def when_tub_ready(self):
        """Return a Deferred that fires (with this node) once the Tub is
        set up and the node is running."""
        return self._tub_ready_observerlist.when_fired()

    def add_service(self, s):
        """Attach *s* as a child service of this node and return it."""
        s.setServiceParent(self)
        return s