2 import datetime, os.path, re, types
3 from base64 import b32decode, b32encode
5 from twisted.python import log
6 from twisted.application import service
7 from twisted.internet import defer, reactor
8 from foolscap import Tub, eventual
9 from allmydata import get_package_versions_string
10 from allmydata.util import log as tahoe_log
11 from allmydata.util import fileutil, iputil, observer, humanreadable
12 from allmydata.util.assertutil import precondition
14 # Just to get their versions:
15 import allmydata, pycryptopp, zfec
17 from foolscap.logging.publish import LogPublisher
# Foolscap's LogPublisher reports a table of application versions; register
# ours in it. Each __version__ attribute is really an instance of a "Version"
# class, so it is turned into a plain string before being stored.
LogPublisher.versions.update({
    'allmydata': str(allmydata.__version__),
    'zfec': str(zfec.__version__),
    'pycryptopp': str(pycryptopp.__version__),
})
# Matches one "addr" or "addr:port" line, where addr is a dotted-quad IPv4
# address. Octets and the port are written without leading zeros; the first
# octet and the port must be non-zero, but the remaining octets may be "0"
# so that addresses such as 10.0.0.1 or 192.168.0.1 are accepted.
# group 1 will be addr (dotted quad string), group 3 if any will be portnum (string)
ADDR_RE = re.compile(r"^([1-9][0-9]*(?:\.(?:0|[1-9][0-9]*)){3})(:([1-9][0-9]*))?$")
def formatTimeTahoeStyle(self, when):
    """Render the epoch timestamp 'when' as a UTC string with millisecond
    precision and a trailing "Z", for log lines that look like:
      2007-10-12 00:26:28.566Z [Client] rnp752lz: 'client running'

    'self' is not used; the parameter is there because setup_logging installs
    this function as the formatTime method of a log observer.
    """
    stamp = datetime.datetime.utcfromtimestamp(when)
    # isoformat() leaves the fractional part out entirely when it is zero, so
    # the whole-second part and the milliseconds are formatted separately.
    # Digits below one millisecond are truncated, not rounded.
    whole_seconds = stamp.replace(microsecond=0).isoformat(" ")
    return "%s.%03dZ" % (whole_seconds, stamp.microsecond // 1000)
39 This directory contains files which contain private data for the Tahoe node,
40 such as private keys. On Unix-like systems, the permissions on this directory
41 are set to disallow users other than its owner from reading the contents of
42 the files. See the 'configuration.txt' documentation file for details."""
44 class Node(service.MultiService):
45 # this implements common functionality of both Client nodes and Introducer
47 NODETYPE = "unknown NODETYPE"
50 LOCAL_IP_FILE = "advertised_ip_addresses"
52 def __init__(self, basedir="."):
53 service.MultiService.__init__(self)
54 self.basedir = os.path.abspath(basedir)
55 self._tub_ready_observerlist = observer.OneShotObserverList()
56 fileutil.make_dirs(os.path.join(self.basedir, "private"), 0700)
57 open(os.path.join(self.basedir, "private", "README"), "w").write(PRIV_README)
58 certfile = os.path.join(self.basedir, "private", self.CERTFILE)
59 self.tub = Tub(certFile=certfile)
60 self.tub.setOption("logLocalFailures", True)
61 self.tub.setOption("logRemoteFailures", True)
62 self.nodeid = b32decode(self.tub.tubID.upper()) # binary format
63 self.write_config("my_nodeid", b32encode(self.nodeid).lower() + "\n")
64 self.short_nodeid = b32encode(self.nodeid).lower()[:8] # ready for printing
65 assert self.PORTNUMFILE, "Your node.Node subclass must provide PORTNUMFILE"
66 self._portnumfile = os.path.join(self.basedir, self.PORTNUMFILE)
68 portnum = int(open(self._portnumfile, "rU").read())
69 except (EnvironmentError, ValueError):
71 self.tub.listenOn("tcp:%d" % portnum)
72 # we must wait until our service has started before we can find out
73 # our IP address and thus do tub.setLocation, and we can't register
74 # any services with the Tub until after that point
75 self.tub.setServiceParent(self)
78 AUTHKEYSFILEBASE = "authorized_keys."
79 for f in os.listdir(self.basedir):
80 if f.startswith(AUTHKEYSFILEBASE):
81 keyfile = os.path.join(self.basedir, f)
82 portnum = int(f[len(AUTHKEYSFILEBASE):])
83 from allmydata import manhole
84 m = manhole.AuthorizedKeysManhole(portnum, keyfile)
85 m.setServiceParent(self)
86 self.log("AuthorizedKeysManhole listening on %d" % portnum)
89 self.log("Node constructed. " + get_package_versions_string())
90 iputil.increase_rlimits()
def get_config(self, name, required=False):
    """Get the (string) contents of a config file, or None if the file
    could not be read. If required=True, re-raise the EnvironmentError
    rather than returning None. Any leading or trailing whitespace will be
    stripped from the data.

    'name' is interpreted relative to self.basedir.
    """
    fn = os.path.join(self.basedir, name)
    try:
        f = open(fn, "r")
        try:
            return f.read().strip()
        finally:
            # close the handle explicitly rather than leaving it to the
            # garbage collector
            f.close()
    except EnvironmentError:
        if required:
            raise
        return None
def write_private_config(self, name, value):
    """Write the (string) contents of a private config file (which is a
    config file that resides within the subdirectory named 'private').
    Any leading or trailing whitespace will be stripped from the data
    before it is written.

    Returns None. Errors from opening or writing the file are not caught
    here.
    """
    privname = os.path.join(self.basedir, "private", name)
    f = open(privname, "w")
    try:
        f.write(value.strip())
    finally:
        # make sure the data is flushed and the handle released right away
        f.close()
def get_or_create_private_config(self, name, default):
    """Try to get the (string) contents of a private config file (which
    is a config file that resides within the subdirectory named
    'private'), and return it. Any leading or trailing whitespace will be
    stripped from the data.

    If the file does not exist, try to create it using default, and
    then return the value that was written (also stripped). If 'default'
    is a string, use it as a default value. If not, treat it as a
    0-argument callable which is expected to return a string.

    A failure to write the new file is logged, not raised; the value is
    still returned.
    """
    # NOTE(review): the branch structure is absent from this listing and
    # has been rebuilt from the docstring above. Confirm.
    privname = os.path.join("private", name)
    value = self.get_config(privname)
    if value is not None:
        return value

    if isinstance(default, (str, unicode)):
        value = default
    else:
        value = default()
    fn = os.path.join(self.basedir, privname)
    try:
        f = open(fn, "w")
        try:
            f.write(value)
        finally:
            f.close()
    except EnvironmentError:
        self.log("Unable to write config file '%s'" % fn)
    return value.strip()
def write_config(self, name, value, mode="w"):
    """Write a string to a config file.

    'name' is interpreted relative to self.basedir and 'mode' is passed to
    open() unchanged (so "a" appends). A failure to open or write the file
    is logged through self.log and otherwise ignored. Returns None.
    """
    fn = os.path.join(self.basedir, name)
    try:
        f = open(fn, mode)
        try:
            f.write(value)
        finally:
            # release the handle immediately, even if the write failed
            f.close()
    except EnvironmentError:
        self.log("Unable to write config file '%s'" % fn)
def startService(self):
    """Begin starting the node; the real work happens in _startService.

    Note: this class can be started and stopped at most once.
    """
    self.log("Node.startService")
    # _startService must not run before the reactor does, so it is queued
    # with eventually() instead of being called directly.
    eventual.eventually(self._startService)
def _startService(self):
    """Start the child services, then look up the local addresses and
    configure the Tub with them.

    Requires the reactor to be running. When the Tub has been set up, the
    tub-ready observer list is fired with this node. Any failure along the
    way is handed to _service_startup_failed. Nothing is returned.
    """
    precondition(reactor.running)
    self.log("Node._startService")

    service.MultiService.startService(self)

    # NOTE(review): the header of this callback is absent from the listing.
    # It is rebuilt here because d.addCallback(_ready) below refers to it.
    # Any return statement it may have had is not visible.
    def _ready(res):
        self.log("%s running" % self.NODETYPE)
        self._tub_ready_observerlist.fire(self)

    d = defer.succeed(None)
    # Starting from an already-fired Deferred means an exception raised by
    # get_local_addresses_async() itself also reaches the errback below.
    d.addCallback(lambda res: iputil.get_local_addresses_async())
    d.addCallback(self._setup_tub)
    d.addCallback(_ready)
    d.addErrback(self._service_startup_failed)
def _service_startup_failed(self, failure):
    """Errback for _startService: report the failure and abort the process.

    'failure' is the Failure delivered by the Deferred chain; it is written
    to the Twisted log so the cause of the abort is not lost. This function
    does not return, because os.abort() terminates the process.
    """
    self.log('_startService() failed')
    log.err(failure)
    # A single parenthesised string prints identically whether print is a
    # statement or a function.
    print("Node._startService failed, aborting")
    #reactor.stop() # for unknown reasons, reactor.stop() isn't working. [ ] TODO
    self.log('calling os.abort()')
    log.msg('calling os.abort()')
    print("calling os.abort()")
    # Actually do what the three messages above announce.
    os.abort()
def stopService(self):
    """Stop the node's services once the Tub has finished starting.

    The actual MultiService.stopService call is chained onto the tub-ready
    notification, so a stop requested during startup waits for startup to
    complete. Returns the Deferred carrying that chained call.
    """
    self.log("Node.stopService")
    d = self._tub_ready_observerlist.when_fired()
    def _really_stopService(ignored):
        self.log("Node._really_stopService")
        return service.MultiService.stopService(self)
    d.addCallback(_really_stopService)
    # Hand the Deferred back so callers (e.g. shutdown) can wait on it.
    return d

# NOTE(review): the header of this method is absent from the listing. The
# name comes from the "Node.shutdown" log message. Confirm the signature.
def shutdown(self):
    """Shut down the node. Returns a Deferred that fires (with None) when
    it finally stops kicking."""
    self.log("Node.shutdown")
    return self.stopService()
def setup_logging(self):
    """Give twistd's file log observer Tahoe-style timestamps and set the
    Tub's logging options (logport furl file, log-gatherer furl file, and
    bridging of Twisted logs).
    """
    # we replace the formatTime() method of the log observer that twistd
    # set up for us, with a method that uses better timestamps.
    for o in log.theLogPublisher.observers:
        # o might be a FileLogObserver's .emit method
        if type(o) is type(self.setup_logging): # bound method
            # NOTE(review): the assignment of 'ob' is absent from the
            # listing. The object the method is bound to is assumed. Confirm.
            ob = o.im_self
            if isinstance(ob, log.FileLogObserver):
                # bind formatTimeTahoeStyle to this observer instance
                newmeth = types.UnboundMethodType(formatTimeTahoeStyle, ob, ob.__class__)
                ob.formatTime = newmeth
    # TODO: twisted >2.5.0 offers maxRotatedFiles=50

    self.tub.setOption("logport-furlfile",
                       os.path.join(self.basedir, "private","logport.furl"))
    self.tub.setOption("log-gatherer-furlfile",
                       os.path.join(self.basedir, "log_gatherer.furl"))
    self.tub.setOption("bridge-twisted-logs", True)
def log(self, *args, **kwargs):
    """Pass every argument through to tahoe_log.msg and return its result."""
    result = tahoe_log.msg(*args, **kwargs)
    return result
218 def old_log(self, msg, src="", args=(), **kw):
222 logsrc = self.logSource
225 msg = msg % tuple(map(humanreadable.hr, args))
227 msg = "ERROR: output string '%s' contained invalid %% expansion, error: %s, args: %s\n" % (`msg`, e, `args`)
228 msg = self.short_nodeid + ": " + humanreadable.hr(msg)
229 return log.callWithContext({"system":logsrc},
230 tahoe_log.msg, msg, **kw)
def _setup_tub(self, local_addresses):
    """Record the Tub's listening port and set the Tub's location.

    'local_addresses' is an iterable of address strings (presumably the
    result of iputil.get_local_addresses_async, per _startService). The
    location is a comma-separated list of "addr:port" entries: first the
    addresses listed in the LOCAL_IP_FILE file in basedir (if that file can
    be read), then the local addresses paired with the Tub's port.
    """
    # we can't get a dynamically-assigned portnum until our Tub is
    # running, which means after startService.
    listener = self.tub.getListeners()[0]
    portnum = listener.getPortnum()
    # record which port we're listening on, so we can grab the same one next time
    portfile = open(self._portnumfile, "w")
    try:
        portfile.write("%d\n" % portnum)
    finally:
        portfile.close()

    local_addresses = ["%s:%d" % (addr, portnum) for addr in local_addresses]

    # NOTE(review): the initialisation of 'addresses', the try: header, the
    # match test, the default-port handling and the except body are absent
    # from the listing. They are rebuilt from the surrounding statements and
    # the ADDR_RE comment ("group 3 if any will be portnum"). Confirm.
    addresses = []
    try:
        addrfile = open(os.path.join(self.basedir, self.LOCAL_IP_FILE), "rU")
        try:
            for addrline in addrfile:
                mo = ADDR_RE.search(addrline)
                if mo:
                    (addr, dummy, aportnum) = mo.groups()
                    if aportnum is None:
                        # no explicit port on this line: use the Tub's
                        aportnum = portnum
                    addresses.append("%s:%d" % (addr, int(aportnum)))
        finally:
            addrfile.close()
    except EnvironmentError:
        # the file is optional; without it only local addresses are used
        pass

    addresses.extend(local_addresses)

    location = ",".join(addresses)
    self.log("Tub location set to %s" % location)
    self.tub.setLocation(location)
def when_tub_ready(self):
    """Return the tub-ready observer list's when_fired() result, so the
    caller can wait for the Tub to finish being set up. _startService fires
    that list with this node.
    """
    observers = self._tub_ready_observerlist
    return observers.when_fired()
def add_service(self, s):
    # Attach service 's' to this node by making the node its parent.
    # NOTE(review): this is the last definition in the visible listing and it
    # may continue beyond it, so nothing is stated here about a return value.
    s.setServiceParent(self)