]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/node.py
Fix the bug that prevents an introducer from starting when introducer.furl already...
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / node.py
1 import datetime, os.path, re, types, ConfigParser, tempfile
2 from base64 import b32decode, b32encode
3
4 from twisted.python import log as twlog
5 from twisted.application import service
6 from twisted.internet import defer, reactor
7 from foolscap.api import Tub, eventually, app_versions
8 import foolscap.logging.log
9 from allmydata import get_package_versions, get_package_versions_string
10 from allmydata.util import log
11 from allmydata.util import fileutil, iputil, observer
12 from allmydata.util.assertutil import precondition, _assert
13 from allmydata.util.fileutil import abspath_expanduser_unicode
14 from allmydata.util.encodingutil import get_filesystem_encoding
15
# Tell Foolscap about the version of each package we depend on, so that the
# application-version data reported by Foolscap's LogPublisher includes them.
for (thing, things_version) in get_package_versions().items():
    app_versions.add_version(thing, str(things_version))
20
# Matches "A.B.C.D" or "A.B.C.D:PORT". Each dotted component is a decimal
# number without leading zeros; a lone "0" is allowed, so "10.0.0.1" matches.
# Component values are NOT range-checked (e.g. "999.1.1.1" also matches), and
# the port must be a positive number without leading zeros.
# group 1 will be addr (dotted quad string), group 3 if any will be portnum (string)
ADDR_RE=re.compile(r"^((?:0|[1-9][0-9]*)\.(?:0|[1-9][0-9]*)\.(?:0|[1-9][0-9]*)\.(?:0|[1-9][0-9]*))(:([1-9][0-9]*))?$")
23
24
def formatTimeTahoeStyle(self, when):
    """Render the POSIX timestamp 'when' as a UTC string with millisecond
    precision, in the format used by Tahoe log lines such as:

        2007-10-12 00:26:28.566Z [Client] rnp752lz: 'client running'

    Only the timestamp part ("2007-10-12 00:26:28.566Z") is produced.
    Sub-millisecond digits are truncated, not rounded. 'self' is unused: this
    function gets bound onto twisted FileLogObserver instances as their
    formatTime method (see Node.setup_logging).
    """
    stamp = datetime.datetime.utcfromtimestamp(when)
    millis = stamp.microsecond // 1000
    return "%04d-%02d-%02d %02d:%02d:%02d.%03dZ" % (
        stamp.year, stamp.month, stamp.day,
        stamp.hour, stamp.minute, stamp.second,
        millis)
33
# Text that Node.__init__ writes to NODEDIR/private/README every time a node
# is constructed (overwriting any existing README there).
PRIV_README="""
This directory contains files which contain private data for the Tahoe node,
such as private keys.  On Unix-like systems, the permissions on this directory
are set to disallow users other than its owner from reading the contents of
the files.   See the 'configuration.rst' documentation file for details."""
39
class _None: # used as a marker in get_config()
    """Sentinel meaning "no default was supplied" for Node.get_config().

    The class object itself (not an instance) is the marker, compared with
    'is'. This lets callers legitimately pass None as a default value."""
    pass
42
class MissingConfigEntry(Exception):
    """Raised by Node.get_config() when a [section]option entry is absent
    from tahoe.cfg and the caller did not supply a default."""
45
class OldConfigError(Exception):
    """Raised when configuration files from before Tahoe-LAFS v1.3 are found
    in the node directory. The single exception argument is the set of
    offending file paths. See docs/historical/configuration.rst."""
49
50 class Node(service.MultiService):
51     # this implements common functionality of both Client nodes and Introducer
52     # nodes.
53     NODETYPE = "unknown NODETYPE"
54     PORTNUMFILE = None
55     CERTFILE = "node.pem"
56     GENERATED_FILES = []
57
58     def __init__(self, basedir=u"."):
59         service.MultiService.__init__(self)
60         self.basedir = abspath_expanduser_unicode(unicode(basedir))
61         self._portnumfile = os.path.join(self.basedir, self.PORTNUMFILE)
62         self._tub_ready_observerlist = observer.OneShotObserverList()
63         fileutil.make_dirs(os.path.join(self.basedir, "private"), 0700)
64         open(os.path.join(self.basedir, "private", "README"), "w").write(PRIV_README)
65
66         # creates self.config
67         self.read_config()
68         nickname_utf8 = self.get_config("node", "nickname", "<unspecified>")
69         self.nickname = nickname_utf8.decode("utf-8")
70         assert type(self.nickname) is unicode
71
72         self.init_tempdir()
73         self.create_tub()
74         self.logSource="Node"
75
76         self.setup_ssh()
77         self.setup_logging()
78         self.log("Node constructed. " + get_package_versions_string())
79         iputil.increase_rlimits()
80
81     def init_tempdir(self):
82         local_tempdir_utf8 = "tmp" # default is NODEDIR/tmp/
83         tempdir = self.get_config("node", "tempdir", local_tempdir_utf8).decode('utf-8')
84         tempdir = os.path.join(self.basedir, tempdir)
85         if not os.path.exists(tempdir):
86             fileutil.make_dirs(tempdir)
87         tempfile.tempdir = abspath_expanduser_unicode(tempdir)
88         # this should cause twisted.web.http (which uses
89         # tempfile.TemporaryFile) to put large request bodies in the given
90         # directory. Without this, the default temp dir is usually /tmp/,
91         # which is frequently too small.
92         test_name = tempfile.mktemp()
93         _assert(os.path.dirname(test_name) == tempdir, test_name, tempdir)
94
95     def get_config(self, section, option, default=_None, boolean=False):
96         try:
97             if boolean:
98                 return self.config.getboolean(section, option)
99             return self.config.get(section, option)
100         except (ConfigParser.NoOptionError, ConfigParser.NoSectionError):
101             if default is _None:
102                 fn = os.path.join(self.basedir, "tahoe.cfg")
103                 raise MissingConfigEntry("%s is missing the [%s]%s entry"
104                                          % (fn, section, option))
105             return default
106
107     def set_config(self, section, option, value):
108         if not self.config.has_section(section):
109             self.config.add_section(section)
110         self.config.set(section, option, value)
111         assert self.config.get(section, option) == value
112
113     def read_config(self):
114         self.error_about_old_config_files()
115         self.config = ConfigParser.SafeConfigParser()
116         self.config.read([os.path.join(self.basedir, "tahoe.cfg")])
117
118     def error_about_old_config_files(self):
119         """ If any old configuration files are detected, raise OldConfigError. """
120
121         oldfnames = set()
122         for name in [
123             'nickname', 'webport', 'keepalive_timeout', 'log_gatherer.furl',
124             'disconnect_timeout', 'advertised_ip_addresses', 'introducer.furl',
125             'helper.furl', 'key_generator.furl', 'stats_gatherer.furl',
126             'no_storage', 'readonly_storage', 'sizelimit',
127             'debug_discard_storage', 'run_helper']:
128             if name not in self.GENERATED_FILES:
129                 fullfname = os.path.join(self.basedir, name)
130                 if os.path.exists(fullfname):
131                     log.err("Found pre-Tahoe-LAFS-v1.3 configuration file: '%s'. See docs/historical/configuration.rst." % (fullfname,))
132                     oldfnames.add(fullfname)
133         if oldfnames:
134             raise OldConfigError(oldfnames)
135
136     def create_tub(self):
137         certfile = os.path.join(self.basedir, "private", self.CERTFILE)
138         self.tub = Tub(certFile=certfile)
139         self.tub.setOption("logLocalFailures", True)
140         self.tub.setOption("logRemoteFailures", True)
141         self.tub.setOption("expose-remote-exception-types", False)
142
143         # see #521 for a discussion of how to pick these timeout values.
144         keepalive_timeout_s = self.get_config("node", "timeout.keepalive", "")
145         if keepalive_timeout_s:
146             self.tub.setOption("keepaliveTimeout", int(keepalive_timeout_s))
147         disconnect_timeout_s = self.get_config("node", "timeout.disconnect", "")
148         if disconnect_timeout_s:
149             # N.B.: this is in seconds, so use "1800" to get 30min
150             self.tub.setOption("disconnectTimeout", int(disconnect_timeout_s))
151
152         self.nodeid = b32decode(self.tub.tubID.upper()) # binary format
153         self.write_config("my_nodeid", b32encode(self.nodeid).lower() + "\n")
154         self.short_nodeid = b32encode(self.nodeid).lower()[:8] # ready for printing
155
156         tubport = self.get_config("node", "tub.port", "tcp:0")
157         self.tub.listenOn(tubport)
158         # we must wait until our service has started before we can find out
159         # our IP address and thus do tub.setLocation, and we can't register
160         # any services with the Tub until after that point
161         self.tub.setServiceParent(self)
162
163     def setup_ssh(self):
164         ssh_port = self.get_config("node", "ssh.port", "")
165         if ssh_port:
166             ssh_keyfile = self.get_config("node", "ssh.authorized_keys_file").decode('utf-8')
167             from allmydata import manhole
168             m = manhole.AuthorizedKeysManhole(ssh_port, ssh_keyfile.encode(get_filesystem_encoding()))
169             m.setServiceParent(self)
170             self.log("AuthorizedKeysManhole listening on %s" % ssh_port)
171
172     def get_app_versions(self):
173         # TODO: merge this with allmydata.get_package_versions
174         return dict(app_versions.versions)
175
176     def write_private_config(self, name, value):
177         """Write the (string) contents of a private config file (which is a
178         config file that resides within the subdirectory named 'private'), and
179         return it. Any leading or trailing whitespace will be stripped from
180         the data.
181         """
182         privname = os.path.join(self.basedir, "private", name)
183         open(privname, "w").write(value.strip())
184
185     def get_or_create_private_config(self, name, default):
186         """Try to get the (string) contents of a private config file (which
187         is a config file that resides within the subdirectory named
188         'private'), and return it. Any leading or trailing whitespace will be
189         stripped from the data.
190
191         If the file does not exist, try to create it using default, and
192         then return the value that was written. If 'default' is a string,
193         use it as a default value. If not, treat it as a 0-argument callable
194         which is expected to return a string.
195         """
196         privname = os.path.join(self.basedir, "private", name)
197         try:
198             value = fileutil.read(privname)
199         except EnvironmentError:
200             if isinstance(default, basestring):
201                 value = default
202             else:
203                 value = default()
204             fileutil.write(privname, value)
205         return value.strip()
206
207     def write_config(self, name, value, mode="w"):
208         """Write a string to a config file."""
209         fn = os.path.join(self.basedir, name)
210         try:
211             open(fn, mode).write(value)
212         except EnvironmentError, e:
213             self.log("Unable to write config file '%s'" % fn)
214             self.log(e)
215
216     def startService(self):
217         # Note: this class can be started and stopped at most once.
218         self.log("Node.startService")
219         # Record the process id in the twisted log, after startService()
220         # (__init__ is called before fork(), but startService is called
221         # after). Note that Foolscap logs handle pid-logging by itself, no
222         # need to send a pid to the foolscap log here.
223         twlog.msg("My pid: %s" % os.getpid())
224         try:
225             os.chmod("twistd.pid", 0644)
226         except EnvironmentError:
227             pass
228         # Delay until the reactor is running.
229         eventually(self._startService)
230
231     def _startService(self):
232         precondition(reactor.running)
233         self.log("Node._startService")
234
235         service.MultiService.startService(self)
236         d = defer.succeed(None)
237         d.addCallback(lambda res: iputil.get_local_addresses_async())
238         d.addCallback(self._setup_tub)
239         def _ready(res):
240             self.log("%s running" % self.NODETYPE)
241             self._tub_ready_observerlist.fire(self)
242             return self
243         d.addCallback(_ready)
244         d.addErrback(self._service_startup_failed)
245
246     def _service_startup_failed(self, failure):
247         self.log('_startService() failed')
248         log.err(failure)
249         print "Node._startService failed, aborting"
250         print failure
251         #reactor.stop() # for unknown reasons, reactor.stop() isn't working.  [ ] TODO
252         self.log('calling os.abort()')
253         twlog.msg('calling os.abort()') # make sure it gets into twistd.log
254         print "calling os.abort()"
255         os.abort()
256
257     def stopService(self):
258         self.log("Node.stopService")
259         d = self._tub_ready_observerlist.when_fired()
260         def _really_stopService(ignored):
261             self.log("Node._really_stopService")
262             return service.MultiService.stopService(self)
263         d.addCallback(_really_stopService)
264         return d
265
266     def shutdown(self):
267         """Shut down the node. Returns a Deferred that fires (with None) when
268         it finally stops kicking."""
269         self.log("Node.shutdown")
270         return self.stopService()
271
272     def setup_logging(self):
273         # we replace the formatTime() method of the log observer that
274         # twistd set up for us, with a method that uses our preferred
275         # timestamp format.
276         for o in twlog.theLogPublisher.observers:
277             # o might be a FileLogObserver's .emit method
278             if type(o) is type(self.setup_logging): # bound method
279                 ob = o.im_self
280                 if isinstance(ob, twlog.FileLogObserver):
281                     newmeth = types.UnboundMethodType(formatTimeTahoeStyle, ob, ob.__class__)
282                     ob.formatTime = newmeth
283         # TODO: twisted >2.5.0 offers maxRotatedFiles=50
284
285         lgfurl_file = os.path.join(self.basedir, "private", "logport.furl").encode(get_filesystem_encoding())
286         self.tub.setOption("logport-furlfile", lgfurl_file)
287         lgfurl = self.get_config("node", "log_gatherer.furl", "")
288         if lgfurl:
289             # this is in addition to the contents of log-gatherer-furlfile
290             self.tub.setOption("log-gatherer-furl", lgfurl)
291         self.tub.setOption("log-gatherer-furlfile",
292                            os.path.join(self.basedir, "log_gatherer.furl"))
293         self.tub.setOption("bridge-twisted-logs", True)
294         incident_dir = os.path.join(self.basedir, "logs", "incidents")
295         # this doesn't quite work yet: unit tests fail
296         foolscap.logging.log.setLogDir(incident_dir)
297
298     def log(self, *args, **kwargs):
299         return log.msg(*args, **kwargs)
300
301     def _setup_tub(self, local_addresses):
302         # we can't get a dynamically-assigned portnum until our Tub is
303         # running, which means after startService.
304         l = self.tub.getListeners()[0]
305         portnum = l.getPortnum()
306         # record which port we're listening on, so we can grab the same one
307         # next time
308         open(self._portnumfile, "w").write("%d\n" % portnum)
309
310         base_location = ",".join([ "%s:%d" % (addr, portnum)
311                                    for addr in local_addresses ])
312         location = self.get_config("node", "tub.location", base_location)
313         self.log("Tub location set to %s" % location)
314         self.tub.setLocation(location)
315
316         return self.tub
317
318     def when_tub_ready(self):
319         return self._tub_ready_observerlist.when_fired()
320
321     def add_service(self, s):
322         s.setServiceParent(self)
323         return s