]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/node.py
macapp: simplify node startup failure reporting
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / node.py
1
2 import datetime, os.path, re, types
3 from base64 import b32decode, b32encode
4
5 from twisted.python import log
6 from twisted.application import service
7 from twisted.internet import defer, reactor
8 from foolscap import Tub, eventual
9 from allmydata import get_package_versions_string
10 from allmydata.util import log as tahoe_log
11 from allmydata.util import fileutil, iputil, observer, humanreadable
12 from allmydata.util.assertutil import precondition
13
14 # Just to get their versions:
15 import allmydata, pycryptopp, zfec
16
17 from foolscap.logging.publish import LogPublisher
# Register our application versions with Foolscap's LogPublisher so they are
# included in the version data it reports. The __version__ attributes are
# instances of a "Version" class rather than strings, hence the str() calls.
LogPublisher.versions.update({
    'allmydata': str(allmydata.__version__),
    'zfec': str(zfec.__version__),
    'pycryptopp': str(pycryptopp.__version__),
})
24
# Matches one line of the advertised-addresses file: a dotted-quad IPv4
# address with an optional ":portnum" suffix.
# group 1 will be addr (dotted quad string), group 3 if any will be portnum
# (string). Group 2 is the port including its leading colon.
# Each octet is either a bare "0" or a number without leading zeros, so that
# addresses such as 10.0.0.1 are accepted while "010.0.0.1" is still rejected.
ADDR_RE = re.compile(
    r"^((?:0|[1-9][0-9]*)\.(?:0|[1-9][0-9]*)\.(?:0|[1-9][0-9]*)\.(?:0|[1-9][0-9]*))"
    r"(:([1-9][0-9]*))?$")
27
28
def formatTimeTahoeStyle(self, when):
    """Format the POSIX timestamp `when` as a UTC string with milliseconds.

    The result looks like the timestamp at the start of this log line:
      2007-10-12 00:26:28.566Z [Client] rnp752lz: 'client running'

    `self` is not used by the body; the parameter exists so the function can
    be bound to an object as a method.
    """
    utc = datetime.datetime.utcfromtimestamp(when)
    text = utc.isoformat(" ")
    if not utc.microsecond:
        # isoformat() leaves out the fractional part entirely when it is zero
        return text + ".000Z"
    # cut the six-digit microsecond field down to three digits
    return text[:-3] + "Z"
37
# Contents of the README that Node.__init__ writes into the node's "private"
# subdirectory.
PRIV_README = (
    "\n"
    "This directory contains files which contain private data for the Tahoe node,\n"
    "such as private keys.  On Unix-like systems, the permissions on this directory\n"
    "are set to disallow users other than its owner from reading the contents of\n"
    "the files.   See the 'configuration.txt' documentation file for details."
)
43
44 class Node(service.MultiService):
45     # this implements common functionality of both Client nodes and Introducer
46     # nodes.
47     NODETYPE = "unknown NODETYPE"
48     PORTNUMFILE = None
49     CERTFILE = "node.pem"
50     LOCAL_IP_FILE = "advertised_ip_addresses"
51
52     def __init__(self, basedir="."):
53         service.MultiService.__init__(self)
54         self.basedir = os.path.abspath(basedir)
55         self._tub_ready_observerlist = observer.OneShotObserverList()
56         fileutil.make_dirs(os.path.join(self.basedir, "private"), 0700)
57         open(os.path.join(self.basedir, "private", "README"), "w").write(PRIV_README)
58         certfile = os.path.join(self.basedir, "private", self.CERTFILE)
59         self.tub = Tub(certFile=certfile)
60         self.tub.setOption("logLocalFailures", True)
61         self.tub.setOption("logRemoteFailures", True)
62         self.nodeid = b32decode(self.tub.tubID.upper()) # binary format
63         self.write_config("my_nodeid", b32encode(self.nodeid).lower() + "\n")
64         self.short_nodeid = b32encode(self.nodeid).lower()[:8] # ready for printing
65         assert self.PORTNUMFILE, "Your node.Node subclass must provide PORTNUMFILE"
66         self._portnumfile = os.path.join(self.basedir, self.PORTNUMFILE)
67         try:
68             portnum = int(open(self._portnumfile, "rU").read())
69         except (EnvironmentError, ValueError):
70             portnum = 0
71         self.tub.listenOn("tcp:%d" % portnum)
72         # we must wait until our service has started before we can find out
73         # our IP address and thus do tub.setLocation, and we can't register
74         # any services with the Tub until after that point
75         self.tub.setServiceParent(self)
76         self.logSource="Node"
77
78         AUTHKEYSFILEBASE = "authorized_keys."
79         for f in os.listdir(self.basedir):
80             if f.startswith(AUTHKEYSFILEBASE):
81                 keyfile = os.path.join(self.basedir, f)
82                 portnum = int(f[len(AUTHKEYSFILEBASE):])
83                 from allmydata import manhole
84                 m = manhole.AuthorizedKeysManhole(portnum, keyfile)
85                 m.setServiceParent(self)
86                 self.log("AuthorizedKeysManhole listening on %d" % portnum)
87
88         self.setup_logging()
89         self.log("Node constructed. " + get_package_versions_string())
90         iputil.increase_rlimits()
91
92     def get_config(self, name, required=False):
93         """Get the (string) contents of a config file, or None if the file
94         did not exist. If required=True, raise an exception rather than
95         returning None. Any leading or trailing whitespace will be stripped
96         from the data."""
97         fn = os.path.join(self.basedir, name)
98         try:
99             return open(fn, "r").read().strip()
100         except EnvironmentError:
101             if not required:
102                 return None
103             raise
104
105     def write_private_config(self, name, value):
106         """Write the (string) contents of a private config file (which is a
107         config file that resides within the subdirectory named 'private'), and
108         return it. Any leading or trailing whitespace will be stripped from
109         the data.
110         """
111         privname = os.path.join(self.basedir, "private", name)
112         open(privname, "w").write(value.strip())
113
114     def get_or_create_private_config(self, name, default):
115         """Try to get the (string) contents of a private config file (which
116         is a config file that resides within the subdirectory named
117         'private'), and return it. Any leading or trailing whitespace will be
118         stripped from the data.
119
120         If the file does not exist, try to create it using default, and
121         then return the value that was written. If 'default' is a string,
122         use it as a default value. If not, treat it as a 0-argument callable
123         which is expected to return a string.
124         """
125         privname = os.path.join("private", name)
126         value = self.get_config(privname)
127         if value is None:
128             if isinstance(default, (str, unicode)):
129                 value = default
130             else:
131                 value = default()
132             fn = os.path.join(self.basedir, privname)
133             try:
134                 open(fn, "w").write(value)
135             except EnvironmentError, e:
136                 self.log("Unable to write config file '%s'" % fn)
137                 self.log(e)
138             value = value.strip()
139         return value
140
141     def write_config(self, name, value, mode="w"):
142         """Write a string to a config file."""
143         fn = os.path.join(self.basedir, name)
144         try:
145             open(fn, mode).write(value)
146         except EnvironmentError, e:
147             self.log("Unable to write config file '%s'" % fn)
148             self.log(e)
149
150     def startService(self):
151         # Note: this class can be started and stopped at most once.
152         self.log("Node.startService")
153         # Delay until the reactor is running.
154         eventual.eventually(self._startService)
155
156     def _startService(self):
157         precondition(reactor.running)
158         self.log("Node._startService")
159
160         service.MultiService.startService(self)
161         d = defer.succeed(None)
162         d.addCallback(lambda res: iputil.get_local_addresses_async())
163         d.addCallback(self._setup_tub)
164         def _ready(res):
165             self.log("%s running" % self.NODETYPE)
166             self._tub_ready_observerlist.fire(self)
167             return self
168         d.addCallback(_ready)
169         d.addErrback(self._service_startup_failed)
170
171     def _service_startup_failed(self, failure):
172         self.log('_startService() failed')
173         log.err(failure)
174         print "Node._startService failed, aborting"
175         print failure
176         #reactor.stop() # for unknown reasons, reactor.stop() isn't working.  [ ] TODO
177         self.log('calling os.abort()')
178         log.msg('calling os.abort()')
179         print "calling os.abort()"
180         os.abort()
181
182     def stopService(self):
183         self.log("Node.stopService")
184         d = self._tub_ready_observerlist.when_fired()
185         def _really_stopService(ignored):
186             self.log("Node._really_stopService")
187             return service.MultiService.stopService(self)
188         d.addCallback(_really_stopService)
189         return d
190
191     def shutdown(self):
192         """Shut down the node. Returns a Deferred that fires (with None) when
193         it finally stops kicking."""
194         self.log("Node.shutdown")
195         return self.stopService()
196
197     def setup_logging(self):
198         # we replace the formatTime() method of the log observer that twistd
199         # set up for us, with a method that uses better timestamps.
200         for o in log.theLogPublisher.observers:
201             # o might be a FileLogObserver's .emit method
202             if type(o) is type(self.setup_logging): # bound method
203                 ob = o.im_self
204                 if isinstance(ob, log.FileLogObserver):
205                     newmeth = types.UnboundMethodType(formatTimeTahoeStyle, ob, ob.__class__)
206                     ob.formatTime = newmeth
207         # TODO: twisted >2.5.0 offers maxRotatedFiles=50
208
209         self.tub.setOption("logport-furlfile",
210                            os.path.join(self.basedir, "private","logport.furl"))
211         self.tub.setOption("log-gatherer-furlfile",
212                            os.path.join(self.basedir, "log_gatherer.furl"))
213         self.tub.setOption("bridge-twisted-logs", True)
214
215     def log(self, *args, **kwargs):
216         return tahoe_log.msg(*args, **kwargs)
217
218     def old_log(self, msg, src="", args=(), **kw):
219         if src:
220             logsrc = src
221         else:
222             logsrc = self.logSource
223         if args:
224             try:
225                 msg = msg % tuple(map(humanreadable.hr, args))
226             except TypeError, e:
227                 msg = "ERROR: output string '%s' contained invalid %% expansion, error: %s, args: %s\n" % (`msg`, e, `args`)
228         msg = self.short_nodeid + ": " + humanreadable.hr(msg)
229         return log.callWithContext({"system":logsrc},
230                                    tahoe_log.msg, msg, **kw)
231
232     def _setup_tub(self, local_addresses):
233         # we can't get a dynamically-assigned portnum until our Tub is
234         # running, which means after startService.
235         l = self.tub.getListeners()[0]
236         portnum = l.getPortnum()
237         # record which port we're listening on, so we can grab the same one next time
238         open(self._portnumfile, "w").write("%d\n" % portnum)
239
240         local_addresses = [ "%s:%d" % (addr, portnum,) for addr in local_addresses ]
241
242         addresses = []
243         try:
244             for addrline in open(os.path.join(self.basedir, self.LOCAL_IP_FILE), "rU"):
245                 mo = ADDR_RE.search(addrline)
246                 if mo:
247                     (addr, dummy, aportnum,) = mo.groups()
248                     if aportnum is None:
249                         aportnum = portnum
250                     addresses.append("%s:%d" % (addr, int(aportnum),))
251         except EnvironmentError:
252             pass
253
254         addresses.extend(local_addresses)
255
256         location = ",".join(addresses)
257         self.log("Tub location set to %s" % location)
258         self.tub.setLocation(location)
259         return self.tub
260
261     def when_tub_ready(self):
262         return self._tub_ready_observerlist.when_fired()
263
264     def add_service(self, s):
265         s.setServiceParent(self)
266         return s
267