]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/node.py
tahoe.cfg: add tub.location, to override the location hints we include in our FURL...
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / node.py
1
2 import datetime, os.path, re, types, ConfigParser
3 from base64 import b32decode, b32encode
4
5 from twisted.python import log as twlog
6 from twisted.application import service
7 from twisted.internet import defer, reactor
8 from foolscap import Tub, eventual
9 import foolscap.logging.log
10 from allmydata import get_package_versions, get_package_versions_string
11 from allmydata.util import log
12 from allmydata.util import fileutil, iputil, observer
13 from allmydata.util.assertutil import precondition
14
15 from foolscap.logging import app_versions
16
# Register every package version we know about with Foolscap, so that the
# data its LogPublisher reports includes our application versions.
for package_name, package_version in get_package_versions().iteritems():
    app_versions.add_version(package_name, str(package_version))
21
# Matches "A.B.C.D" or "A.B.C.D:PORT". Group 1 will be addr (dotted quad
# string), group 3 if any will be portnum (string). Each octet is either a
# lone "0" or a number without leading zeros, so addresses such as
# "127.0.0.1" and "10.0.0.1" are accepted. The inner groups are
# non-capturing so that the group numbering stays the same.
ADDR_RE = re.compile(
    r"^((?:(?:0|[1-9][0-9]*)\.){3}(?:0|[1-9][0-9]*))"
    r"(:([1-9][0-9]*))?$")
24
25
def formatTimeTahoeStyle(self, when):
    """Format the POSIX timestamp 'when' as a UTC string with millisecond
    precision, e.g. '2007-10-12 00:26:28.566Z'.

    'self' is not used by the body; the parameter is there so that the
    function can be installed as the formatTime method of a log observer.
    """
    moment = datetime.datetime.utcfromtimestamp(when)
    # Render the whole-second part on its own, then append the microseconds
    # truncated (not rounded) to exactly three digits.
    whole_seconds = moment.replace(microsecond=0).isoformat(" ")
    milliseconds = moment.microsecond // 1000
    return "%s.%03dZ" % (whole_seconds, milliseconds)
34
# Text that Node.__init__ writes to the README file inside the node's
# "private" subdirectory. The value starts with a newline and has no
# trailing newline.
PRIV_README = "\n".join([
    "",
    "This directory contains files which contain private data for the Tahoe node,",
    "such as private keys.  On Unix-like systems, the permissions on this directory",
    "are set to disallow users other than its owner from reading the contents of",
    "the files.   See the 'configuration.txt' documentation file for details.",
])
40
41 class _None: # used as a marker in get_config()
42     pass
43
class MissingConfigEntry(Exception):
    """Raised by Node.get_config() when the requested entry is missing from
    the configuration and the caller did not supply a default."""
46
47 class Node(service.MultiService):
48     # this implements common functionality of both Client nodes and Introducer
49     # nodes.
50     NODETYPE = "unknown NODETYPE"
51     PORTNUMFILE = None
52     CERTFILE = "node.pem"
53
54     def __init__(self, basedir="."):
55         service.MultiService.__init__(self)
56         self.basedir = os.path.abspath(basedir)
57         self._portnumfile = os.path.join(self.basedir, self.PORTNUMFILE)
58         self._tub_ready_observerlist = observer.OneShotObserverList()
59         fileutil.make_dirs(os.path.join(self.basedir, "private"), 0700)
60         open(os.path.join(self.basedir, "private", "README"), "w").write(PRIV_README)
61
62         # creates self.config, populates from distinct files if necessary
63         self.read_config()
64
65         nickname_utf8 = self.get_config("node", "nickname", "<unspecified>")
66         self.nickname = nickname_utf8.decode("utf-8")
67
68         self.create_tub()
69         self.logSource="Node"
70
71         self.setup_ssh()
72         self.setup_logging()
73         self.log("Node constructed. " + get_package_versions_string())
74         iputil.increase_rlimits()
75
76     def get_config(self, section, option, default=_None, boolean=False):
77         try:
78             if boolean:
79                 return self.config.getboolean(section, option)
80             return self.config.get(section, option)
81         except (ConfigParser.NoOptionError, ConfigParser.NoSectionError):
82             if default is _None:
83                 fn = os.path.join(self.basedir, "tahoe.cfg")
84                 raise MissingConfigEntry("%s is missing the [%s]%s entry"
85                                          % (fn, section, option))
86             return default
87
88     def set_config(self, section, option, value):
89         if not self.config.has_section(section):
90             self.config.add_section(section)
91         self.config.set(section, option, value)
92         assert self.config.get(section, option) == value
93
94     def read_config(self):
95         self.config = ConfigParser.SafeConfigParser()
96         self.config.read([os.path.join(self.basedir, "tahoe.cfg")])
97         self.read_old_config_files()
98
99     def read_old_config_files(self):
100         # backwards-compatibility: individual files will override the
101         # contents of tahoe.cfg
102         copy = self._copy_config_from_file
103
104         copy("nickname", "node", "nickname")
105         copy("webport", "node", "web.port")
106
107         cfg_tubport = self.get_config("node", "tub.port", "")
108         if not cfg_tubport:
109             # For 'tub.port', tahoe.cfg overrides the individual file on
110             # disk. So only read self._portnumfile is tahoe.cfg doesn't
111             # provide a value.
112             try:
113                 file_tubport = open(self._portnumfile, "rU").read().strip()
114                 self.set_config("node", "tub.port", file_tubport)
115             except EnvironmentError:
116                 pass
117
118         copy("keepalive_timeout", "node", "timeout.keepalive")
119         copy("disconnect_timeout", "node", "timeout.disconnect")
120
121     def _copy_config_from_file(self, config_filename, section, keyname):
122         s = self.get_config_from_file(config_filename)
123         if s is not None:
124             self.set_config(section, keyname, s)
125
126     def create_tub(self):
127         certfile = os.path.join(self.basedir, "private", self.CERTFILE)
128         self.tub = Tub(certFile=certfile)
129         self.tub.setOption("logLocalFailures", True)
130         self.tub.setOption("logRemoteFailures", True)
131
132         # see #521 for a discussion of how to pick these timeout values.
133         keepalive_timeout_s = self.get_config("node", "timeout.keepalive", "")
134         if keepalive_timeout_s:
135             self.tub.setOption("keepaliveTimeout", int(keepalive_timeout_s))
136         disconnect_timeout_s = self.get_config("node", "timeout.disconnect", "")
137         if disconnect_timeout_s:
138             # N.B.: this is in seconds, so use "1800" to get 30min
139             self.tub.setOption("disconnectTimeout", int(disconnect_timeout_s))
140
141         self.nodeid = b32decode(self.tub.tubID.upper()) # binary format
142         self.write_config("my_nodeid", b32encode(self.nodeid).lower() + "\n")
143         self.short_nodeid = b32encode(self.nodeid).lower()[:8] # ready for printing
144
145         tubport = self.get_config("node", "tub.port", "tcp:0")
146         self.tub.listenOn(tubport)
147         # we must wait until our service has started before we can find out
148         # our IP address and thus do tub.setLocation, and we can't register
149         # any services with the Tub until after that point
150         self.tub.setServiceParent(self)
151
152     def setup_ssh(self):
153         ssh_port = self.get_config("node", "ssh.port", "")
154         if ssh_port:
155             ssh_keyfile = self.get_config("node", "ssh.authorized_keys_file")
156             from allmydata import manhole
157             m = manhole.AuthorizedKeysManhole(ssh_port, ssh_keyfile)
158             m.setServiceParent(self)
159             self.log("AuthorizedKeysManhole listening on %s" % ssh_port)
160
161     def get_app_versions(self):
162         # TODO: merge this with allmydata.get_package_versions
163         return dict(app_versions.versions)
164
165     def get_config_from_file(self, name, required=False):
166         """Get the (string) contents of a config file, or None if the file
167         did not exist. If required=True, raise an exception rather than
168         returning None. Any leading or trailing whitespace will be stripped
169         from the data."""
170         fn = os.path.join(self.basedir, name)
171         try:
172             return open(fn, "r").read().strip()
173         except EnvironmentError:
174             if not required:
175                 return None
176             raise
177
178     def write_private_config(self, name, value):
179         """Write the (string) contents of a private config file (which is a
180         config file that resides within the subdirectory named 'private'), and
181         return it. Any leading or trailing whitespace will be stripped from
182         the data.
183         """
184         privname = os.path.join(self.basedir, "private", name)
185         open(privname, "w").write(value.strip())
186
187     def get_or_create_private_config(self, name, default):
188         """Try to get the (string) contents of a private config file (which
189         is a config file that resides within the subdirectory named
190         'private'), and return it. Any leading or trailing whitespace will be
191         stripped from the data.
192
193         If the file does not exist, try to create it using default, and
194         then return the value that was written. If 'default' is a string,
195         use it as a default value. If not, treat it as a 0-argument callable
196         which is expected to return a string.
197         """
198         privname = os.path.join("private", name)
199         value = self.get_config_from_file(privname)
200         if value is None:
201             if isinstance(default, (str, unicode)):
202                 value = default
203             else:
204                 value = default()
205             fn = os.path.join(self.basedir, privname)
206             try:
207                 open(fn, "w").write(value)
208             except EnvironmentError, e:
209                 self.log("Unable to write config file '%s'" % fn)
210                 self.log(e)
211             value = value.strip()
212         return value
213
214     def write_config(self, name, value, mode="w"):
215         """Write a string to a config file."""
216         fn = os.path.join(self.basedir, name)
217         try:
218             open(fn, mode).write(value)
219         except EnvironmentError, e:
220             self.log("Unable to write config file '%s'" % fn)
221             self.log(e)
222
223     def startService(self):
224         # Note: this class can be started and stopped at most once.
225         self.log("Node.startService")
226         try:
227             os.chmod("twistd.pid", 0644)
228         except EnvironmentError:
229             pass
230         # Delay until the reactor is running.
231         eventual.eventually(self._startService)
232
233     def _startService(self):
234         precondition(reactor.running)
235         self.log("Node._startService")
236
237         service.MultiService.startService(self)
238         d = defer.succeed(None)
239         d.addCallback(lambda res: iputil.get_local_addresses_async())
240         d.addCallback(self._setup_tub)
241         def _ready(res):
242             self.log("%s running" % self.NODETYPE)
243             self._tub_ready_observerlist.fire(self)
244             return self
245         d.addCallback(_ready)
246         d.addErrback(self._service_startup_failed)
247
248     def _service_startup_failed(self, failure):
249         self.log('_startService() failed')
250         log.err(failure)
251         print "Node._startService failed, aborting"
252         print failure
253         #reactor.stop() # for unknown reasons, reactor.stop() isn't working.  [ ] TODO
254         self.log('calling os.abort()')
255         twlog.msg('calling os.abort()') # make sure it gets into twistd.log
256         print "calling os.abort()"
257         os.abort()
258
259     def stopService(self):
260         self.log("Node.stopService")
261         d = self._tub_ready_observerlist.when_fired()
262         def _really_stopService(ignored):
263             self.log("Node._really_stopService")
264             return service.MultiService.stopService(self)
265         d.addCallback(_really_stopService)
266         return d
267
268     def shutdown(self):
269         """Shut down the node. Returns a Deferred that fires (with None) when
270         it finally stops kicking."""
271         self.log("Node.shutdown")
272         return self.stopService()
273
274     def setup_logging(self):
275         # we replace the formatTime() method of the log observer that twistd
276         # set up for us, with a method that uses better timestamps.
277         for o in twlog.theLogPublisher.observers:
278             # o might be a FileLogObserver's .emit method
279             if type(o) is type(self.setup_logging): # bound method
280                 ob = o.im_self
281                 if isinstance(ob, twlog.FileLogObserver):
282                     newmeth = types.UnboundMethodType(formatTimeTahoeStyle, ob, ob.__class__)
283                     ob.formatTime = newmeth
284         # TODO: twisted >2.5.0 offers maxRotatedFiles=50
285
286         self.tub.setOption("logport-furlfile",
287                            os.path.join(self.basedir, "private","logport.furl"))
288         lgfurl = self.get_config("node", "log_gatherer.furl", "")
289         if lgfurl:
290             # this is in addition to the contents of log-gatherer-furlfile
291             self.tub.setOption("log-gatherer-furl", lgfurl)
292         self.tub.setOption("log-gatherer-furlfile",
293                            os.path.join(self.basedir, "log_gatherer.furl"))
294         self.tub.setOption("bridge-twisted-logs", True)
295         incident_dir = os.path.join(self.basedir, "logs", "incidents")
296         # this doesn't quite work yet: unit tests fail
297         foolscap.logging.log.setLogDir(incident_dir)
298
299     def log(self, *args, **kwargs):
300         return log.msg(*args, **kwargs)
301
302     def _setup_tub(self, local_addresses):
303         # we can't get a dynamically-assigned portnum until our Tub is
304         # running, which means after startService.
305         l = self.tub.getListeners()[0]
306         portnum = l.getPortnum()
307         # record which port we're listening on, so we can grab the same one
308         # next time
309         open(self._portnumfile, "w").write("%d\n" % portnum)
310
311         base_location = ",".join([ "%s:%d" % (addr, portnum)
312                                    for addr in local_addresses ])
313         location = self.get_config("node", "tub.location", base_location)
314         self.log("Tub location set to %s" % location)
315         self.tub.setLocation(location)
316
317         return self.tub
318
319     def when_tub_ready(self):
320         return self._tub_ready_observerlist.when_fired()
321
322     def add_service(self, s):
323         s.setServiceParent(self)
324         return s
325