]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/node.py
node.py: remove support for the old BASEDIR/authorized_keys.PORT file
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / node.py
1
2 import datetime, os.path, re, types, ConfigParser
3 from base64 import b32decode, b32encode
4
5 from twisted.python import log as twlog
6 from twisted.application import service
7 from twisted.internet import defer, reactor
8 from foolscap import Tub, eventual
9 import foolscap.logging.log
10 from allmydata import get_package_versions, get_package_versions_string
11 from allmydata.util import log
12 from allmydata.util import fileutil, iputil, observer, humanreadable
13 from allmydata.util.assertutil import precondition
14
15 from foolscap.logging import app_versions
16
# Add our application versions to the data that Foolscap's LogPublisher
# reports. This runs once, at import time: every (name, version) pair
# returned by get_package_versions() is registered with foolscap's
# app_versions, with the version converted to a string first.
for thing, things_version in get_package_versions().iteritems():
    app_versions.add_version(thing, str(things_version))
21
# Matches one line of the old-style advertised-addresses file: a dotted-quad
# IPv4 address with an optional ":PORT" suffix.
# group 1 will be addr (dotted quad string), group 3 if any will be portnum (string)
# Each octet is either "0" or a number without leading zeros, so addresses
# that contain a zero octet (127.0.0.1, 10.0.0.5) are accepted. Octet values
# are not range-checked. The trailing "$" still matches just before a final
# newline, so lines read straight from a file match.
ADDR_RE=re.compile(r"^((?:0|[1-9][0-9]*)(?:\.(?:0|[1-9][0-9]*)){3})(:([1-9][0-9]*))?$")
24
25
def formatTimeTahoeStyle(self, when):
    """Format the epoch timestamp 'when' as a UTC string with millisecond
    precision and a trailing 'Z', e.g.:

      2007-10-12 00:26:28.566Z

    'self' is accepted but not used; the function is installed as a
    replacement formatTime() method on a log observer.
    """
    stamp = datetime.datetime.utcfromtimestamp(when)
    # isoformat() leaves out the fractional part when microsecond is zero, so
    # format the whole seconds alone and append the (truncated) milliseconds.
    whole_seconds = stamp.replace(microsecond=0).isoformat(" ")
    return whole_seconds + ".%03dZ" % (stamp.microsecond // 1000)
34
# Text that Node.__init__ writes to BASEDIR/private/README each time a node
# is constructed.
PRIV_README="""
This directory contains files which contain private data for the Tahoe node,
such as private keys.  On Unix-like systems, the permissions on this directory
are set to disallow users other than its owner from reading the contents of
the files.   See the 'configuration.txt' documentation file for details."""
40
class _None: # used as a marker in get_config()
    """Sentinel meaning "no default was supplied".

    Node.get_config() compares its 'default' argument against this class by
    identity, which lets callers pass None as a real default value.
    """
    pass
43
class MissingConfigEntry(Exception):
    """Raised by Node.get_config() when the requested section or option is
    absent from the configuration and the caller supplied no default."""
    pass
46
class Node(service.MultiService):
    # this implements common functionality of both Client nodes and Introducer
    # nodes.

    # logged as "<NODETYPE> running" once the Tub has been set up
    NODETYPE = "unknown NODETYPE"
    # name (relative to the base directory) of the file that records the port
    # the Tub listens on. It is None here and is joined onto basedir in
    # __init__, so presumably every subclass overrides it -- TODO confirm.
    PORTNUMFILE = None
    # file under BASEDIR/private that is handed to the Tub as its certFile
    CERTFILE = "node.pem"
    # old-style file in the base directory listing extra addresses to
    # advertise, one "ADDR" or "ADDR:PORT" per line
    LOCAL_IP_FILE = "advertised_ip_addresses"
54
55     def __init__(self, basedir="."):
56         service.MultiService.__init__(self)
57         self.basedir = os.path.abspath(basedir)
58         self._portnumfile = os.path.join(self.basedir, self.PORTNUMFILE)
59         self._tub_ready_observerlist = observer.OneShotObserverList()
60         fileutil.make_dirs(os.path.join(self.basedir, "private"), 0700)
61         open(os.path.join(self.basedir, "private", "README"), "w").write(PRIV_README)
62
63         # creates self.config, populates from distinct files if necessary
64         self.read_config()
65
66         nickname_utf8 = self.get_config("node", "nickname", "<unspecified>")
67         self.nickname = nickname_utf8.decode("utf-8")
68
69         self.create_tub()
70         self.logSource="Node"
71
72         self.setup_ssh()
73         self.setup_logging()
74         self.log("Node constructed. " + get_package_versions_string())
75         iputil.increase_rlimits()
76
77     def get_config(self, section, option, default=_None, boolean=False):
78         try:
79             if boolean:
80                 return self.config.getboolean(section, option)
81             return self.config.get(section, option)
82         except (ConfigParser.NoOptionError, ConfigParser.NoSectionError):
83             if default is _None:
84                 fn = os.path.join(self.basedir, "tahoe.cfg")
85                 raise MissingConfigEntry("%s is missing the [%s]%s entry"
86                                          % (fn, section, option))
87             return default
88
89     def set_config(self, section, option, value):
90         if not self.config.has_section(section):
91             self.config.add_section(section)
92         self.config.set(section, option, value)
93         assert self.config.get(section, option) == value
94
95     def read_config(self):
96         self.config = ConfigParser.SafeConfigParser()
97         self.config.read([os.path.join(self.basedir, "tahoe.cfg")])
98         self.read_old_config_files()
99
100     def read_old_config_files(self):
101         # backwards-compatibility: individual files will override the
102         # contents of tahoe.cfg
103         copy = self._copy_config_from_file
104
105         copy("nickname", "node", "nickname")
106         copy("webport", "node", "web.port")
107
108         cfg_tubport = self.get_config("node", "tub.port", "")
109         if not cfg_tubport:
110             # For 'tub.port', tahoe.cfg overrides the individual file on
111             # disk. So only read self._portnumfile is tahoe.cfg doesn't
112             # provide a value.
113             try:
114                 file_tubport = open(self._portnumfile, "rU").read().strip()
115                 self.set_config("node", "tub.port", file_tubport)
116             except EnvironmentError:
117                 pass
118
119         try:
120             addresses = []
121             ipfile = os.path.join(self.basedir, self.LOCAL_IP_FILE)
122             tubport = int(self.get_config("node", "tub.port", "0"))
123             for addrline in open(ipfile, "rU"):
124                 mo = ADDR_RE.search(addrline)
125                 if mo:
126                     (addr, dummy, aportnum,) = mo.groups()
127                     if aportnum is None:
128                         aportnum = tubport
129                     addresses.append("%s:%d" % (addr, int(aportnum),))
130             self.set_config("node", "advertised_ip_addresses",
131                             ",".join(addresses))
132         except EnvironmentError:
133             pass
134         copy("keepalive_timeout", "node", "timeout.keepalive")
135         copy("disconnect_timeout", "node", "timeout.disconnect")
136
137     def _copy_config_from_file(self, config_filename, section, keyname):
138         s = self.get_config_from_file(config_filename)
139         if s is not None:
140             self.set_config(section, keyname, s)
141
142     def create_tub(self):
143         certfile = os.path.join(self.basedir, "private", self.CERTFILE)
144         self.tub = Tub(certFile=certfile)
145         self.tub.setOption("logLocalFailures", True)
146         self.tub.setOption("logRemoteFailures", True)
147
148         # see #521 for a discussion of how to pick these timeout values.
149         keepalive_timeout_s = self.get_config("node", "timeout.keepalive", "")
150         if keepalive_timeout_s:
151             self.tub.setOption("keepaliveTimeout", int(keepalive_timeout_s))
152         disconnect_timeout_s = self.get_config("node", "timeout.disconnect", "")
153         if disconnect_timeout_s:
154             # N.B.: this is in seconds, so use "1800" to get 30min
155             self.tub.setOption("disconnectTimeout", int(disconnect_timeout_s))
156
157         self.nodeid = b32decode(self.tub.tubID.upper()) # binary format
158         self.write_config("my_nodeid", b32encode(self.nodeid).lower() + "\n")
159         self.short_nodeid = b32encode(self.nodeid).lower()[:8] # ready for printing
160
161         tubport = self.get_config("node", "tub.port", "tcp:0")
162         self.tub.listenOn(tubport)
163         # we must wait until our service has started before we can find out
164         # our IP address and thus do tub.setLocation, and we can't register
165         # any services with the Tub until after that point
166         self.tub.setServiceParent(self)
167
168     def setup_ssh(self):
169         ssh_port = self.get_config("node", "ssh.port", "")
170         if ssh_port:
171             ssh_keyfile = self.get_config("node", "ssh.authorized_keys_file")
172             from allmydata import manhole
173             m = manhole.AuthorizedKeysManhole(ssh_port, ssh_keyfile)
174             m.setServiceParent(self)
175             self.log("AuthorizedKeysManhole listening on %s" % ssh_port)
176
    def get_app_versions(self):
        """Return a new dict built from foolscap's app_versions.versions, so
        callers get their own copy of the registered versions."""
        # TODO: merge this with allmydata.get_package_versions
        return dict(app_versions.versions)
180
181     def get_config_from_file(self, name, required=False):
182         """Get the (string) contents of a config file, or None if the file
183         did not exist. If required=True, raise an exception rather than
184         returning None. Any leading or trailing whitespace will be stripped
185         from the data."""
186         fn = os.path.join(self.basedir, name)
187         try:
188             return open(fn, "r").read().strip()
189         except EnvironmentError:
190             if not required:
191                 return None
192             raise
193
194     def write_private_config(self, name, value):
195         """Write the (string) contents of a private config file (which is a
196         config file that resides within the subdirectory named 'private'), and
197         return it. Any leading or trailing whitespace will be stripped from
198         the data.
199         """
200         privname = os.path.join(self.basedir, "private", name)
201         open(privname, "w").write(value.strip())
202
203     def get_or_create_private_config(self, name, default):
204         """Try to get the (string) contents of a private config file (which
205         is a config file that resides within the subdirectory named
206         'private'), and return it. Any leading or trailing whitespace will be
207         stripped from the data.
208
209         If the file does not exist, try to create it using default, and
210         then return the value that was written. If 'default' is a string,
211         use it as a default value. If not, treat it as a 0-argument callable
212         which is expected to return a string.
213         """
214         privname = os.path.join("private", name)
215         value = self.get_config_from_file(privname)
216         if value is None:
217             if isinstance(default, (str, unicode)):
218                 value = default
219             else:
220                 value = default()
221             fn = os.path.join(self.basedir, privname)
222             try:
223                 open(fn, "w").write(value)
224             except EnvironmentError, e:
225                 self.log("Unable to write config file '%s'" % fn)
226                 self.log(e)
227             value = value.strip()
228         return value
229
230     def write_config(self, name, value, mode="w"):
231         """Write a string to a config file."""
232         fn = os.path.join(self.basedir, name)
233         try:
234             open(fn, mode).write(value)
235         except EnvironmentError, e:
236             self.log("Unable to write config file '%s'" % fn)
237             self.log(e)
238
239     def startService(self):
240         # Note: this class can be started and stopped at most once.
241         self.log("Node.startService")
242         try:
243             os.chmod("twistd.pid", 0644)
244         except EnvironmentError:
245             pass
246         # Delay until the reactor is running.
247         eventual.eventually(self._startService)
248
249     def _startService(self):
250         precondition(reactor.running)
251         self.log("Node._startService")
252
253         service.MultiService.startService(self)
254         d = defer.succeed(None)
255         d.addCallback(lambda res: iputil.get_local_addresses_async())
256         d.addCallback(self._setup_tub)
257         def _ready(res):
258             self.log("%s running" % self.NODETYPE)
259             self._tub_ready_observerlist.fire(self)
260             return self
261         d.addCallback(_ready)
262         d.addErrback(self._service_startup_failed)
263
264     def _service_startup_failed(self, failure):
265         self.log('_startService() failed')
266         log.err(failure)
267         print "Node._startService failed, aborting"
268         print failure
269         #reactor.stop() # for unknown reasons, reactor.stop() isn't working.  [ ] TODO
270         self.log('calling os.abort()')
271         twlog.msg('calling os.abort()') # make sure it gets into twistd.log
272         print "calling os.abort()"
273         os.abort()
274
275     def stopService(self):
276         self.log("Node.stopService")
277         d = self._tub_ready_observerlist.when_fired()
278         def _really_stopService(ignored):
279             self.log("Node._really_stopService")
280             return service.MultiService.stopService(self)
281         d.addCallback(_really_stopService)
282         return d
283
    def shutdown(self):
        """Shut down the node. Returns a Deferred that fires (with None) when
        it finally stops kicking."""
        self.log("Node.shutdown")
        # the returned Deferred is simply the one produced by stopService()
        return self.stopService()
289
290     def setup_logging(self):
291         # we replace the formatTime() method of the log observer that twistd
292         # set up for us, with a method that uses better timestamps.
293         for o in twlog.theLogPublisher.observers:
294             # o might be a FileLogObserver's .emit method
295             if type(o) is type(self.setup_logging): # bound method
296                 ob = o.im_self
297                 if isinstance(ob, twlog.FileLogObserver):
298                     newmeth = types.UnboundMethodType(formatTimeTahoeStyle, ob, ob.__class__)
299                     ob.formatTime = newmeth
300         # TODO: twisted >2.5.0 offers maxRotatedFiles=50
301
302         self.tub.setOption("logport-furlfile",
303                            os.path.join(self.basedir, "private","logport.furl"))
304         lgfurl = self.get_config("node", "log_gatherer.furl", "")
305         if lgfurl:
306             # this is in addition to the contents of log-gatherer-furlfile
307             self.tub.setOption("log-gatherer-furl", lgfurl)
308         self.tub.setOption("log-gatherer-furlfile",
309                            os.path.join(self.basedir, "log_gatherer.furl"))
310         self.tub.setOption("bridge-twisted-logs", True)
311         incident_dir = os.path.join(self.basedir, "logs", "incidents")
312         # this doesn't quite work yet: unit tests fail
313         foolscap.logging.log.setLogDir(incident_dir)
314
    def log(self, *args, **kwargs):
        """Pass all arguments through to allmydata.util.log.msg() and return
        its result."""
        return log.msg(*args, **kwargs)
317
318     def old_log(self, msg, src="", args=(), **kw):
319         if src:
320             logsrc = src
321         else:
322             logsrc = self.logSource
323         if args:
324             try:
325                 msg = msg % tuple(map(humanreadable.hr, args))
326             except TypeError, e:
327                 msg = "ERROR: output string '%s' contained invalid %% expansion, error: %s, args: %s\n" % (`msg`, e, `args`)
328         msg = self.short_nodeid + ": " + humanreadable.hr(msg)
329         return twlog.callWithContext({"system":logsrc},
330                                      twlog.msg, msg, **kw)
331
332     def _setup_tub(self, local_addresses):
333         # we can't get a dynamically-assigned portnum until our Tub is
334         # running, which means after startService.
335         l = self.tub.getListeners()[0]
336         portnum = l.getPortnum()
337         # record which port we're listening on, so we can grab the same one next time
338         open(self._portnumfile, "w").write("%d\n" % portnum)
339
340         addresses = [ "%s:%d" % (addr, portnum,) for addr in local_addresses ]
341         extra_addresses = self.get_config("node", "advertised_ip_addresses", "")
342         if extra_addresses:
343             extra_addresses = extra_addresses.split(",")
344             addresses.extend(extra_addresses)
345
346         location = ",".join(addresses)
347         self.log("Tub location set to %s" % location)
348         self.tub.setLocation(location)
349         return self.tub
350
    def when_tub_ready(self):
        """Return the result of when_fired() on the tub-ready observer list.
        That list is fired with this node by _startService once the Tub has
        been set up."""
        return self._tub_ready_observerlist.when_fired()
353
    def add_service(self, s):
        """Make this node the service parent of 's' and return 's'."""
        s.setServiceParent(self)
        return s
357