]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/node.py
2947044e1705288b7196f7530dd3121d84d8c005
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / node.py
1 import datetime, os.path, re, types, ConfigParser, tempfile
2 from base64 import b32decode, b32encode
3
4 from twisted.python import log as twlog
5 from twisted.application import service
6 from twisted.internet import defer, reactor
7 from foolscap.api import Tub, eventually, app_versions
8 import foolscap.logging.log
9 from allmydata import get_package_versions, get_package_versions_string
10 from allmydata.util import log
11 from allmydata.util import fileutil, iputil, observer
12 from allmydata.util.assertutil import precondition, _assert
13 from allmydata.util.fileutil import abspath_expanduser_unicode
14 from allmydata.util.encodingutil import get_filesystem_encoding, quote_output
15
# Add our application versions to the data that Foolscap's LogPublisher
# reports. This runs once, at import time: every (name, version) pair
# returned by get_package_versions() is registered, with the version
# converted to a string first.
for thing, things_version in get_package_versions().iteritems():
    app_versions.add_version(thing, str(things_version))
20
# Matches "A.B.C.D" or "A.B.C.D:PORT".
# group 1 will be addr (dotted quad string), group 3 if any will be portnum (string)
# Each octet is either a lone "0" or a number without leading zeros, so that
# addresses containing a zero octet (e.g. "127.0.0.1", "10.0.0.1") match. The
# port, when present, must not start with "0". Numeric ranges (0-255,
# 1-65535) are not checked here.
ADDR_RE=re.compile(r"^((?:0|[1-9][0-9]*)(?:\.(?:0|[1-9][0-9]*)){3})(:([1-9][0-9]*))?$")
23
24
def formatTimeTahoeStyle(self, when):
    """Render the POSIX timestamp 'when' as a UTC string with millisecond
    precision, in the form used by log lines such as:

      2007-10-12 00:26:28.566Z [Client] rnp752lz: 'client running'

    'self' is not used. It is accepted because Node.setup_logging()
    installs this function as the formatTime method of a log observer.
    """
    utc = datetime.datetime.utcfromtimestamp(when)
    stamp = utc.isoformat(" ")
    if utc.microsecond:
        # isoformat() appended ".ffffff": drop the last three digits to
        # truncate microseconds down to milliseconds.
        stamp = stamp[:-3]
    else:
        # isoformat() leaves the fraction out entirely when it is zero.
        stamp = stamp + ".000"
    return stamp + "Z"
33
# Text that Node.__init__ writes to the README file inside the node's
# 'private' subdirectory.
PRIV_README="""
This directory contains files which contain private data for the Tahoe node,
such as private keys.  On Unix-like systems, the permissions on this directory
are set to disallow users other than its owner from reading the contents of
the files.   See the 'configuration.rst' documentation file for details."""
39
40 class _None: # used as a marker in get_config()
41     pass
42
class MissingConfigEntry(Exception):
    """ A required config entry was not found.

    Raised by Node.get_config() when the requested [section]option is absent
    from the configuration and the caller supplied no default. """
45
class OldConfigError(Exception):
    """ An obsolete config file was found. See
    docs/historical/configuration.rst.

    Raised by Node.error_about_old_config_files(), which passes the set of
    offending file paths as the exception argument. """
49
class Node(service.MultiService):
    # this implements common functionality of both Client nodes and Introducer
    # nodes.
    # Node type label, used in the "<NODETYPE> running" log message.
    NODETYPE = "unknown NODETYPE"
    # Name (relative to the base directory) of the file in which the Tub's
    # listening port is recorded. __init__ joins it onto the base directory,
    # so subclasses are presumably expected to replace this None placeholder.
    PORTNUMFILE = None
    # Name of the Tub certificate file inside the 'private' subdirectory.
    CERTFILE = "node.pem"
    # File names that error_about_old_config_files() must not treat as
    # obsolete configuration files.
    GENERATED_FILES = []
57
58     def __init__(self, basedir=u"."):
59         service.MultiService.__init__(self)
60         self.basedir = abspath_expanduser_unicode(unicode(basedir))
61         self._portnumfile = os.path.join(self.basedir, self.PORTNUMFILE)
62         self._tub_ready_observerlist = observer.OneShotObserverList()
63         fileutil.make_dirs(os.path.join(self.basedir, "private"), 0700)
64         open(os.path.join(self.basedir, "private", "README"), "w").write(PRIV_README)
65
66         # creates self.config
67         self.read_config()
68         nickname_utf8 = self.get_config("node", "nickname", "<unspecified>")
69         self.nickname = nickname_utf8.decode("utf-8")
70         assert type(self.nickname) is unicode
71
72         self.init_tempdir()
73         self.create_tub()
74         self.logSource="Node"
75
76         self.setup_ssh()
77         self.setup_logging()
78         self.log("Node constructed. " + get_package_versions_string())
79         iputil.increase_rlimits()
80
81     def init_tempdir(self):
82         local_tempdir_utf8 = "tmp" # default is NODEDIR/tmp/
83         tempdir = self.get_config("node", "tempdir", local_tempdir_utf8).decode('utf-8')
84         tempdir = os.path.join(self.basedir, tempdir)
85         if not os.path.exists(tempdir):
86             fileutil.make_dirs(tempdir)
87         tempfile.tempdir = abspath_expanduser_unicode(tempdir)
88         # this should cause twisted.web.http (which uses
89         # tempfile.TemporaryFile) to put large request bodies in the given
90         # directory. Without this, the default temp dir is usually /tmp/,
91         # which is frequently too small.
92         test_name = tempfile.mktemp()
93         _assert(os.path.dirname(test_name) == tempdir, test_name, tempdir)
94
95     def get_config(self, section, option, default=_None, boolean=False):
96         try:
97             if boolean:
98                 return self.config.getboolean(section, option)
99             return self.config.get(section, option)
100         except (ConfigParser.NoOptionError, ConfigParser.NoSectionError):
101             if default is _None:
102                 fn = os.path.join(self.basedir, "tahoe.cfg")
103                 raise MissingConfigEntry("%s is missing the [%s]%s entry"
104                                          % (fn, section, option))
105             return default
106
107     def set_config(self, section, option, value):
108         if not self.config.has_section(section):
109             self.config.add_section(section)
110         self.config.set(section, option, value)
111         assert self.config.get(section, option) == value
112
113     def read_config(self):
114         self.error_about_old_config_files()
115         self.config = ConfigParser.SafeConfigParser()
116         self.config.read([os.path.join(self.basedir, "tahoe.cfg")])
117
118     def error_about_old_config_files(self):
119         """ If any old configuration files are detected, raise OldConfigError. """
120
121         oldfnames = set()
122         for name in [
123             'nickname', 'webport', 'keepalive_timeout', 'log_gatherer.furl',
124             'disconnect_timeout', 'advertised_ip_addresses', 'introducer.furl',
125             'helper.furl', 'key_generator.furl', 'stats_gatherer.furl',
126             'no_storage', 'readonly_storage', 'sizelimit',
127             'debug_discard_storage', 'run_helper']:
128             if name not in self.GENERATED_FILES:
129                 fullfname = os.path.join(self.basedir, name)
130                 if os.path.exists(fullfname):
131                     log.err("Found pre-Tahoe-LAFS-v1.3 configuration file: %s. "
132                             "See docs/historical/configuration.rst." % quote_output(fullfname))
133                     oldfnames.add(fullfname)
134         if oldfnames:
135             raise OldConfigError(oldfnames)
136
137     def create_tub(self):
138         certfile = os.path.join(self.basedir, "private", self.CERTFILE)
139         self.tub = Tub(certFile=certfile)
140         self.tub.setOption("logLocalFailures", True)
141         self.tub.setOption("logRemoteFailures", True)
142         self.tub.setOption("expose-remote-exception-types", False)
143
144         # see #521 for a discussion of how to pick these timeout values.
145         keepalive_timeout_s = self.get_config("node", "timeout.keepalive", "")
146         if keepalive_timeout_s:
147             self.tub.setOption("keepaliveTimeout", int(keepalive_timeout_s))
148         disconnect_timeout_s = self.get_config("node", "timeout.disconnect", "")
149         if disconnect_timeout_s:
150             # N.B.: this is in seconds, so use "1800" to get 30min
151             self.tub.setOption("disconnectTimeout", int(disconnect_timeout_s))
152
153         self.nodeid = b32decode(self.tub.tubID.upper()) # binary format
154         self.write_config("my_nodeid", b32encode(self.nodeid).lower() + "\n")
155         self.short_nodeid = b32encode(self.nodeid).lower()[:8] # ready for printing
156
157         tubport = self.get_config("node", "tub.port", "tcp:0")
158         self.tub.listenOn(tubport)
159         # we must wait until our service has started before we can find out
160         # our IP address and thus do tub.setLocation, and we can't register
161         # any services with the Tub until after that point
162         self.tub.setServiceParent(self)
163
164     def setup_ssh(self):
165         ssh_port = self.get_config("node", "ssh.port", "")
166         if ssh_port:
167             ssh_keyfile = self.get_config("node", "ssh.authorized_keys_file").decode('utf-8')
168             from allmydata import manhole
169             m = manhole.AuthorizedKeysManhole(ssh_port, ssh_keyfile.encode(get_filesystem_encoding()))
170             m.setServiceParent(self)
171             self.log("AuthorizedKeysManhole listening on %s" % ssh_port)
172
    def get_app_versions(self):
        """Return a new dict built from Foolscap's app_versions.versions, so
        callers can modify the result without touching the original."""
        # TODO: merge this with allmydata.get_package_versions
        return dict(app_versions.versions)
176
177     def write_private_config(self, name, value):
178         """Write the (string) contents of a private config file (which is a
179         config file that resides within the subdirectory named 'private'), and
180         return it. Any leading or trailing whitespace will be stripped from
181         the data.
182         """
183         privname = os.path.join(self.basedir, "private", name)
184         open(privname, "w").write(value.strip())
185
186     def get_or_create_private_config(self, name, default):
187         """Try to get the (string) contents of a private config file (which
188         is a config file that resides within the subdirectory named
189         'private'), and return it. Any leading or trailing whitespace will be
190         stripped from the data.
191
192         If the file does not exist, try to create it using default, and
193         then return the value that was written. If 'default' is a string,
194         use it as a default value. If not, treat it as a 0-argument callable
195         which is expected to return a string.
196         """
197         privname = os.path.join(self.basedir, "private", name)
198         try:
199             value = fileutil.read(privname)
200         except EnvironmentError:
201             if isinstance(default, basestring):
202                 value = default
203             else:
204                 value = default()
205             fileutil.write(privname, value)
206         return value.strip()
207
208     def write_config(self, name, value, mode="w"):
209         """Write a string to a config file."""
210         fn = os.path.join(self.basedir, name)
211         try:
212             open(fn, mode).write(value)
213         except EnvironmentError, e:
214             self.log("Unable to write config file '%s'" % fn)
215             self.log(e)
216
217     def startService(self):
218         # Note: this class can be started and stopped at most once.
219         self.log("Node.startService")
220         # Record the process id in the twisted log, after startService()
221         # (__init__ is called before fork(), but startService is called
222         # after). Note that Foolscap logs handle pid-logging by itself, no
223         # need to send a pid to the foolscap log here.
224         twlog.msg("My pid: %s" % os.getpid())
225         try:
226             os.chmod("twistd.pid", 0644)
227         except EnvironmentError:
228             pass
229         # Delay until the reactor is running.
230         eventually(self._startService)
231
232     def _startService(self):
233         precondition(reactor.running)
234         self.log("Node._startService")
235
236         service.MultiService.startService(self)
237         d = defer.succeed(None)
238         d.addCallback(lambda res: iputil.get_local_addresses_async())
239         d.addCallback(self._setup_tub)
240         def _ready(res):
241             self.log("%s running" % self.NODETYPE)
242             self._tub_ready_observerlist.fire(self)
243             return self
244         d.addCallback(_ready)
245         d.addErrback(self._service_startup_failed)
246
247     def _service_startup_failed(self, failure):
248         self.log('_startService() failed')
249         log.err(failure)
250         print "Node._startService failed, aborting"
251         print failure
252         #reactor.stop() # for unknown reasons, reactor.stop() isn't working.  [ ] TODO
253         self.log('calling os.abort()')
254         twlog.msg('calling os.abort()') # make sure it gets into twistd.log
255         print "calling os.abort()"
256         os.abort()
257
258     def stopService(self):
259         self.log("Node.stopService")
260         d = self._tub_ready_observerlist.when_fired()
261         def _really_stopService(ignored):
262             self.log("Node._really_stopService")
263             return service.MultiService.stopService(self)
264         d.addCallback(_really_stopService)
265         return d
266
    def shutdown(self):
        """Shut down the node. Returns a Deferred that fires (with None) when
        it finally stops kicking.

        This logs the request and returns whatever stopService() returns."""
        self.log("Node.shutdown")
        return self.stopService()
272
273     def setup_logging(self):
274         # we replace the formatTime() method of the log observer that
275         # twistd set up for us, with a method that uses our preferred
276         # timestamp format.
277         for o in twlog.theLogPublisher.observers:
278             # o might be a FileLogObserver's .emit method
279             if type(o) is type(self.setup_logging): # bound method
280                 ob = o.im_self
281                 if isinstance(ob, twlog.FileLogObserver):
282                     newmeth = types.UnboundMethodType(formatTimeTahoeStyle, ob, ob.__class__)
283                     ob.formatTime = newmeth
284         # TODO: twisted >2.5.0 offers maxRotatedFiles=50
285
286         lgfurl_file = os.path.join(self.basedir, "private", "logport.furl").encode(get_filesystem_encoding())
287         self.tub.setOption("logport-furlfile", lgfurl_file)
288         lgfurl = self.get_config("node", "log_gatherer.furl", "")
289         if lgfurl:
290             # this is in addition to the contents of log-gatherer-furlfile
291             self.tub.setOption("log-gatherer-furl", lgfurl)
292         self.tub.setOption("log-gatherer-furlfile",
293                            os.path.join(self.basedir, "log_gatherer.furl"))
294         self.tub.setOption("bridge-twisted-logs", True)
295         incident_dir = os.path.join(self.basedir, "logs", "incidents")
296         # this doesn't quite work yet: unit tests fail
297         foolscap.logging.log.setLogDir(incident_dir)
298
    def log(self, *args, **kwargs):
        """Pass all arguments through to allmydata.util.log.msg() and return
        its result."""
        return log.msg(*args, **kwargs)
301
302     def _setup_tub(self, local_addresses):
303         # we can't get a dynamically-assigned portnum until our Tub is
304         # running, which means after startService.
305         l = self.tub.getListeners()[0]
306         portnum = l.getPortnum()
307         # record which port we're listening on, so we can grab the same one
308         # next time
309         open(self._portnumfile, "w").write("%d\n" % portnum)
310
311         base_location = ",".join([ "%s:%d" % (addr, portnum)
312                                    for addr in local_addresses ])
313         location = self.get_config("node", "tub.location", base_location)
314         self.log("Tub location set to %s" % location)
315         self.tub.setLocation(location)
316
317         return self.tub
318
    def when_tub_ready(self):
        """Return the when_fired() result of the tub-ready observer list.

        _startService() fires that list, with this node as the value, after
        _setup_tub() has run."""
        return self._tub_ready_observerlist.when_fired()
321
    def add_service(self, s):
        """Make this node the service parent of 's', and return 's'."""
        s.setServiceParent(self)
        return s