]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/node.py
switch to using RemoteException instead of 'wrapped' RemoteReferences. Should fix...
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / node.py
1
2 import datetime, os.path, re, types, ConfigParser, tempfile
3 from base64 import b32decode, b32encode
4
5 from twisted.python import log as twlog
6 from twisted.application import service
7 from twisted.internet import defer, reactor
8 from foolscap.api import Tub, eventually, app_versions
9 import foolscap.logging.log
10 from allmydata import get_package_versions, get_package_versions_string
11 from allmydata.util import log
12 from allmydata.util import fileutil, iputil, observer
13 from allmydata.util.assertutil import precondition, _assert
14
# Add our application versions to the data that Foolscap's LogPublisher
# reports. This loop runs once, as a side effect of importing this module:
# each (name, version) pair from get_package_versions() is registered with
# Foolscap's app_versions table, with the version converted to a string.
for thing, things_version in get_package_versions().iteritems():
    app_versions.add_version(thing, str(things_version))
19
# Matches "A.B.C.D" or "A.B.C.D:PORT". Group 1 will be addr (dotted quad
# string), group 3 if any will be portnum (string). Each octet is either "0"
# or a decimal number without leading zeros, so ordinary addresses such as
# "127.0.0.1" and "10.0.0.1" match. Octets are not range-checked. The inner
# groups are non-capturing so that the group numbers stay as described.
ADDR_RE = re.compile(r"^((?:0|[1-9][0-9]*)(?:\.(?:0|[1-9][0-9]*)){3})(:([1-9][0-9]*))?$")
22
23
def formatTimeTahoeStyle(self, when):
    """Render the POSIX timestamp 'when' as a UTC string with exactly three
    fractional digits (sub-millisecond digits are truncated), e.g.:

      2007-10-12 00:26:28.566Z

    'self' is not used. The parameter exists because setup_logging()
    installs this function as the formatTime method of a log observer.
    """
    utc = datetime.datetime.utcfromtimestamp(when)
    # isoformat() omits the fraction entirely when microsecond == 0, so
    # format the whole-second part and the milliseconds separately.
    whole_seconds = utc.replace(microsecond=0).isoformat(" ")
    return "%s.%03dZ" % (whole_seconds, utc.microsecond // 1000)
32
33 PRIV_README="""
34 This directory contains files which contain private data for the Tahoe node,
35 such as private keys.  On Unix-like systems, the permissions on this directory
36 are set to disallow users other than its owner from reading the contents of
37 the files.   See the 'configuration.txt' documentation file for details."""
38
class _None:
    """Marker used as the default value of get_config()'s 'default'
    argument. get_config() compares against the class object itself
    ('default is _None') to detect that the caller supplied no default."""
41
class MissingConfigEntry(Exception):
    """Raised by Node.get_config() when the requested [section]option entry
    is absent from the configuration and the caller gave no default."""
44
class Node(service.MultiService):
    # this implements common functionality of both Client nodes and Introducer
    # nodes.

    # Human-readable node type; _startService() logs "<NODETYPE> running".
    NODETYPE = "unknown NODETYPE"
    # Name (relative to the base directory) of the file that records the
    # Tub's listening port. __init__ joins this onto basedir, so the None
    # default is presumably meant to be overridden by subclasses -- TODO
    # confirm.
    PORTNUMFILE = None
    # Filename, inside the "private" subdirectory, passed to the Tub as its
    # certFile.
    CERTFILE = "node.pem"
51
52     def __init__(self, basedir="."):
53         service.MultiService.__init__(self)
54         self.basedir = os.path.abspath(basedir)
55         self._portnumfile = os.path.join(self.basedir, self.PORTNUMFILE)
56         self._tub_ready_observerlist = observer.OneShotObserverList()
57         fileutil.make_dirs(os.path.join(self.basedir, "private"), 0700)
58         open(os.path.join(self.basedir, "private", "README"), "w").write(PRIV_README)
59
60         # creates self.config, populates from distinct files if necessary
61         self.read_config()
62
63         nickname_utf8 = self.get_config("node", "nickname", "<unspecified>")
64         self.nickname = nickname_utf8.decode("utf-8")
65
66         self.init_tempdir()
67         self.create_tub()
68         self.logSource="Node"
69
70         self.setup_ssh()
71         self.setup_logging()
72         self.log("Node constructed. " + get_package_versions_string())
73         iputil.increase_rlimits()
74
75     def init_tempdir(self):
76         local_tempdir = "tmp" # default is NODEDIR/tmp/
77         tempdir = self.get_config("node", "tempdir", local_tempdir)
78         tempdir = os.path.join(self.basedir, tempdir)
79         if not os.path.exists(tempdir):
80             fileutil.make_dirs(tempdir)
81         tempfile.tempdir = os.path.abspath(tempdir)
82         # this should cause twisted.web.http (which uses
83         # tempfile.TemporaryFile) to put large request bodies in the given
84         # directory. Without this, the default temp dir is usually /tmp/,
85         # which is frequently too small.
86         test_name = tempfile.mktemp()
87         _assert(os.path.dirname(test_name) == tempdir, test_name, tempdir)
88
89     def get_config(self, section, option, default=_None, boolean=False):
90         try:
91             if boolean:
92                 return self.config.getboolean(section, option)
93             return self.config.get(section, option)
94         except (ConfigParser.NoOptionError, ConfigParser.NoSectionError):
95             if default is _None:
96                 fn = os.path.join(self.basedir, "tahoe.cfg")
97                 raise MissingConfigEntry("%s is missing the [%s]%s entry"
98                                          % (fn, section, option))
99             return default
100
101     def set_config(self, section, option, value):
102         if not self.config.has_section(section):
103             self.config.add_section(section)
104         self.config.set(section, option, value)
105         assert self.config.get(section, option) == value
106
107     def read_config(self):
108         self.config = ConfigParser.SafeConfigParser()
109         self.config.read([os.path.join(self.basedir, "tahoe.cfg")])
110         self.read_old_config_files()
111
112     def read_old_config_files(self):
113         # backwards-compatibility: individual files will override the
114         # contents of tahoe.cfg
115         copy = self._copy_config_from_file
116
117         copy("nickname", "node", "nickname")
118         copy("webport", "node", "web.port")
119
120         cfg_tubport = self.get_config("node", "tub.port", "")
121         if not cfg_tubport:
122             # For 'tub.port', tahoe.cfg overrides the individual file on
123             # disk. So only read self._portnumfile is tahoe.cfg doesn't
124             # provide a value.
125             try:
126                 file_tubport = open(self._portnumfile, "rU").read().strip()
127                 self.set_config("node", "tub.port", file_tubport)
128             except EnvironmentError:
129                 pass
130
131         copy("keepalive_timeout", "node", "timeout.keepalive")
132         copy("disconnect_timeout", "node", "timeout.disconnect")
133
134     def _copy_config_from_file(self, config_filename, section, keyname):
135         s = self.get_config_from_file(config_filename)
136         if s is not None:
137             self.set_config(section, keyname, s)
138
139     def create_tub(self):
140         certfile = os.path.join(self.basedir, "private", self.CERTFILE)
141         self.tub = Tub(certFile=certfile)
142         self.tub.setOption("logLocalFailures", True)
143         self.tub.setOption("logRemoteFailures", True)
144         self.tub.setOption("expose-remote-exception-types", False)
145
146         # see #521 for a discussion of how to pick these timeout values.
147         keepalive_timeout_s = self.get_config("node", "timeout.keepalive", "")
148         if keepalive_timeout_s:
149             self.tub.setOption("keepaliveTimeout", int(keepalive_timeout_s))
150         disconnect_timeout_s = self.get_config("node", "timeout.disconnect", "")
151         if disconnect_timeout_s:
152             # N.B.: this is in seconds, so use "1800" to get 30min
153             self.tub.setOption("disconnectTimeout", int(disconnect_timeout_s))
154
155         self.nodeid = b32decode(self.tub.tubID.upper()) # binary format
156         self.write_config("my_nodeid", b32encode(self.nodeid).lower() + "\n")
157         self.short_nodeid = b32encode(self.nodeid).lower()[:8] # ready for printing
158
159         tubport = self.get_config("node", "tub.port", "tcp:0")
160         self.tub.listenOn(tubport)
161         # we must wait until our service has started before we can find out
162         # our IP address and thus do tub.setLocation, and we can't register
163         # any services with the Tub until after that point
164         self.tub.setServiceParent(self)
165
166     def setup_ssh(self):
167         ssh_port = self.get_config("node", "ssh.port", "")
168         if ssh_port:
169             ssh_keyfile = self.get_config("node", "ssh.authorized_keys_file")
170             from allmydata import manhole
171             m = manhole.AuthorizedKeysManhole(ssh_port, ssh_keyfile)
172             m.setServiceParent(self)
173             self.log("AuthorizedKeysManhole listening on %s" % ssh_port)
174
175     def get_app_versions(self):
176         # TODO: merge this with allmydata.get_package_versions
177         return dict(app_versions.versions)
178
179     def get_config_from_file(self, name, required=False):
180         """Get the (string) contents of a config file, or None if the file
181         did not exist. If required=True, raise an exception rather than
182         returning None. Any leading or trailing whitespace will be stripped
183         from the data."""
184         fn = os.path.join(self.basedir, name)
185         try:
186             return open(fn, "r").read().strip()
187         except EnvironmentError:
188             if not required:
189                 return None
190             raise
191
192     def write_private_config(self, name, value):
193         """Write the (string) contents of a private config file (which is a
194         config file that resides within the subdirectory named 'private'), and
195         return it. Any leading or trailing whitespace will be stripped from
196         the data.
197         """
198         privname = os.path.join(self.basedir, "private", name)
199         open(privname, "w").write(value.strip())
200
201     def get_or_create_private_config(self, name, default):
202         """Try to get the (string) contents of a private config file (which
203         is a config file that resides within the subdirectory named
204         'private'), and return it. Any leading or trailing whitespace will be
205         stripped from the data.
206
207         If the file does not exist, try to create it using default, and
208         then return the value that was written. If 'default' is a string,
209         use it as a default value. If not, treat it as a 0-argument callable
210         which is expected to return a string.
211         """
212         privname = os.path.join("private", name)
213         value = self.get_config_from_file(privname)
214         if value is None:
215             if isinstance(default, (str, unicode)):
216                 value = default
217             else:
218                 value = default()
219             fn = os.path.join(self.basedir, privname)
220             try:
221                 open(fn, "w").write(value)
222             except EnvironmentError, e:
223                 self.log("Unable to write config file '%s'" % fn)
224                 self.log(e)
225             value = value.strip()
226         return value
227
228     def write_config(self, name, value, mode="w"):
229         """Write a string to a config file."""
230         fn = os.path.join(self.basedir, name)
231         try:
232             open(fn, mode).write(value)
233         except EnvironmentError, e:
234             self.log("Unable to write config file '%s'" % fn)
235             self.log(e)
236
237     def startService(self):
238         # Note: this class can be started and stopped at most once.
239         self.log("Node.startService")
240         try:
241             os.chmod("twistd.pid", 0644)
242         except EnvironmentError:
243             pass
244         # Delay until the reactor is running.
245         eventually(self._startService)
246
247     def _startService(self):
248         precondition(reactor.running)
249         self.log("Node._startService")
250
251         service.MultiService.startService(self)
252         d = defer.succeed(None)
253         d.addCallback(lambda res: iputil.get_local_addresses_async())
254         d.addCallback(self._setup_tub)
255         def _ready(res):
256             self.log("%s running" % self.NODETYPE)
257             self._tub_ready_observerlist.fire(self)
258             return self
259         d.addCallback(_ready)
260         d.addErrback(self._service_startup_failed)
261
262     def _service_startup_failed(self, failure):
263         self.log('_startService() failed')
264         log.err(failure)
265         print "Node._startService failed, aborting"
266         print failure
267         #reactor.stop() # for unknown reasons, reactor.stop() isn't working.  [ ] TODO
268         self.log('calling os.abort()')
269         twlog.msg('calling os.abort()') # make sure it gets into twistd.log
270         print "calling os.abort()"
271         os.abort()
272
273     def stopService(self):
274         self.log("Node.stopService")
275         d = self._tub_ready_observerlist.when_fired()
276         def _really_stopService(ignored):
277             self.log("Node._really_stopService")
278             return service.MultiService.stopService(self)
279         d.addCallback(_really_stopService)
280         return d
281
282     def shutdown(self):
283         """Shut down the node. Returns a Deferred that fires (with None) when
284         it finally stops kicking."""
285         self.log("Node.shutdown")
286         return self.stopService()
287
288     def setup_logging(self):
289         # we replace the formatTime() method of the log observer that twistd
290         # set up for us, with a method that uses better timestamps.
291         for o in twlog.theLogPublisher.observers:
292             # o might be a FileLogObserver's .emit method
293             if type(o) is type(self.setup_logging): # bound method
294                 ob = o.im_self
295                 if isinstance(ob, twlog.FileLogObserver):
296                     newmeth = types.UnboundMethodType(formatTimeTahoeStyle, ob, ob.__class__)
297                     ob.formatTime = newmeth
298         # TODO: twisted >2.5.0 offers maxRotatedFiles=50
299
300         self.tub.setOption("logport-furlfile",
301                            os.path.join(self.basedir, "private","logport.furl"))
302         lgfurl = self.get_config("node", "log_gatherer.furl", "")
303         if lgfurl:
304             # this is in addition to the contents of log-gatherer-furlfile
305             self.tub.setOption("log-gatherer-furl", lgfurl)
306         self.tub.setOption("log-gatherer-furlfile",
307                            os.path.join(self.basedir, "log_gatherer.furl"))
308         self.tub.setOption("bridge-twisted-logs", True)
309         incident_dir = os.path.join(self.basedir, "logs", "incidents")
310         # this doesn't quite work yet: unit tests fail
311         foolscap.logging.log.setLogDir(incident_dir)
312
313     def log(self, *args, **kwargs):
314         return log.msg(*args, **kwargs)
315
316     def _setup_tub(self, local_addresses):
317         # we can't get a dynamically-assigned portnum until our Tub is
318         # running, which means after startService.
319         l = self.tub.getListeners()[0]
320         portnum = l.getPortnum()
321         # record which port we're listening on, so we can grab the same one
322         # next time
323         open(self._portnumfile, "w").write("%d\n" % portnum)
324
325         base_location = ",".join([ "%s:%d" % (addr, portnum)
326                                    for addr in local_addresses ])
327         location = self.get_config("node", "tub.location", base_location)
328         self.log("Tub location set to %s" % location)
329         self.tub.setLocation(location)
330
331         return self.tub
332
333     def when_tub_ready(self):
334         return self._tub_ready_observerlist.when_fired()
335
336     def add_service(self, s):
337         s.setServiceParent(self)
338         return s
339