git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/node.py
put all private state in $BASEDIR/private
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / node.py
1
2 import datetime, os.path, re, types
3 from base64 import b32decode, b32encode
4
5 from twisted.python import log
6 from twisted.application import service
7 from twisted.internet import defer, reactor
8 from foolscap import Tub, eventual
9 from allmydata import get_package_versions_string
10 from allmydata.util import log as tahoe_log
11 from allmydata.util import fileutil, iputil, observer, humanreadable
12 from allmydata.util.assertutil import precondition
13
14 # Just to get their versions:
15 import allmydata, pycryptopp, zfec
16
17 from foolscap.logging.publish import LogPublisher
# Publish our application versions through Foolscap's LogPublisher. Each
# __version__ attribute is actually an instance of
# allmydata.util.version_class.Version, so it is passed through str()
# before being stored.
for _pkgname, _pkg in (("allmydata", allmydata),
                       ("zfec", zfec),
                       ("pycryptopp", pycryptopp)):
    LogPublisher.versions[_pkgname] = str(_pkg.__version__)
del _pkgname, _pkg
25
# Matches one line of the advertised-addresses file: a dotted-quad IPv4
# address with an optional ":portnum" suffix.
# group 1 will be addr (dotted quad string), group 3 if any will be portnum
# (string); group 2 is the portnum together with its leading colon.
# An octet may be a bare "0" (so 10.0.0.1 and 192.168.0.10 are accepted) but
# may not carry leading zeros. The 0-255 range is not enforced.
ADDR_RE=re.compile(r"^((?:0|[1-9][0-9]*)(?:\.(?:0|[1-9][0-9]*)){3})(:([1-9][0-9]*))?$")
28
29
def formatTimeTahoeStyle(self, when):
    """Render the POSIX timestamp 'when' as a UTC string with exactly three
    fractional digits and a trailing 'Z', so that log lines look like:

      2007-10-12 00:26:28.566Z [Client] rnp752lz: 'client running'

    'self' is not used here; the parameter exists because
    Node.setup_logging() installs this function as the formatTime method of
    a log observer.
    """
    utc = datetime.datetime.utcfromtimestamp(when)
    # isoformat() omits the fractional part entirely when it is zero, so the
    # whole-seconds text and the milliseconds are rendered separately.
    # Microseconds are truncated (not rounded) to milliseconds.
    whole_seconds = utc.replace(microsecond=0).isoformat(" ")
    return "%s.%03dZ" % (whole_seconds, utc.microsecond // 1000)
38
# Text of the README file that Node.__init__ writes into the "private"
# subdirectory of the node's base directory.
PRIV_README="""
This directory contains files which contain private data for the Tahoe node,
such as private keys.  On Unix-like systems, the permissions on this directory
are set to disallow users other than its owner from reading the contents of
the files.   See the 'configuration.txt' documentation file for details."""
44
45 class Node(service.MultiService):
46     # this implements common functionality of both Client nodes and Introducer
47     # nodes.
48     NODETYPE = "unknown NODETYPE"
49     PORTNUMFILE = None
50     CERTFILE = "node.pem"
51     LOCAL_IP_FILE = "advertised_ip_addresses"
52
53     def __init__(self, basedir="."):
54         service.MultiService.__init__(self)
55         self.basedir = os.path.abspath(basedir)
56         self._tub_ready_observerlist = observer.OneShotObserverList()
57         fileutil.make_dirs(os.path.join(self.basedir, "private"), 0700)
58         open(os.path.join(self.basedir, "private", "README"), "w").write(PRIV_README)
59         certfile = os.path.join(self.basedir, "private", self.CERTFILE)
60         self.tub = Tub(certFile=certfile)
61         self.tub.setOption("logLocalFailures", True)
62         self.tub.setOption("logRemoteFailures", True)
63         self.nodeid = b32decode(self.tub.tubID.upper()) # binary format
64         self.write_config("my_nodeid", b32encode(self.nodeid).lower() + "\n")
65         self.short_nodeid = b32encode(self.nodeid).lower()[:8] # ready for printing
66         assert self.PORTNUMFILE, "Your node.Node subclass must provide PORTNUMFILE"
67         self._portnumfile = os.path.join(self.basedir, self.PORTNUMFILE)
68         try:
69             portnum = int(open(self._portnumfile, "rU").read())
70         except (EnvironmentError, ValueError):
71             portnum = 0
72         self.tub.listenOn("tcp:%d" % portnum)
73         # we must wait until our service has started before we can find out
74         # our IP address and thus do tub.setLocation, and we can't register
75         # any services with the Tub until after that point
76         self.tub.setServiceParent(self)
77         self.logSource="Node"
78
79         AUTHKEYSFILEBASE = "authorized_keys."
80         for f in os.listdir(self.basedir):
81             if f.startswith(AUTHKEYSFILEBASE):
82                 keyfile = os.path.join(self.basedir, f)
83                 portnum = int(f[len(AUTHKEYSFILEBASE):])
84                 from allmydata import manhole
85                 m = manhole.AuthorizedKeysManhole(portnum, keyfile)
86                 m.setServiceParent(self)
87                 self.log("AuthorizedKeysManhole listening on %d" % portnum)
88
89         self.setup_logging()
90         self.log("Node constructed. " + get_package_versions_string())
91         iputil.increase_rlimits()
92
93     def get_config(self, name, required=False):
94         """Get the (string) contents of a config file, or None if the file
95         did not exist. If required=True, raise an exception rather than
96         returning None. Any leading or trailing whitespace will be stripped
97         from the data."""
98         fn = os.path.join(self.basedir, name)
99         try:
100             return open(fn, "r").read().strip()
101         except EnvironmentError:
102             if not required:
103                 return None
104             raise
105
106     def write_private_config(self, name, value):
107         """Write the (string) contents of a private config file (which is a
108         config file that resides within the subdirectory named 'private'), and
109         return it. Any leading or trailing whitespace will be stripped from
110         the data.
111         """
112         privname = os.path.join(self.basedir, "private", name)
113         open(privname, "w").write(value.strip())
114
115     def get_or_create_private_config(self, name, default):
116         """Try to get the (string) contents of a private config file (which
117         is a config file that resides within the subdirectory named
118         'private'), and return it. Any leading or trailing whitespace will be
119         stripped from the data.
120
121         If the file does not exist, try to create it using default, and
122         then return the value that was written. If 'default' is a string,
123         use it as a default value. If not, treat it as a 0-argument callable
124         which is expected to return a string.
125         """
126         privname = os.path.join("private", name)
127         value = self.get_config(privname)
128         if value is None:
129             if isinstance(default, (str, unicode)):
130                 value = default
131             else:
132                 value = default()
133             fn = os.path.join(self.basedir, privname)
134             try:
135                 open(fn, "w").write(value)
136             except EnvironmentError, e:
137                 self.log("Unable to write config file '%s'" % fn)
138                 self.log(e)
139             value = value.strip()
140         return value
141
142     def write_config(self, name, value, mode="w"):
143         """Write a string to a config file."""
144         fn = os.path.join(self.basedir, name)
145         try:
146             open(fn, mode).write(value)
147         except EnvironmentError, e:
148             self.log("Unable to write config file '%s'" % fn)
149             self.log(e)
150
151     def startService(self):
152         # Note: this class can be started and stopped at most once.
153         self.log("Node.startService")
154         # Delay until the reactor is running.
155         eventual.eventually(self._startService)
156
157     def _startService(self):
158         precondition(reactor.running)
159         self.log("Node._startService")
160
161         service.MultiService.startService(self)
162         d = defer.succeed(None)
163         d.addCallback(lambda res: iputil.get_local_addresses_async())
164         d.addCallback(self._setup_tub)
165         d.addCallback(lambda res: self.tub_ready())
166         def _ready(res):
167             self.log("%s running" % self.NODETYPE)
168             self._tub_ready_observerlist.fire(self)
169             return self
170         d.addCallback(_ready)
171         def _die(failure):
172             self.log('_startService() failed')
173             log.err(failure)
174             #reactor.stop() # for unknown reasons, reactor.stop() isn't working.  [ ] TODO
175             self.log('calling os.abort()')
176             os.abort()
177         d.addErrback(_die)
178
179     def stopService(self):
180         self.log("Node.stopService")
181         d = self._tub_ready_observerlist.when_fired()
182         def _really_stopService(ignored):
183             self.log("Node._really_stopService")
184             return service.MultiService.stopService(self)
185         d.addCallback(_really_stopService)
186         return d
187
188     def shutdown(self):
189         """Shut down the node. Returns a Deferred that fires (with None) when
190         it finally stops kicking."""
191         self.log("Node.shutdown")
192         return self.stopService()
193
194     def setup_logging(self):
195         # we replace the formatTime() method of the log observer that twistd
196         # set up for us, with a method that uses better timestamps.
197         for o in log.theLogPublisher.observers:
198             # o might be a FileLogObserver's .emit method
199             if type(o) is type(self.setup_logging): # bound method
200                 ob = o.im_self
201                 if isinstance(ob, log.FileLogObserver):
202                     newmeth = types.UnboundMethodType(formatTimeTahoeStyle, ob, ob.__class__)
203                     ob.formatTime = newmeth
204         # TODO: twisted >2.5.0 offers maxRotatedFiles=50
205
206         self.tub.setOption("logport-furlfile",
207                            os.path.join(self.basedir, "logport.furl"))
208         self.tub.setOption("log-gatherer-furlfile",
209                            os.path.join(self.basedir, "log_gatherer.furl"))
210
211     def log(self, msg, src="", args=(), **kw):
212         if src:
213             logsrc = src
214         else:
215             logsrc = self.logSource
216         if args:
217             try:
218                 msg = msg % tuple(map(humanreadable.hr, args))
219             except TypeError, e:
220                 msg = "ERROR: output string '%s' contained invalid %% expansion, error: %s, args: %s\n" % (`msg`, e, `args`)
221         msg = self.short_nodeid + ": " + humanreadable.hr(msg)
222         return log.callWithContext({"system":logsrc},
223                                    tahoe_log.msg, msg, **kw)
224
225     def _setup_tub(self, local_addresses):
226         # we can't get a dynamically-assigned portnum until our Tub is
227         # running, which means after startService.
228         l = self.tub.getListeners()[0]
229         portnum = l.getPortnum()
230         # record which port we're listening on, so we can grab the same one next time
231         open(self._portnumfile, "w").write("%d\n" % portnum)
232
233         local_addresses = [ "%s:%d" % (addr, portnum,) for addr in local_addresses ]
234
235         addresses = []
236         try:
237             for addrline in open(os.path.join(self.basedir, self.LOCAL_IP_FILE), "rU"):
238                 mo = ADDR_RE.search(addrline)
239                 if mo:
240                     (addr, dummy, aportnum,) = mo.groups()
241                     if aportnum is None:
242                         aportnum = portnum
243                     addresses.append("%s:%d" % (addr, int(aportnum),))
244         except EnvironmentError:
245             pass
246
247         addresses.extend(local_addresses)
248
249         location = ",".join(addresses)
250         self.log("Tub location set to %s" % location)
251         self.tub.setLocation(location)
252         return self.tub
253
254     def tub_ready(self):
255         # called when the Tub is available for registerReference
256         pass
257
258     def when_tub_ready(self):
259         return self._tub_ready_observerlist.when_fired()
260
261     def add_service(self, s):
262         s.setServiceParent(self)
263         return s
264