]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/node.py
macapp: report failure of node startup to the user
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / node.py
1
2 import datetime, os.path, re, types
3 from base64 import b32decode, b32encode
4
5 from twisted.python import log
6 from twisted.application import service
7 from twisted.internet import defer, reactor
8 from foolscap import Tub, eventual
9 from allmydata import get_package_versions_string
10 from allmydata.util import log as tahoe_log
11 from allmydata.util import fileutil, iputil, observer, humanreadable
12 from allmydata.util.assertutil import precondition
13
14 # Just to get their versions:
15 import allmydata, pycryptopp, zfec
16
17 from foolscap.logging.publish import LogPublisher
# Advertise the versions of our main components through Foolscap's
# LogPublisher. Our __version__ attributes are "Version" instances rather
# than plain strings, so each one is passed through str() first.
LogPublisher.versions.update({
    'allmydata': str(allmydata.__version__),
    'zfec': str(zfec.__version__),
    'pycryptopp': str(pycryptopp.__version__),
    })
24
# Matches one line of the LOCAL_IP_FILE: a dotted-quad IPv4 address,
# optionally followed by ":portnum".
# group 1 will be addr (dotted quad string), group 3 if any will be portnum (string)
# Each octet is either "0" or a number without leading zeros, so addresses
# containing a zero octet (e.g. 10.0.0.1) are accepted.
_OCTET_RE = r"(?:0|[1-9][0-9]*)"
ADDR_RE = re.compile(r"^(%s)(:([1-9][0-9]*))?$" % r"\.".join([_OCTET_RE] * 4))
27
28
def formatTimeTahoeStyle(self, when):
    """Format the POSIX timestamp 'when' as a UTC string with millisecond
    precision, e.g. "2007-10-12 00:26:28.566Z".

    Intended to be bound as the formatTime() method of a log observer, so
    'self' is accepted but not used.
    """
    stamp = datetime.datetime.utcfromtimestamp(when)
    text = stamp.isoformat(" ")
    if not stamp.microsecond:
        # isoformat() omits the fractional part entirely for whole seconds.
        return text + ".000Z"
    # isoformat() emits six fractional digits; keep only the milliseconds.
    return text[:-3] + "Z"
37
# Text written to <basedir>/private/README (see Node.__init__), explaining
# what the 'private' subdirectory is for. It begins with a newline and has
# no trailing newline.
PRIV_README = "\n".join([
    "",
    "This directory contains files which contain private data for the Tahoe node,",
    "such as private keys.  On Unix-like systems, the permissions on this directory",
    "are set to disallow users other than its owner from reading the contents of",
    "the files.   See the 'configuration.txt' documentation file for details.",
    ])
43
44 class Node(service.MultiService):
45     # this implements common functionality of both Client nodes and Introducer
46     # nodes.
47     NODETYPE = "unknown NODETYPE"
48     PORTNUMFILE = None
49     CERTFILE = "node.pem"
50     LOCAL_IP_FILE = "advertised_ip_addresses"
51
52     def __init__(self, basedir="."):
53         service.MultiService.__init__(self)
54         self.basedir = os.path.abspath(basedir)
55         self._tub_ready_observerlist = observer.OneShotObserverList()
56         fileutil.make_dirs(os.path.join(self.basedir, "private"), 0700)
57         open(os.path.join(self.basedir, "private", "README"), "w").write(PRIV_README)
58         certfile = os.path.join(self.basedir, "private", self.CERTFILE)
59         self.tub = Tub(certFile=certfile)
60         self.tub.setOption("logLocalFailures", True)
61         self.tub.setOption("logRemoteFailures", True)
62         self.nodeid = b32decode(self.tub.tubID.upper()) # binary format
63         self.write_config("my_nodeid", b32encode(self.nodeid).lower() + "\n")
64         self.short_nodeid = b32encode(self.nodeid).lower()[:8] # ready for printing
65         assert self.PORTNUMFILE, "Your node.Node subclass must provide PORTNUMFILE"
66         self._portnumfile = os.path.join(self.basedir, self.PORTNUMFILE)
67         try:
68             portnum = int(open(self._portnumfile, "rU").read())
69         except (EnvironmentError, ValueError):
70             portnum = 0
71         self.tub.listenOn("tcp:%d" % portnum)
72         # we must wait until our service has started before we can find out
73         # our IP address and thus do tub.setLocation, and we can't register
74         # any services with the Tub until after that point
75         self.tub.setServiceParent(self)
76         self.logSource="Node"
77
78         AUTHKEYSFILEBASE = "authorized_keys."
79         for f in os.listdir(self.basedir):
80             if f.startswith(AUTHKEYSFILEBASE):
81                 keyfile = os.path.join(self.basedir, f)
82                 portnum = int(f[len(AUTHKEYSFILEBASE):])
83                 from allmydata import manhole
84                 m = manhole.AuthorizedKeysManhole(portnum, keyfile)
85                 m.setServiceParent(self)
86                 self.log("AuthorizedKeysManhole listening on %d" % portnum)
87
88         self.setup_logging()
89         self.log("Node constructed. " + get_package_versions_string())
90         iputil.increase_rlimits()
91
92     def get_config(self, name, required=False):
93         """Get the (string) contents of a config file, or None if the file
94         did not exist. If required=True, raise an exception rather than
95         returning None. Any leading or trailing whitespace will be stripped
96         from the data."""
97         fn = os.path.join(self.basedir, name)
98         try:
99             return open(fn, "r").read().strip()
100         except EnvironmentError:
101             if not required:
102                 return None
103             raise
104
105     def write_private_config(self, name, value):
106         """Write the (string) contents of a private config file (which is a
107         config file that resides within the subdirectory named 'private'), and
108         return it. Any leading or trailing whitespace will be stripped from
109         the data.
110         """
111         privname = os.path.join(self.basedir, "private", name)
112         open(privname, "w").write(value.strip())
113
114     def get_or_create_private_config(self, name, default):
115         """Try to get the (string) contents of a private config file (which
116         is a config file that resides within the subdirectory named
117         'private'), and return it. Any leading or trailing whitespace will be
118         stripped from the data.
119
120         If the file does not exist, try to create it using default, and
121         then return the value that was written. If 'default' is a string,
122         use it as a default value. If not, treat it as a 0-argument callable
123         which is expected to return a string.
124         """
125         privname = os.path.join("private", name)
126         value = self.get_config(privname)
127         if value is None:
128             if isinstance(default, (str, unicode)):
129                 value = default
130             else:
131                 value = default()
132             fn = os.path.join(self.basedir, privname)
133             try:
134                 open(fn, "w").write(value)
135             except EnvironmentError, e:
136                 self.log("Unable to write config file '%s'" % fn)
137                 self.log(e)
138             value = value.strip()
139         return value
140
141     def write_config(self, name, value, mode="w"):
142         """Write a string to a config file."""
143         fn = os.path.join(self.basedir, name)
144         try:
145             open(fn, mode).write(value)
146         except EnvironmentError, e:
147             self.log("Unable to write config file '%s'" % fn)
148             self.log(e)
149
150     def startService(self):
151         # Note: this class can be started and stopped at most once.
152         self.log("Node.startService")
153         # Delay until the reactor is running.
154         eventual.eventually(self._startService)
155
156     def _startService(self):
157         precondition(reactor.running)
158         self.log("Node._startService")
159
160         service.MultiService.startService(self)
161         d = defer.succeed(None)
162         d.addCallback(lambda res: iputil.get_local_addresses_async())
163         d.addCallback(self._setup_tub)
164         def _ready(res):
165             self.log("%s running" % self.NODETYPE)
166             self._tub_ready_observerlist.fire(self)
167             return self
168         d.addCallback(_ready)
169         def _die(failure):
170             self.log('_startService() failed')
171             log.err(failure)
172             print "Node._startService failed, aborting"
173             print failure
174             #reactor.stop() # for unknown reasons, reactor.stop() isn't working.  [ ] TODO
175             self._abort_process(failure)
176         d.addErrback(_die)
177
178     def _abort_process(self, failure):
179         self.log('calling os.abort()')
180         log('calling os.abort()')
181         print "calling os.abort()"
182         os.abort()
183
184     def stopService(self):
185         self.log("Node.stopService")
186         d = self._tub_ready_observerlist.when_fired()
187         def _really_stopService(ignored):
188             self.log("Node._really_stopService")
189             return service.MultiService.stopService(self)
190         d.addCallback(_really_stopService)
191         return d
192
193     def shutdown(self):
194         """Shut down the node. Returns a Deferred that fires (with None) when
195         it finally stops kicking."""
196         self.log("Node.shutdown")
197         return self.stopService()
198
199     def setup_logging(self):
200         # we replace the formatTime() method of the log observer that twistd
201         # set up for us, with a method that uses better timestamps.
202         for o in log.theLogPublisher.observers:
203             # o might be a FileLogObserver's .emit method
204             if type(o) is type(self.setup_logging): # bound method
205                 ob = o.im_self
206                 if isinstance(ob, log.FileLogObserver):
207                     newmeth = types.UnboundMethodType(formatTimeTahoeStyle, ob, ob.__class__)
208                     ob.formatTime = newmeth
209         # TODO: twisted >2.5.0 offers maxRotatedFiles=50
210
211         self.tub.setOption("logport-furlfile",
212                            os.path.join(self.basedir, "private","logport.furl"))
213         self.tub.setOption("log-gatherer-furlfile",
214                            os.path.join(self.basedir, "log_gatherer.furl"))
215         self.tub.setOption("bridge-twisted-logs", True)
216
217     def log(self, *args, **kwargs):
218         return tahoe_log.msg(*args, **kwargs)
219
220     def old_log(self, msg, src="", args=(), **kw):
221         if src:
222             logsrc = src
223         else:
224             logsrc = self.logSource
225         if args:
226             try:
227                 msg = msg % tuple(map(humanreadable.hr, args))
228             except TypeError, e:
229                 msg = "ERROR: output string '%s' contained invalid %% expansion, error: %s, args: %s\n" % (`msg`, e, `args`)
230         msg = self.short_nodeid + ": " + humanreadable.hr(msg)
231         return log.callWithContext({"system":logsrc},
232                                    tahoe_log.msg, msg, **kw)
233
234     def _setup_tub(self, local_addresses):
235         # we can't get a dynamically-assigned portnum until our Tub is
236         # running, which means after startService.
237         l = self.tub.getListeners()[0]
238         portnum = l.getPortnum()
239         # record which port we're listening on, so we can grab the same one next time
240         open(self._portnumfile, "w").write("%d\n" % portnum)
241
242         local_addresses = [ "%s:%d" % (addr, portnum,) for addr in local_addresses ]
243
244         addresses = []
245         try:
246             for addrline in open(os.path.join(self.basedir, self.LOCAL_IP_FILE), "rU"):
247                 mo = ADDR_RE.search(addrline)
248                 if mo:
249                     (addr, dummy, aportnum,) = mo.groups()
250                     if aportnum is None:
251                         aportnum = portnum
252                     addresses.append("%s:%d" % (addr, int(aportnum),))
253         except EnvironmentError:
254             pass
255
256         addresses.extend(local_addresses)
257
258         location = ",".join(addresses)
259         self.log("Tub location set to %s" % location)
260         self.tub.setLocation(location)
261         return self.tub
262
263     def when_tub_ready(self):
264         return self._tub_ready_observerlist.when_fired()
265
266     def add_service(self, s):
267         s.setServiceParent(self)
268         return s
269