]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/node.py
use foolscap's new app_versions API, require foolscap-0.3.1
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / node.py
1
2 import datetime, os.path, re, types
3 from base64 import b32decode, b32encode
4
5 from twisted.python import log as twlog
6 from twisted.application import service
7 from twisted.internet import defer, reactor
8 from foolscap import Tub, eventual
9 import foolscap.logging.log
10 from allmydata import get_package_versions_string
11 from allmydata.util import log
12 from allmydata.util import fileutil, iputil, observer, humanreadable
13 from allmydata.util.assertutil import precondition
14
15 # Just to get their versions:
16 import allmydata, pycryptopp, zfec
17
18 from foolscap.logging import app_versions
# Register our own package versions with Foolscap, so that its LogPublisher
# reports them. Each __version__ attribute is an instance of a "Version"
# class rather than a string, hence the str() conversion.
for _name, _module in (('allmydata', allmydata),
                       ('zfec', zfec),
                       ('pycryptopp', pycryptopp)):
    app_versions.add_version(_name, str(_module.__version__))
25
# Matches one "addr" or "addr:portnum" line. Group 1 is the addr (dotted-quad
# string); group 3, if present, is the portnum (string). Group 2 is the
# optional ":portnum" suffix including its colon.
#
# Each octet is either a lone "0" or a number without leading zeros, so that
# addresses containing a zero octet (127.0.0.1, 10.0.0.1, 192.168.0.5) are
# accepted. The octet sub-patterns are non-capturing to keep the group
# numbering above stable.
ADDR_RE = re.compile(r"^((?:0|[1-9][0-9]*)(?:\.(?:0|[1-9][0-9]*)){3})(:([1-9][0-9]*))?$")
28
29
def formatTimeTahoeStyle(self, when):
    """Format the POSIX timestamp 'when' as a UTC string with milliseconds.

    The result looks like '2007-10-12 00:26:28.566Z'. 'self' is unused; the
    parameter exists so this function can be installed as a method on a log
    observer.
    """
    utc = datetime.datetime.utcfromtimestamp(when)
    stamp = utc.isoformat(" ")
    if not utc.microsecond:
        # isoformat() omits the fractional part entirely when it is zero
        return stamp + ".000Z"
    # drop the last three of the six microsecond digits, leaving milliseconds
    return stamp[:-3] + "Z"
38
# Text of the README file placed in the node's "private" subdirectory.
PRIV_README = (
    "\n"
    "This directory contains files which contain private data for the Tahoe node,\n"
    "such as private keys.  On Unix-like systems, the permissions on this directory\n"
    "are set to disallow users other than its owner from reading the contents of\n"
    "the files.   See the 'configuration.txt' documentation file for details."
)
44
45 class Node(service.MultiService):
46     # this implements common functionality of both Client nodes and Introducer
47     # nodes.
48     NODETYPE = "unknown NODETYPE"
49     PORTNUMFILE = None
50     CERTFILE = "node.pem"
51     LOCAL_IP_FILE = "advertised_ip_addresses"
52
53     def __init__(self, basedir="."):
54         service.MultiService.__init__(self)
55         self.basedir = os.path.abspath(basedir)
56         self._tub_ready_observerlist = observer.OneShotObserverList()
57         fileutil.make_dirs(os.path.join(self.basedir, "private"), 0700)
58         open(os.path.join(self.basedir, "private", "README"), "w").write(PRIV_README)
59         certfile = os.path.join(self.basedir, "private", self.CERTFILE)
60         self.tub = Tub(certFile=certfile)
61         self.tub.setOption("logLocalFailures", True)
62         self.tub.setOption("logRemoteFailures", True)
63         self.nodeid = b32decode(self.tub.tubID.upper()) # binary format
64         self.write_config("my_nodeid", b32encode(self.nodeid).lower() + "\n")
65         self.short_nodeid = b32encode(self.nodeid).lower()[:8] # ready for printing
66         assert self.PORTNUMFILE, "Your node.Node subclass must provide PORTNUMFILE"
67         self._portnumfile = os.path.join(self.basedir, self.PORTNUMFILE)
68         try:
69             portnum = int(open(self._portnumfile, "rU").read())
70         except (EnvironmentError, ValueError):
71             portnum = 0
72         self.tub.listenOn("tcp:%d" % portnum)
73         # we must wait until our service has started before we can find out
74         # our IP address and thus do tub.setLocation, and we can't register
75         # any services with the Tub until after that point
76         self.tub.setServiceParent(self)
77         self.logSource="Node"
78
79         AUTHKEYSFILEBASE = "authorized_keys."
80         for f in os.listdir(self.basedir):
81             if f.startswith(AUTHKEYSFILEBASE):
82                 keyfile = os.path.join(self.basedir, f)
83                 portnum = int(f[len(AUTHKEYSFILEBASE):])
84                 from allmydata import manhole
85                 m = manhole.AuthorizedKeysManhole(portnum, keyfile)
86                 m.setServiceParent(self)
87                 self.log("AuthorizedKeysManhole listening on %d" % portnum)
88
89         self.setup_logging()
90         self.log("Node constructed. " + get_package_versions_string())
91         iputil.increase_rlimits()
92
93     def get_app_versions(self):
94         # TODO: merge this with allmydata.get_package_versions
95         return dict(app_versions.versions)
96
97     def get_config(self, name, required=False):
98         """Get the (string) contents of a config file, or None if the file
99         did not exist. If required=True, raise an exception rather than
100         returning None. Any leading or trailing whitespace will be stripped
101         from the data."""
102         fn = os.path.join(self.basedir, name)
103         try:
104             return open(fn, "r").read().strip()
105         except EnvironmentError:
106             if not required:
107                 return None
108             raise
109
110     def write_private_config(self, name, value):
111         """Write the (string) contents of a private config file (which is a
112         config file that resides within the subdirectory named 'private'), and
113         return it. Any leading or trailing whitespace will be stripped from
114         the data.
115         """
116         privname = os.path.join(self.basedir, "private", name)
117         open(privname, "w").write(value.strip())
118
119     def get_or_create_private_config(self, name, default):
120         """Try to get the (string) contents of a private config file (which
121         is a config file that resides within the subdirectory named
122         'private'), and return it. Any leading or trailing whitespace will be
123         stripped from the data.
124
125         If the file does not exist, try to create it using default, and
126         then return the value that was written. If 'default' is a string,
127         use it as a default value. If not, treat it as a 0-argument callable
128         which is expected to return a string.
129         """
130         privname = os.path.join("private", name)
131         value = self.get_config(privname)
132         if value is None:
133             if isinstance(default, (str, unicode)):
134                 value = default
135             else:
136                 value = default()
137             fn = os.path.join(self.basedir, privname)
138             try:
139                 open(fn, "w").write(value)
140             except EnvironmentError, e:
141                 self.log("Unable to write config file '%s'" % fn)
142                 self.log(e)
143             value = value.strip()
144         return value
145
146     def write_config(self, name, value, mode="w"):
147         """Write a string to a config file."""
148         fn = os.path.join(self.basedir, name)
149         try:
150             open(fn, mode).write(value)
151         except EnvironmentError, e:
152             self.log("Unable to write config file '%s'" % fn)
153             self.log(e)
154
155     def startService(self):
156         # Note: this class can be started and stopped at most once.
157         self.log("Node.startService")
158         try:
159             os.chmod("twistd.pid", 0644)
160         except EnvironmentError:
161             pass
162         # Delay until the reactor is running.
163         eventual.eventually(self._startService)
164
165     def _startService(self):
166         precondition(reactor.running)
167         self.log("Node._startService")
168
169         service.MultiService.startService(self)
170         d = defer.succeed(None)
171         d.addCallback(lambda res: iputil.get_local_addresses_async())
172         d.addCallback(self._setup_tub)
173         def _ready(res):
174             self.log("%s running" % self.NODETYPE)
175             self._tub_ready_observerlist.fire(self)
176             return self
177         d.addCallback(_ready)
178         d.addErrback(self._service_startup_failed)
179
180     def _service_startup_failed(self, failure):
181         self.log('_startService() failed')
182         log.err(failure)
183         print "Node._startService failed, aborting"
184         print failure
185         #reactor.stop() # for unknown reasons, reactor.stop() isn't working.  [ ] TODO
186         self.log('calling os.abort()')
187         twlog.msg('calling os.abort()') # make sure it gets into twistd.log
188         print "calling os.abort()"
189         os.abort()
190
191     def stopService(self):
192         self.log("Node.stopService")
193         d = self._tub_ready_observerlist.when_fired()
194         def _really_stopService(ignored):
195             self.log("Node._really_stopService")
196             return service.MultiService.stopService(self)
197         d.addCallback(_really_stopService)
198         return d
199
200     def shutdown(self):
201         """Shut down the node. Returns a Deferred that fires (with None) when
202         it finally stops kicking."""
203         self.log("Node.shutdown")
204         return self.stopService()
205
206     def setup_logging(self):
207         # we replace the formatTime() method of the log observer that twistd
208         # set up for us, with a method that uses better timestamps.
209         for o in twlog.theLogPublisher.observers:
210             # o might be a FileLogObserver's .emit method
211             if type(o) is type(self.setup_logging): # bound method
212                 ob = o.im_self
213                 if isinstance(ob, twlog.FileLogObserver):
214                     newmeth = types.UnboundMethodType(formatTimeTahoeStyle, ob, ob.__class__)
215                     ob.formatTime = newmeth
216         # TODO: twisted >2.5.0 offers maxRotatedFiles=50
217
218         self.tub.setOption("logport-furlfile",
219                            os.path.join(self.basedir, "private","logport.furl"))
220         self.tub.setOption("log-gatherer-furlfile",
221                            os.path.join(self.basedir, "log_gatherer.furl"))
222         self.tub.setOption("bridge-twisted-logs", True)
223         incident_dir = os.path.join(self.basedir, "logs", "incidents")
224         # this doesn't quite work yet: unit tests fail
225         foolscap.logging.log.setLogDir(incident_dir)
226
227     def log(self, *args, **kwargs):
228         return log.msg(*args, **kwargs)
229
230     def old_log(self, msg, src="", args=(), **kw):
231         if src:
232             logsrc = src
233         else:
234             logsrc = self.logSource
235         if args:
236             try:
237                 msg = msg % tuple(map(humanreadable.hr, args))
238             except TypeError, e:
239                 msg = "ERROR: output string '%s' contained invalid %% expansion, error: %s, args: %s\n" % (`msg`, e, `args`)
240         msg = self.short_nodeid + ": " + humanreadable.hr(msg)
241         return twlog.callWithContext({"system":logsrc},
242                                      twlog.msg, msg, **kw)
243
244     def _setup_tub(self, local_addresses):
245         # we can't get a dynamically-assigned portnum until our Tub is
246         # running, which means after startService.
247         l = self.tub.getListeners()[0]
248         portnum = l.getPortnum()
249         # record which port we're listening on, so we can grab the same one next time
250         open(self._portnumfile, "w").write("%d\n" % portnum)
251
252         local_addresses = [ "%s:%d" % (addr, portnum,) for addr in local_addresses ]
253
254         addresses = []
255         try:
256             for addrline in open(os.path.join(self.basedir, self.LOCAL_IP_FILE), "rU"):
257                 mo = ADDR_RE.search(addrline)
258                 if mo:
259                     (addr, dummy, aportnum,) = mo.groups()
260                     if aportnum is None:
261                         aportnum = portnum
262                     addresses.append("%s:%d" % (addr, int(aportnum),))
263         except EnvironmentError:
264             pass
265
266         addresses.extend(local_addresses)
267
268         location = ",".join(addresses)
269         self.log("Tub location set to %s" % location)
270         self.tub.setLocation(location)
271         return self.tub
272
273     def when_tub_ready(self):
274         return self._tub_ready_observerlist.when_fired()
275
276     def add_service(self, s):
277         s.setServiceParent(self)
278         return s
279