2 import datetime, os.path, re, types
3 from base64 import b32decode, b32encode
5 from twisted.python import log as twlog
6 from twisted.application import service
7 from twisted.internet import defer, reactor
8 from foolscap import Tub, eventual
9 import foolscap.logging.log
10 from allmydata import get_package_versions, get_package_versions_string
11 from allmydata.util import log
12 from allmydata.util import fileutil, iputil, observer, humanreadable
13 from allmydata.util.assertutil import precondition
15 from foolscap.logging import app_versions
# Add our application versions to the data that Foolscap's LogPublisher
# reports: every (package, version) pair is registered, with the version
# converted to a string.
for package_name, package_version in get_package_versions().iteritems():
    app_versions.add_version(package_name, str(package_version))
# Matches a dotted-quad address with an optional ":portnum" suffix.
# group 1 will be addr (dotted quad string), group 2 is the optional ":portnum"
# suffix, and group 3 if any will be portnum (string).
#
# Each octet is either a lone "0" or a number without a leading zero, so
# addresses such as 10.0.0.1 and 127.0.0.1 are accepted. The octet alternation
# is non-capturing so that the pattern still has exactly three groups.
_OCTET = r"(?:0|[1-9][0-9]*)"
ADDR_RE = re.compile(r"^(%s\.%s\.%s\.%s)(:([1-9][0-9]*))?$" % ((_OCTET,) * 4))
def formatTimeTahoeStyle(self, when):
    """Format the timestamp 'when' as a UTC string with millisecond precision.

    'when' is passed to datetime.utcfromtimestamp(). The result looks like
    the timestamp at the start of this log line:
      2007-10-12 00:26:28.566Z [Client] rnp752lz: 'client running'

    'self' is not used by the body; the parameter is there so the function
    can be installed as a method on a log observer.
    """
    utc = datetime.datetime.utcfromtimestamp(when)
    stamp = utc.isoformat(" ")
    if not utc.microsecond:
        # isoformat() leaves out the fractional part when it is zero, so
        # supply the milliseconds ourselves
        return stamp + ".000Z"
    # isoformat() produced six fractional digits: drop the last three
    return stamp[:-3] + "Z"
36 This directory contains files which contain private data for the Tahoe node,
37 such as private keys. On Unix-like systems, the permissions on this directory
38 are set to disallow users other than its owner from reading the contents of
39 the files. See the 'configuration.txt' documentation file for details."""
41 class Node(service.MultiService):
# this implements common functionality of both Client nodes and Introducer nodes.
44 NODETYPE = "unknown NODETYPE"
47 LOCAL_IP_FILE = "advertised_ip_addresses"
49 def __init__(self, basedir="."):
50 service.MultiService.__init__(self)
51 self.basedir = os.path.abspath(basedir)
52 self._tub_ready_observerlist = observer.OneShotObserverList()
53 fileutil.make_dirs(os.path.join(self.basedir, "private"), 0700)
54 open(os.path.join(self.basedir, "private", "README"), "w").write(PRIV_README)
55 certfile = os.path.join(self.basedir, "private", self.CERTFILE)
56 self.tub = Tub(certFile=certfile)
57 self.tub.setOption("logLocalFailures", True)
58 self.tub.setOption("logRemoteFailures", True)
59 self.nodeid = b32decode(self.tub.tubID.upper()) # binary format
60 self.write_config("my_nodeid", b32encode(self.nodeid).lower() + "\n")
61 self.short_nodeid = b32encode(self.nodeid).lower()[:8] # ready for printing
62 assert self.PORTNUMFILE, "Your node.Node subclass must provide PORTNUMFILE"
63 self._portnumfile = os.path.join(self.basedir, self.PORTNUMFILE)
65 portnum = int(open(self._portnumfile, "rU").read())
66 except (EnvironmentError, ValueError):
68 self.tub.listenOn("tcp:%d" % portnum)
69 # we must wait until our service has started before we can find out
70 # our IP address and thus do tub.setLocation, and we can't register
71 # any services with the Tub until after that point
72 self.tub.setServiceParent(self)
75 AUTHKEYSFILEBASE = "authorized_keys."
76 for f in os.listdir(self.basedir):
77 if f.startswith(AUTHKEYSFILEBASE):
78 keyfile = os.path.join(self.basedir, f)
79 portnum = int(f[len(AUTHKEYSFILEBASE):])
80 from allmydata import manhole
81 m = manhole.AuthorizedKeysManhole(portnum, keyfile)
82 m.setServiceParent(self)
83 self.log("AuthorizedKeysManhole listening on %d" % portnum)
86 self.log("Node constructed. " + get_package_versions_string())
87 iputil.increase_rlimits()
def get_app_versions(self):
    """Return a new dict holding a copy of app_versions.versions."""
    # TODO: merge this with allmydata.get_package_versions
    registered = app_versions.versions
    return dict(registered)
def get_config(self, name, required=False):
    """Get the (string) contents of a config file, or None if the file
    did not exist. If required=True, raise an exception rather than
    returning None. Any leading or trailing whitespace will be stripped
    from the string.

    'name' is joined onto self.basedir to locate the file. Any
    EnvironmentError while opening or reading the file is treated as
    "did not exist"; with required=True that same exception is re-raised.
    """
    fn = os.path.join(self.basedir, name)
    try:
        f = open(fn, "r")
        try:
            contents = f.read()
        finally:
            # close the handle explicitly rather than leaving it to the
            # garbage collector
            f.close()
    except EnvironmentError:
        if required:
            raise
        return None
    return contents.strip()
def write_private_config(self, name, value):
    """Write the (string) contents of a private config file (which is a
    config file that resides within the subdirectory named 'private').
    Any leading or trailing whitespace will be stripped from the data
    before it is written.

    Nothing is returned. Errors from opening or writing the file are not
    caught here and propagate to the caller.
    """
    privname = os.path.join(self.basedir, "private", name)
    f = open(privname, "w")
    try:
        f.write(value.strip())
    finally:
        # close (and so flush) the handle explicitly rather than relying on
        # garbage collection of the file object
        f.close()
115 def get_or_create_private_config(self, name, default):
116 """Try to get the (string) contents of a private config file (which
117 is a config file that resides within the subdirectory named
118 'private'), and return it. Any leading or trailing whitespace will be
119 stripped from the data.
121 If the file does not exist, try to create it using default, and
122 then return the value that was written. If 'default' is a string,
123 use it as a default value. If not, treat it as a 0-argument callable
124 which is expected to return a string.
126 privname = os.path.join("private", name)
127 value = self.get_config(privname)
129 if isinstance(default, (str, unicode)):
133 fn = os.path.join(self.basedir, privname)
135 open(fn, "w").write(value)
136 except EnvironmentError, e:
137 self.log("Unable to write config file '%s'" % fn)
139 value = value.strip()
142 def write_config(self, name, value, mode="w"):
143 """Write a string to a config file."""
144 fn = os.path.join(self.basedir, name)
146 open(fn, mode).write(value)
147 except EnvironmentError, e:
148 self.log("Unable to write config file '%s'" % fn)
151 def startService(self):
152 # Note: this class can be started and stopped at most once.
153 self.log("Node.startService")
155 os.chmod("twistd.pid", 0644)
156 except EnvironmentError:
158 # Delay until the reactor is running.
159 eventual.eventually(self._startService)
def _startService(self):
    """Second stage of startup; requires reactor.running to be true.

    Starts the MultiService machinery, then builds a Deferred chain that
    fetches the local addresses, passes them to _setup_tub, and finally
    logs "<NODETYPE> running" and fires the tub-ready observer list with
    this node. A failure anywhere in the chain goes to
    _service_startup_failed.
    """
    precondition(reactor.running)
    self.log("Node._startService")

    service.MultiService.startService(self)

    def _ready(res):
        self.log("%s running" % self.NODETYPE)
        self._tub_ready_observerlist.fire(self)

    d = defer.succeed(None)
    d.addCallback(lambda res: iputil.get_local_addresses_async())
    d.addCallback(self._setup_tub)
    d.addCallback(_ready)
    d.addErrback(self._service_startup_failed)
176 def _service_startup_failed(self, failure):
177 self.log('_startService() failed')
179 print "Node._startService failed, aborting"
181 #reactor.stop() # for unknown reasons, reactor.stop() isn't working. [ ] TODO
182 self.log('calling os.abort()')
183 twlog.msg('calling os.abort()') # make sure it gets into twistd.log
184 print "calling os.abort()"
def stopService(self):
    """Stop the node's services once the tub-ready observer list has fired.

    The actual MultiService.stopService() call is attached as a callback to
    the object returned by the observer list's when_fired(); that same
    object is returned to the caller.
    """
    self.log("Node.stopService")

    def _really_stopService(ignored):
        self.log("Node._really_stopService")
        return service.MultiService.stopService(self)

    ready = self._tub_ready_observerlist.when_fired()
    ready.addCallback(_really_stopService)
    return ready
197 """Shut down the node. Returns a Deferred that fires (with None) when
198 it finally stops kicking."""
199 self.log("Node.shutdown")
200 return self.stopService()
def setup_logging(self):
    """Install Tahoe-style timestamps on Twisted's file log observers and
    set the Tub's logging options and the Foolscap log directory.

    All paths handed to the Tub and to Foolscap are built from self.basedir.
    """
    # we replace the formatTime() method of the log observer that twistd
    # set up for us, with a method that uses better timestamps.
    bound_method_type = type(self.setup_logging)
    for emitter in twlog.theLogPublisher.observers:
        # emitter might be a FileLogObserver's .emit method
        if type(emitter) is not bound_method_type:
            continue
        owner = emitter.im_self
        if isinstance(owner, twlog.FileLogObserver):
            owner.formatTime = types.UnboundMethodType(formatTimeTahoeStyle,
                                                       owner, owner.__class__)
            # TODO: twisted >2.5.0 offers maxRotatedFiles=50

    logport_furlfile = os.path.join(self.basedir, "private","logport.furl")
    self.tub.setOption("logport-furlfile", logport_furlfile)
    gatherer_furlfile = os.path.join(self.basedir, "log_gatherer.furl")
    self.tub.setOption("log-gatherer-furlfile", gatherer_furlfile)
    self.tub.setOption("bridge-twisted-logs", True)
    incident_dir = os.path.join(self.basedir, "logs", "incidents")
    # this doesn't quite work yet: unit tests fail
    foolscap.logging.log.setLogDir(incident_dir)
def log(self, *args, **kwargs):
    """Pass all positional and keyword arguments through to log.msg() and
    return its result."""
    # 'log' in the body is the module-level name imported from allmydata.util
    # at the top of the file, not this method.
    return log.msg(*args, **kwargs)
226 def old_log(self, msg, src="", args=(), **kw):
230 logsrc = self.logSource
233 msg = msg % tuple(map(humanreadable.hr, args))
235 msg = "ERROR: output string '%s' contained invalid %% expansion, error: %s, args: %s\n" % (`msg`, e, `args`)
236 msg = self.short_nodeid + ": " + humanreadable.hr(msg)
237 return twlog.callWithContext({"system":logsrc},
238 twlog.msg, msg, **kw)
240 def _setup_tub(self, local_addresses):
241 # we can't get a dynamically-assigned portnum until our Tub is
242 # running, which means after startService.
243 l = self.tub.getListeners()[0]
244 portnum = l.getPortnum()
245 # record which port we're listening on, so we can grab the same one next time
246 open(self._portnumfile, "w").write("%d\n" % portnum)
248 local_addresses = [ "%s:%d" % (addr, portnum,) for addr in local_addresses ]
252 for addrline in open(os.path.join(self.basedir, self.LOCAL_IP_FILE), "rU"):
253 mo = ADDR_RE.search(addrline)
255 (addr, dummy, aportnum,) = mo.groups()
258 addresses.append("%s:%d" % (addr, int(aportnum),))
259 except EnvironmentError:
262 addresses.extend(local_addresses)
264 location = ",".join(addresses)
265 self.log("Tub location set to %s" % location)
266 self.tub.setLocation(location)
def when_tub_ready(self):
    """Return the result of when_fired() on the tub-ready observer list.

    That observer list is fired with this node at the end of startup, so
    callers can use the returned object to wait for the Tub to be set up.
    """
    ready_observers = self._tub_ready_observerlist
    return ready_observers.when_fired()
272 def add_service(self, s):
273 s.setServiceParent(self)