]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/node.py
setup: fix bug in recent patch to use allmydata.get_package_versions() to tell the...
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / node.py
1
2 import datetime, os.path, re, types
3 from base64 import b32decode, b32encode
4
5 from twisted.python import log as twlog
6 from twisted.application import service
7 from twisted.internet import defer, reactor
8 from foolscap import Tub, eventual
9 import foolscap.logging.log
10 from allmydata import get_package_versions, get_package_versions_string
11 from allmydata.util import log
12 from allmydata.util import fileutil, iputil, observer, humanreadable
13 from allmydata.util.assertutil import precondition
14
15 from foolscap.logging import app_versions
16
17 # Add our application versions to the data that Foolscap's LogPublisher
18 # reports.
19 for thing, things_version in get_package_versions().iteritems():
20     app_versions.add_version(thing, str(things_version))
21
22 # group 1 will be addr (dotted quad string), group 3 if any will be portnum (string)
23 ADDR_RE=re.compile("^([1-9][0-9]*\.[1-9][0-9]*\.[1-9][0-9]*\.[1-9][0-9]*)(:([1-9][0-9]*))?$")
24
25
26 def formatTimeTahoeStyle(self, when):
27     # we want UTC timestamps that look like:
28     #  2007-10-12 00:26:28.566Z [Client] rnp752lz: 'client running'
29     d = datetime.datetime.utcfromtimestamp(when)
30     if d.microsecond:
31         return d.isoformat(" ")[:-3]+"Z"
32     else:
33         return d.isoformat(" ") + ".000Z"
34
35 PRIV_README="""
36 This directory contains files which contain private data for the Tahoe node,
37 such as private keys.  On Unix-like systems, the permissions on this directory
38 are set to disallow users other than its owner from reading the contents of
39 the files.   See the 'configuration.txt' documentation file for details."""
40
41 class Node(service.MultiService):
42     # this implements common functionality of both Client nodes and Introducer
43     # nodes.
44     NODETYPE = "unknown NODETYPE"
45     PORTNUMFILE = None
46     CERTFILE = "node.pem"
47     LOCAL_IP_FILE = "advertised_ip_addresses"
48
49     def __init__(self, basedir="."):
50         service.MultiService.__init__(self)
51         self.basedir = os.path.abspath(basedir)
52         self._tub_ready_observerlist = observer.OneShotObserverList()
53         fileutil.make_dirs(os.path.join(self.basedir, "private"), 0700)
54         open(os.path.join(self.basedir, "private", "README"), "w").write(PRIV_README)
55         certfile = os.path.join(self.basedir, "private", self.CERTFILE)
56         self.tub = Tub(certFile=certfile)
57         self.tub.setOption("logLocalFailures", True)
58         self.tub.setOption("logRemoteFailures", True)
59         self.nodeid = b32decode(self.tub.tubID.upper()) # binary format
60         self.write_config("my_nodeid", b32encode(self.nodeid).lower() + "\n")
61         self.short_nodeid = b32encode(self.nodeid).lower()[:8] # ready for printing
62         assert self.PORTNUMFILE, "Your node.Node subclass must provide PORTNUMFILE"
63         self._portnumfile = os.path.join(self.basedir, self.PORTNUMFILE)
64         try:
65             portnum = int(open(self._portnumfile, "rU").read())
66         except (EnvironmentError, ValueError):
67             portnum = 0
68         self.tub.listenOn("tcp:%d" % portnum)
69         # we must wait until our service has started before we can find out
70         # our IP address and thus do tub.setLocation, and we can't register
71         # any services with the Tub until after that point
72         self.tub.setServiceParent(self)
73         self.logSource="Node"
74
75         AUTHKEYSFILEBASE = "authorized_keys."
76         for f in os.listdir(self.basedir):
77             if f.startswith(AUTHKEYSFILEBASE):
78                 keyfile = os.path.join(self.basedir, f)
79                 portnum = int(f[len(AUTHKEYSFILEBASE):])
80                 from allmydata import manhole
81                 m = manhole.AuthorizedKeysManhole(portnum, keyfile)
82                 m.setServiceParent(self)
83                 self.log("AuthorizedKeysManhole listening on %d" % portnum)
84
85         self.setup_logging()
86         self.log("Node constructed. " + get_package_versions_string())
87         iputil.increase_rlimits()
88
89     def get_app_versions(self):
90         # TODO: merge this with allmydata.get_package_versions
91         return dict(app_versions.versions)
92
93     def get_config(self, name, required=False):
94         """Get the (string) contents of a config file, or None if the file
95         did not exist. If required=True, raise an exception rather than
96         returning None. Any leading or trailing whitespace will be stripped
97         from the data."""
98         fn = os.path.join(self.basedir, name)
99         try:
100             return open(fn, "r").read().strip()
101         except EnvironmentError:
102             if not required:
103                 return None
104             raise
105
106     def write_private_config(self, name, value):
107         """Write the (string) contents of a private config file (which is a
108         config file that resides within the subdirectory named 'private'), and
109         return it. Any leading or trailing whitespace will be stripped from
110         the data.
111         """
112         privname = os.path.join(self.basedir, "private", name)
113         open(privname, "w").write(value.strip())
114
115     def get_or_create_private_config(self, name, default):
116         """Try to get the (string) contents of a private config file (which
117         is a config file that resides within the subdirectory named
118         'private'), and return it. Any leading or trailing whitespace will be
119         stripped from the data.
120
121         If the file does not exist, try to create it using default, and
122         then return the value that was written. If 'default' is a string,
123         use it as a default value. If not, treat it as a 0-argument callable
124         which is expected to return a string.
125         """
126         privname = os.path.join("private", name)
127         value = self.get_config(privname)
128         if value is None:
129             if isinstance(default, (str, unicode)):
130                 value = default
131             else:
132                 value = default()
133             fn = os.path.join(self.basedir, privname)
134             try:
135                 open(fn, "w").write(value)
136             except EnvironmentError, e:
137                 self.log("Unable to write config file '%s'" % fn)
138                 self.log(e)
139             value = value.strip()
140         return value
141
142     def write_config(self, name, value, mode="w"):
143         """Write a string to a config file."""
144         fn = os.path.join(self.basedir, name)
145         try:
146             open(fn, mode).write(value)
147         except EnvironmentError, e:
148             self.log("Unable to write config file '%s'" % fn)
149             self.log(e)
150
151     def startService(self):
152         # Note: this class can be started and stopped at most once.
153         self.log("Node.startService")
154         try:
155             os.chmod("twistd.pid", 0644)
156         except EnvironmentError:
157             pass
158         # Delay until the reactor is running.
159         eventual.eventually(self._startService)
160
161     def _startService(self):
162         precondition(reactor.running)
163         self.log("Node._startService")
164
165         service.MultiService.startService(self)
166         d = defer.succeed(None)
167         d.addCallback(lambda res: iputil.get_local_addresses_async())
168         d.addCallback(self._setup_tub)
169         def _ready(res):
170             self.log("%s running" % self.NODETYPE)
171             self._tub_ready_observerlist.fire(self)
172             return self
173         d.addCallback(_ready)
174         d.addErrback(self._service_startup_failed)
175
176     def _service_startup_failed(self, failure):
177         self.log('_startService() failed')
178         log.err(failure)
179         print "Node._startService failed, aborting"
180         print failure
181         #reactor.stop() # for unknown reasons, reactor.stop() isn't working.  [ ] TODO
182         self.log('calling os.abort()')
183         twlog.msg('calling os.abort()') # make sure it gets into twistd.log
184         print "calling os.abort()"
185         os.abort()
186
187     def stopService(self):
188         self.log("Node.stopService")
189         d = self._tub_ready_observerlist.when_fired()
190         def _really_stopService(ignored):
191             self.log("Node._really_stopService")
192             return service.MultiService.stopService(self)
193         d.addCallback(_really_stopService)
194         return d
195
196     def shutdown(self):
197         """Shut down the node. Returns a Deferred that fires (with None) when
198         it finally stops kicking."""
199         self.log("Node.shutdown")
200         return self.stopService()
201
202     def setup_logging(self):
203         # we replace the formatTime() method of the log observer that twistd
204         # set up for us, with a method that uses better timestamps.
205         for o in twlog.theLogPublisher.observers:
206             # o might be a FileLogObserver's .emit method
207             if type(o) is type(self.setup_logging): # bound method
208                 ob = o.im_self
209                 if isinstance(ob, twlog.FileLogObserver):
210                     newmeth = types.UnboundMethodType(formatTimeTahoeStyle, ob, ob.__class__)
211                     ob.formatTime = newmeth
212         # TODO: twisted >2.5.0 offers maxRotatedFiles=50
213
214         self.tub.setOption("logport-furlfile",
215                            os.path.join(self.basedir, "private","logport.furl"))
216         self.tub.setOption("log-gatherer-furlfile",
217                            os.path.join(self.basedir, "log_gatherer.furl"))
218         self.tub.setOption("bridge-twisted-logs", True)
219         incident_dir = os.path.join(self.basedir, "logs", "incidents")
220         # this doesn't quite work yet: unit tests fail
221         foolscap.logging.log.setLogDir(incident_dir)
222
223     def log(self, *args, **kwargs):
224         return log.msg(*args, **kwargs)
225
226     def old_log(self, msg, src="", args=(), **kw):
227         if src:
228             logsrc = src
229         else:
230             logsrc = self.logSource
231         if args:
232             try:
233                 msg = msg % tuple(map(humanreadable.hr, args))
234             except TypeError, e:
235                 msg = "ERROR: output string '%s' contained invalid %% expansion, error: %s, args: %s\n" % (`msg`, e, `args`)
236         msg = self.short_nodeid + ": " + humanreadable.hr(msg)
237         return twlog.callWithContext({"system":logsrc},
238                                      twlog.msg, msg, **kw)
239
240     def _setup_tub(self, local_addresses):
241         # we can't get a dynamically-assigned portnum until our Tub is
242         # running, which means after startService.
243         l = self.tub.getListeners()[0]
244         portnum = l.getPortnum()
245         # record which port we're listening on, so we can grab the same one next time
246         open(self._portnumfile, "w").write("%d\n" % portnum)
247
248         local_addresses = [ "%s:%d" % (addr, portnum,) for addr in local_addresses ]
249
250         addresses = []
251         try:
252             for addrline in open(os.path.join(self.basedir, self.LOCAL_IP_FILE), "rU"):
253                 mo = ADDR_RE.search(addrline)
254                 if mo:
255                     (addr, dummy, aportnum,) = mo.groups()
256                     if aportnum is None:
257                         aportnum = portnum
258                     addresses.append("%s:%d" % (addr, int(aportnum),))
259         except EnvironmentError:
260             pass
261
262         addresses.extend(local_addresses)
263
264         location = ",".join(addresses)
265         self.log("Tub location set to %s" % location)
266         self.tub.setLocation(location)
267         return self.tub
268
269     def when_tub_ready(self):
270         return self._tub_ready_observerlist.when_fired()
271
272     def add_service(self, s):
273         s.setServiceParent(self)
274         return s
275