git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/node.py
#518: replace various BASEDIR/* config files with a single BASEDIR/tahoe.cfg, with...
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / node.py
1
2 import datetime, os.path, re, types, ConfigParser
3 from base64 import b32decode, b32encode
4
5 from twisted.python import log as twlog
6 from twisted.application import service
7 from twisted.internet import defer, reactor
8 from foolscap import Tub, eventual
9 import foolscap.logging.log
10 from allmydata import get_package_versions, get_package_versions_string
11 from allmydata.util import log
12 from allmydata.util import fileutil, iputil, observer, humanreadable
13 from allmydata.util.assertutil import precondition
14
15 from foolscap.logging import app_versions
16
# Tell Foolscap's app_versions registry about every package version we know
# of, so that Foolscap's LogPublisher includes them in what it reports.
for pkg_name, pkg_version in get_package_versions().iteritems():
    app_versions.add_version(pkg_name, str(pkg_version))
21
# Matches one entry of the legacy advertised_ip_addresses file: a dotted-quad
# IPv4 address, optionally followed by ":PORT".
#   group 1: the address (dotted-quad string)
#   group 2: ":PORT" including the colon, or None
#   group 3: the port number (string), or None
# An octet may be exactly "0" (so 10.0.0.1 and 192.168.0.5 are accepted rather
# than silently ignored) but may not carry a leading zero; the port must be
# a positive integer without a leading zero.
ADDR_RE = re.compile(r"^((?:0|[1-9][0-9]*)\.(?:0|[1-9][0-9]*)"
                     r"\.(?:0|[1-9][0-9]*)\.(?:0|[1-9][0-9]*))"
                     r"(:([1-9][0-9]*))?$")
24
25
def formatTimeTahoeStyle(self, when):
    """Render a POSIX timestamp as a UTC string with millisecond precision.

    This is installed as a replacement FileLogObserver.formatTime method,
    which is why it takes an (unused) ``self``.  A resulting log line looks
    like::

        2007-10-12 00:26:28.566Z [Client] rnp752lz: 'client running'

    Digits beyond the millisecond are truncated, not rounded.
    """
    stamp = datetime.datetime.utcfromtimestamp(when)
    text = stamp.isoformat(" ")
    if not stamp.microsecond:
        # isoformat() leaves out the fractional part entirely when it is zero
        return text + ".000Z"
    # drop the last three (microsecond) digits, keeping milliseconds
    return text[:-3] + "Z"
34
# Explanatory text that Node.__init__ writes to BASEDIR/private/README.
PRIV_README = ("\n"
               "This directory contains files which contain private data for the Tahoe node,\n"
               "such as private keys.  On Unix-like systems, the permissions on this directory\n"
               "are set to disallow users other than its owner from reading the contents of\n"
               "the files.   See the 'configuration.txt' documentation file for details.")
40
class _None:
    """Sentinel for Node.get_config(): its ``default`` parameter defaults to
    this class object, so an explicit ``None`` default remains
    distinguishable from "no default supplied"."""
43
class MissingConfigEntry(Exception):
    """Raised by Node.get_config() when a requested config entry is absent
    and the caller supplied no default."""
46
47 class Node(service.MultiService):
48     # this implements common functionality of both Client nodes and Introducer
49     # nodes.
50     NODETYPE = "unknown NODETYPE"
51     PORTNUMFILE = None
52     CERTFILE = "node.pem"
53     LOCAL_IP_FILE = "advertised_ip_addresses"
54
55     def __init__(self, basedir="."):
56         service.MultiService.__init__(self)
57         self.basedir = os.path.abspath(basedir)
58         self._portnumfile = os.path.join(self.basedir, self.PORTNUMFILE)
59         self._tub_ready_observerlist = observer.OneShotObserverList()
60         fileutil.make_dirs(os.path.join(self.basedir, "private"), 0700)
61         open(os.path.join(self.basedir, "private", "README"), "w").write(PRIV_README)
62
63         # creates self.config, populates from distinct files if necessary
64         self.read_config()
65
66         nickname_utf8 = self.get_config("node", "nickname", "<unspecified>")
67         self.nickname = nickname_utf8.decode("utf-8")
68
69         self.create_tub()
70         self.logSource="Node"
71
72         self.setup_ssh()
73         self.setup_logging()
74         self.log("Node constructed. " + get_package_versions_string())
75         iputil.increase_rlimits()
76
77     def get_config(self, section, option, default=_None, boolean=False):
78         try:
79             if boolean:
80                 return self.config.getboolean(section, option)
81             return self.config.get(section, option)
82         except (ConfigParser.NoOptionError, ConfigParser.NoSectionError):
83             if default is _None:
84                 fn = os.path.join(self.basedir, "tahoe.cfg")
85                 raise MissingConfigEntry("%s is missing the [%s]%s entry"
86                                          % (fn, section, option))
87             return default
88
89     def set_config(self, section, option, value):
90         if not self.config.has_section(section):
91             self.config.add_section(section)
92         self.config.set(section, option, value)
93         assert self.config.get(section, option) == value
94
95     def read_config(self):
96         self.config = ConfigParser.SafeConfigParser()
97         self.config.read([os.path.join(self.basedir, "tahoe.cfg")])
98         self.read_old_config_files()
99
100     def read_old_config_files(self):
101         # backwards-compatibility: individual files will override the
102         # contents of tahoe.cfg
103         copy = self._copy_config_from_file
104
105         copy("nickname", "node", "nickname")
106         copy("webport", "node", "web.port")
107
108         cfg_tubport = self.get_config("node", "tub.port", "")
109         if not cfg_tubport:
110             # For 'tub.port', tahoe.cfg overrides the individual file on
111             # disk. So only read self._portnumfile is tahoe.cfg doesn't
112             # provide a value.
113             try:
114                 file_tubport = open(self._portnumfile, "rU").read().strip()
115                 self.set_config("node", "tub.port", file_tubport)
116             except EnvironmentError:
117                 pass
118
119         try:
120             addresses = []
121             ipfile = os.path.join(self.basedir, self.LOCAL_IP_FILE)
122             tubport = int(self.get_config("node", "tub.port", "0"))
123             for addrline in open(ipfile, "rU"):
124                 mo = ADDR_RE.search(addrline)
125                 if mo:
126                     (addr, dummy, aportnum,) = mo.groups()
127                     if aportnum is None:
128                         aportnum = tubport
129                     addresses.append("%s:%d" % (addr, int(aportnum),))
130             self.set_config("node", "advertised_ip_addresses",
131                             ",".join(addresses))
132         except EnvironmentError:
133             pass
134         copy("keepalive_timeout", "node", "timeout.keepalive")
135         copy("disconnect_timeout", "node", "timeout.disconnect")
136         AUTHKEYSFILEBASE = "authorized_keys."
137         for f in os.listdir(self.basedir):
138             if f.startswith(AUTHKEYSFILEBASE):
139                 keyfile = os.path.join(self.basedir, f)
140                 portnum = int(f[len(AUTHKEYSFILEBASE):])
141                 self.set_config("node", "ssh.port", str(portnum))
142                 self.set_config("node", "ssh.authorized_keys_file", keyfile)
143                 # only allow one
144                 break
145
146     def _copy_config_from_file(self, config_filename, section, keyname):
147         s = self.get_config_from_file(config_filename)
148         if s is not None:
149             self.set_config(section, keyname, s)
150
151     def create_tub(self):
152         certfile = os.path.join(self.basedir, "private", self.CERTFILE)
153         self.tub = Tub(certFile=certfile)
154         self.tub.setOption("logLocalFailures", True)
155         self.tub.setOption("logRemoteFailures", True)
156
157         # see #521 for a discussion of how to pick these timeout values.
158         keepalive_timeout_s = self.get_config("node", "timeout.keepalive", "")
159         if keepalive_timeout_s:
160             self.tub.setOption("keepaliveTimeout", int(keepalive_timeout_s))
161         disconnect_timeout_s = self.get_config("node", "timeout.disconnect", "")
162         if disconnect_timeout_s:
163             # N.B.: this is in seconds, so use "1800" to get 30min
164             self.tub.setOption("disconnectTimeout", int(disconnect_timeout_s))
165
166         self.nodeid = b32decode(self.tub.tubID.upper()) # binary format
167         self.write_config("my_nodeid", b32encode(self.nodeid).lower() + "\n")
168         self.short_nodeid = b32encode(self.nodeid).lower()[:8] # ready for printing
169
170         tubport = self.get_config("node", "tub.port", "tcp:0")
171         self.tub.listenOn(tubport)
172         # we must wait until our service has started before we can find out
173         # our IP address and thus do tub.setLocation, and we can't register
174         # any services with the Tub until after that point
175         self.tub.setServiceParent(self)
176
177     def setup_ssh(self):
178         ssh_port = self.get_config("node", "ssh.port", "")
179         if ssh_port:
180             ssh_keyfile = self.get_config("node", "ssh.authorized_keys_file")
181             from allmydata import manhole
182             m = manhole.AuthorizedKeysManhole(ssh_port, ssh_keyfile)
183             m.setServiceParent(self)
184             self.log("AuthorizedKeysManhole listening on %s" % ssh_port)
185
186     def get_app_versions(self):
187         # TODO: merge this with allmydata.get_package_versions
188         return dict(app_versions.versions)
189
190     def get_config_from_file(self, name, required=False):
191         """Get the (string) contents of a config file, or None if the file
192         did not exist. If required=True, raise an exception rather than
193         returning None. Any leading or trailing whitespace will be stripped
194         from the data."""
195         fn = os.path.join(self.basedir, name)
196         try:
197             return open(fn, "r").read().strip()
198         except EnvironmentError:
199             if not required:
200                 return None
201             raise
202
203     def write_private_config(self, name, value):
204         """Write the (string) contents of a private config file (which is a
205         config file that resides within the subdirectory named 'private'), and
206         return it. Any leading or trailing whitespace will be stripped from
207         the data.
208         """
209         privname = os.path.join(self.basedir, "private", name)
210         open(privname, "w").write(value.strip())
211
212     def get_or_create_private_config(self, name, default):
213         """Try to get the (string) contents of a private config file (which
214         is a config file that resides within the subdirectory named
215         'private'), and return it. Any leading or trailing whitespace will be
216         stripped from the data.
217
218         If the file does not exist, try to create it using default, and
219         then return the value that was written. If 'default' is a string,
220         use it as a default value. If not, treat it as a 0-argument callable
221         which is expected to return a string.
222         """
223         privname = os.path.join("private", name)
224         value = self.get_config_from_file(privname)
225         if value is None:
226             if isinstance(default, (str, unicode)):
227                 value = default
228             else:
229                 value = default()
230             fn = os.path.join(self.basedir, privname)
231             try:
232                 open(fn, "w").write(value)
233             except EnvironmentError, e:
234                 self.log("Unable to write config file '%s'" % fn)
235                 self.log(e)
236             value = value.strip()
237         return value
238
239     def write_config(self, name, value, mode="w"):
240         """Write a string to a config file."""
241         fn = os.path.join(self.basedir, name)
242         try:
243             open(fn, mode).write(value)
244         except EnvironmentError, e:
245             self.log("Unable to write config file '%s'" % fn)
246             self.log(e)
247
248     def startService(self):
249         # Note: this class can be started and stopped at most once.
250         self.log("Node.startService")
251         try:
252             os.chmod("twistd.pid", 0644)
253         except EnvironmentError:
254             pass
255         # Delay until the reactor is running.
256         eventual.eventually(self._startService)
257
258     def _startService(self):
259         precondition(reactor.running)
260         self.log("Node._startService")
261
262         service.MultiService.startService(self)
263         d = defer.succeed(None)
264         d.addCallback(lambda res: iputil.get_local_addresses_async())
265         d.addCallback(self._setup_tub)
266         def _ready(res):
267             self.log("%s running" % self.NODETYPE)
268             self._tub_ready_observerlist.fire(self)
269             return self
270         d.addCallback(_ready)
271         d.addErrback(self._service_startup_failed)
272
273     def _service_startup_failed(self, failure):
274         self.log('_startService() failed')
275         log.err(failure)
276         print "Node._startService failed, aborting"
277         print failure
278         #reactor.stop() # for unknown reasons, reactor.stop() isn't working.  [ ] TODO
279         self.log('calling os.abort()')
280         twlog.msg('calling os.abort()') # make sure it gets into twistd.log
281         print "calling os.abort()"
282         os.abort()
283
284     def stopService(self):
285         self.log("Node.stopService")
286         d = self._tub_ready_observerlist.when_fired()
287         def _really_stopService(ignored):
288             self.log("Node._really_stopService")
289             return service.MultiService.stopService(self)
290         d.addCallback(_really_stopService)
291         return d
292
293     def shutdown(self):
294         """Shut down the node. Returns a Deferred that fires (with None) when
295         it finally stops kicking."""
296         self.log("Node.shutdown")
297         return self.stopService()
298
299     def setup_logging(self):
300         # we replace the formatTime() method of the log observer that twistd
301         # set up for us, with a method that uses better timestamps.
302         for o in twlog.theLogPublisher.observers:
303             # o might be a FileLogObserver's .emit method
304             if type(o) is type(self.setup_logging): # bound method
305                 ob = o.im_self
306                 if isinstance(ob, twlog.FileLogObserver):
307                     newmeth = types.UnboundMethodType(formatTimeTahoeStyle, ob, ob.__class__)
308                     ob.formatTime = newmeth
309         # TODO: twisted >2.5.0 offers maxRotatedFiles=50
310
311         self.tub.setOption("logport-furlfile",
312                            os.path.join(self.basedir, "private","logport.furl"))
313         lgfurl = self.get_config("node", "log_gatherer.furl", "")
314         if lgfurl:
315             # this is in addition to the contents of log-gatherer-furlfile
316             self.tub.setOption("log-gatherer-furl", lgfurl)
317         self.tub.setOption("log-gatherer-furlfile",
318                            os.path.join(self.basedir, "log_gatherer.furl"))
319         self.tub.setOption("bridge-twisted-logs", True)
320         incident_dir = os.path.join(self.basedir, "logs", "incidents")
321         # this doesn't quite work yet: unit tests fail
322         foolscap.logging.log.setLogDir(incident_dir)
323
324     def log(self, *args, **kwargs):
325         return log.msg(*args, **kwargs)
326
327     def old_log(self, msg, src="", args=(), **kw):
328         if src:
329             logsrc = src
330         else:
331             logsrc = self.logSource
332         if args:
333             try:
334                 msg = msg % tuple(map(humanreadable.hr, args))
335             except TypeError, e:
336                 msg = "ERROR: output string '%s' contained invalid %% expansion, error: %s, args: %s\n" % (`msg`, e, `args`)
337         msg = self.short_nodeid + ": " + humanreadable.hr(msg)
338         return twlog.callWithContext({"system":logsrc},
339                                      twlog.msg, msg, **kw)
340
341     def _setup_tub(self, local_addresses):
342         # we can't get a dynamically-assigned portnum until our Tub is
343         # running, which means after startService.
344         l = self.tub.getListeners()[0]
345         portnum = l.getPortnum()
346         # record which port we're listening on, so we can grab the same one next time
347         open(self._portnumfile, "w").write("%d\n" % portnum)
348
349         addresses = [ "%s:%d" % (addr, portnum,) for addr in local_addresses ]
350         extra_addresses = self.get_config("node", "advertised_ip_addresses", "")
351         if extra_addresses:
352             extra_addresses = extra_addresses.split(",")
353             addresses.extend(extra_addresses)
354
355         location = ",".join(addresses)
356         self.log("Tub location set to %s" % location)
357         self.tub.setLocation(location)
358         return self.tub
359
360     def when_tub_ready(self):
361         return self._tub_ready_observerlist.when_fired()
362
363     def add_service(self, s):
364         s.setServiceParent(self)
365         return s
366