]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/node.py
start using Foolscap's 'incident-logging' feature, which requires foolscap-0.2.9
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / node.py
1
2 import datetime, os.path, re, types
3 from base64 import b32decode, b32encode
4
5 from twisted.python import log as tahoe_log
6 from twisted.application import service
7 from twisted.internet import defer, reactor
8 from foolscap import Tub, eventual
9 import foolscap.logging.log
10 from allmydata import get_package_versions_string
11 from allmydata.util import log
12 from allmydata.util import fileutil, iputil, observer, humanreadable
13 from allmydata.util.assertutil import precondition
14
15 # Just to get their versions:
16 import allmydata, pycryptopp, zfec
17
18 from foolscap.logging.publish import LogPublisher
# Add our application versions to the data that Foolscap's LogPublisher
# reports, keyed by package name. Our __version__ attributes are actually
# instances of a "Version" class, so convert them into strings first.
LogPublisher.versions['allmydata'] = str(allmydata.__version__)
LogPublisher.versions['zfec'] = str(zfec.__version__)
LogPublisher.versions['pycryptopp'] = str(pycryptopp.__version__)
25
# Matches one dotted-quad IPv4 address with an optional ":portnum" suffix.
# group 1 will be addr (dotted quad string), group 3 if any will be portnum
# (string). Each octet is either a bare "0" or a number without a leading
# zero, so addresses containing a zero octet (e.g. "10.0.0.1") are accepted.
# The octet sub-groups are non-capturing so that the group numbering above
# stays stable.
ADDR_RE = re.compile(
    r"^((?:0|[1-9][0-9]*)\.(?:0|[1-9][0-9]*)\.(?:0|[1-9][0-9]*)\.(?:0|[1-9][0-9]*))"
    r"(:([1-9][0-9]*))?$")
28
29
def formatTimeTahoeStyle(self, when):
    """Render the timestamp 'when' (seconds since the epoch) as UTC text.

    The result has millisecond precision and a trailing 'Z', like:
      2007-10-12 00:26:28.566Z [Client] rnp752lz: 'client running'
    'self' is unused; it is present so this function can be installed as a
    method on a log observer.
    """
    stamp = datetime.datetime.utcfromtimestamp(when)
    text = stamp.isoformat(" ")
    if not stamp.microsecond:
        # isoformat() omits the fractional part entirely when it is zero
        return text + ".000Z"
    # drop the last three of the six microsecond digits
    return text[:-3] + "Z"
38
# Text written to <basedir>/private/README each time a Node is constructed.
PRIV_README="""
This directory contains files which contain private data for the Tahoe node,
such as private keys.  On Unix-like systems, the permissions on this directory
are set to disallow users other than its owner from reading the contents of
the files.   See the 'configuration.txt' documentation file for details."""
44
class Node(service.MultiService):
    # this implements common functionality of both Client nodes and Introducer
    # nodes.

    # label used in the "<NODETYPE> running" log message
    NODETYPE = "unknown NODETYPE"
    # name of the file (under basedir) that remembers the Tub's listening
    # port; __init__ asserts that subclasses have set it
    PORTNUMFILE = None
    # name of the file (under basedir/private) handed to the Tub as certFile
    CERTFILE = "node.pem"
    # name of the file (under basedir) whose lines are matched against
    # ADDR_RE to find extra addresses to advertise
    LOCAL_IP_FILE = "advertised_ip_addresses"
52
53     def __init__(self, basedir="."):
54         service.MultiService.__init__(self)
55         self.basedir = os.path.abspath(basedir)
56         self._tub_ready_observerlist = observer.OneShotObserverList()
57         fileutil.make_dirs(os.path.join(self.basedir, "private"), 0700)
58         open(os.path.join(self.basedir, "private", "README"), "w").write(PRIV_README)
59         certfile = os.path.join(self.basedir, "private", self.CERTFILE)
60         self.tub = Tub(certFile=certfile)
61         self.tub.setOption("logLocalFailures", True)
62         self.tub.setOption("logRemoteFailures", True)
63         self.nodeid = b32decode(self.tub.tubID.upper()) # binary format
64         self.write_config("my_nodeid", b32encode(self.nodeid).lower() + "\n")
65         self.short_nodeid = b32encode(self.nodeid).lower()[:8] # ready for printing
66         assert self.PORTNUMFILE, "Your node.Node subclass must provide PORTNUMFILE"
67         self._portnumfile = os.path.join(self.basedir, self.PORTNUMFILE)
68         try:
69             portnum = int(open(self._portnumfile, "rU").read())
70         except (EnvironmentError, ValueError):
71             portnum = 0
72         self.tub.listenOn("tcp:%d" % portnum)
73         # we must wait until our service has started before we can find out
74         # our IP address and thus do tub.setLocation, and we can't register
75         # any services with the Tub until after that point
76         self.tub.setServiceParent(self)
77         self.logSource="Node"
78
79         AUTHKEYSFILEBASE = "authorized_keys."
80         for f in os.listdir(self.basedir):
81             if f.startswith(AUTHKEYSFILEBASE):
82                 keyfile = os.path.join(self.basedir, f)
83                 portnum = int(f[len(AUTHKEYSFILEBASE):])
84                 from allmydata import manhole
85                 m = manhole.AuthorizedKeysManhole(portnum, keyfile)
86                 m.setServiceParent(self)
87                 self.log("AuthorizedKeysManhole listening on %d" % portnum)
88
89         self.setup_logging()
90         self.log("Node constructed. " + get_package_versions_string())
91         iputil.increase_rlimits()
92
93     def get_config(self, name, required=False):
94         """Get the (string) contents of a config file, or None if the file
95         did not exist. If required=True, raise an exception rather than
96         returning None. Any leading or trailing whitespace will be stripped
97         from the data."""
98         fn = os.path.join(self.basedir, name)
99         try:
100             return open(fn, "r").read().strip()
101         except EnvironmentError:
102             if not required:
103                 return None
104             raise
105
106     def write_private_config(self, name, value):
107         """Write the (string) contents of a private config file (which is a
108         config file that resides within the subdirectory named 'private'), and
109         return it. Any leading or trailing whitespace will be stripped from
110         the data.
111         """
112         privname = os.path.join(self.basedir, "private", name)
113         open(privname, "w").write(value.strip())
114
115     def get_or_create_private_config(self, name, default):
116         """Try to get the (string) contents of a private config file (which
117         is a config file that resides within the subdirectory named
118         'private'), and return it. Any leading or trailing whitespace will be
119         stripped from the data.
120
121         If the file does not exist, try to create it using default, and
122         then return the value that was written. If 'default' is a string,
123         use it as a default value. If not, treat it as a 0-argument callable
124         which is expected to return a string.
125         """
126         privname = os.path.join("private", name)
127         value = self.get_config(privname)
128         if value is None:
129             if isinstance(default, (str, unicode)):
130                 value = default
131             else:
132                 value = default()
133             fn = os.path.join(self.basedir, privname)
134             try:
135                 open(fn, "w").write(value)
136             except EnvironmentError, e:
137                 self.log("Unable to write config file '%s'" % fn)
138                 self.log(e)
139             value = value.strip()
140         return value
141
142     def write_config(self, name, value, mode="w"):
143         """Write a string to a config file."""
144         fn = os.path.join(self.basedir, name)
145         try:
146             open(fn, mode).write(value)
147         except EnvironmentError, e:
148             self.log("Unable to write config file '%s'" % fn)
149             self.log(e)
150
151     def startService(self):
152         # Note: this class can be started and stopped at most once.
153         self.log("Node.startService")
154         try:
155             os.chmod("twistd.pid", 0644)
156         except EnvironmentError:
157             pass
158         # Delay until the reactor is running.
159         eventual.eventually(self._startService)
160
161     def _startService(self):
162         precondition(reactor.running)
163         self.log("Node._startService")
164
165         service.MultiService.startService(self)
166         d = defer.succeed(None)
167         d.addCallback(lambda res: iputil.get_local_addresses_async())
168         d.addCallback(self._setup_tub)
169         def _ready(res):
170             self.log("%s running" % self.NODETYPE)
171             self._tub_ready_observerlist.fire(self)
172             return self
173         d.addCallback(_ready)
174         d.addErrback(self._service_startup_failed)
175
176     def _service_startup_failed(self, failure):
177         self.log('_startService() failed')
178         tahoe_log.err(failure)
179         print "Node._startService failed, aborting"
180         print failure
181         #reactor.stop() # for unknown reasons, reactor.stop() isn't working.  [ ] TODO
182         self.log('calling os.abort()')
183         tahoe_log.msg('calling os.abort()')
184         print "calling os.abort()"
185         os.abort()
186
187     def stopService(self):
188         self.log("Node.stopService")
189         d = self._tub_ready_observerlist.when_fired()
190         def _really_stopService(ignored):
191             self.log("Node._really_stopService")
192             return service.MultiService.stopService(self)
193         d.addCallback(_really_stopService)
194         return d
195
196     def shutdown(self):
197         """Shut down the node. Returns a Deferred that fires (with None) when
198         it finally stops kicking."""
199         self.log("Node.shutdown")
200         return self.stopService()
201
202     def setup_logging(self):
203         # we replace the formatTime() method of the log observer that twistd
204         # set up for us, with a method that uses better timestamps.
205         for o in tahoe_log.theLogPublisher.observers:
206             # o might be a FileLogObserver's .emit method
207             if type(o) is type(self.setup_logging): # bound method
208                 ob = o.im_self
209                 if isinstance(ob, tahoe_log.FileLogObserver):
210                     newmeth = types.UnboundMethodType(formatTimeTahoeStyle, ob, ob.__class__)
211                     ob.formatTime = newmeth
212         # TODO: twisted >2.5.0 offers maxRotatedFiles=50
213
214         self.tub.setOption("logport-furlfile",
215                            os.path.join(self.basedir, "private","logport.furl"))
216         self.tub.setOption("log-gatherer-furlfile",
217                            os.path.join(self.basedir, "log_gatherer.furl"))
218         self.tub.setOption("bridge-twisted-logs", True)
219         incident_dir = os.path.join(self.basedir, "logs", "incidents")
220         foolscap.logging.log.setLogDir(incident_dir)
221
222     def log(self, *args, **kwargs):
223         return log.msg(*args, **kwargs)
224
225     def old_log(self, msg, src="", args=(), **kw):
226         if src:
227             logsrc = src
228         else:
229             logsrc = self.logSource
230         if args:
231             try:
232                 msg = msg % tuple(map(humanreadable.hr, args))
233             except TypeError, e:
234                 msg = "ERROR: output string '%s' contained invalid %% expansion, error: %s, args: %s\n" % (`msg`, e, `args`)
235         msg = self.short_nodeid + ": " + humanreadable.hr(msg)
236         return tahoe_log.callWithContext({"system":logsrc},
237                                          tahoe_log.msg, msg, **kw)
238
239     def _setup_tub(self, local_addresses):
240         # we can't get a dynamically-assigned portnum until our Tub is
241         # running, which means after startService.
242         l = self.tub.getListeners()[0]
243         portnum = l.getPortnum()
244         # record which port we're listening on, so we can grab the same one next time
245         open(self._portnumfile, "w").write("%d\n" % portnum)
246
247         local_addresses = [ "%s:%d" % (addr, portnum,) for addr in local_addresses ]
248
249         addresses = []
250         try:
251             for addrline in open(os.path.join(self.basedir, self.LOCAL_IP_FILE), "rU"):
252                 mo = ADDR_RE.search(addrline)
253                 if mo:
254                     (addr, dummy, aportnum,) = mo.groups()
255                     if aportnum is None:
256                         aportnum = portnum
257                     addresses.append("%s:%d" % (addr, int(aportnum),))
258         except EnvironmentError:
259             pass
260
261         addresses.extend(local_addresses)
262
263         location = ",".join(addresses)
264         self.log("Tub location set to %s" % location)
265         self.tub.setLocation(location)
266         return self.tub
267
268     def when_tub_ready(self):
269         return self._tub_ready_observerlist.when_fired()
270
271     def add_service(self, s):
272         s.setServiceParent(self)
273         return s
274