2 import datetime, os.path, re, types
3 from base64 import b32decode, b32encode
5 from twisted.python import log
6 from twisted.application import service
7 from twisted.internet import defer, reactor
8 from foolscap import Tub, eventual
9 from allmydata import get_package_versions_string
10 from allmydata.util import log as tahoe_log
11 from allmydata.util import fileutil, iputil, observer, humanreadable
12 from allmydata.util.assertutil import precondition
14 # Just to get their versions:
15 import allmydata, pycryptopp, zfec
17 from foolscap.logging.publish import LogPublisher
18 # Add our application versions to the data that Foolscap's
19 # LogPublisher reports. Our __version__ attributes are actually
20 # instances of allmydata.util.version_class.Version, so convert them
22 LogPublisher.versions['allmydata'] = str(allmydata.__version__)
23 LogPublisher.versions['zfec'] = str(zfec.__version__)
24 LogPublisher.versions['pycryptopp'] = str(pycryptopp.__version__)
26 # group 1 will be addr (dotted quad string), group 3 if any will be portnum (string)
27 ADDR_RE=re.compile("^([1-9][0-9]*\.[1-9][0-9]*\.[1-9][0-9]*\.[1-9][0-9]*)(:([1-9][0-9]*))?$")
30 def formatTimeTahoeStyle(self, when):
31 # we want UTC timestamps that look like:
32 # 2007-10-12 00:26:28.566Z [Client] rnp752lz: 'client running'
33 d = datetime.datetime.utcfromtimestamp(when)
35 return d.isoformat(" ")[:-3]+"Z"
37 return d.isoformat(" ") + ".000Z"
40 This directory contains files which contain private data for the Tahoe node,
41 such as private keys. On Unix-like systems, the permissions on this directory
42 are set to disallow users other than its owner from reading the contents of
43 the files. See the 'configuration.txt' documentation file for details."""
45 class Node(service.MultiService):
46 # this implements common functionality of both Client nodes and Introducer
48 NODETYPE = "unknown NODETYPE"
51 LOCAL_IP_FILE = "advertised_ip_addresses"
53 def __init__(self, basedir="."):
54 service.MultiService.__init__(self)
55 self.basedir = os.path.abspath(basedir)
56 self._tub_ready_observerlist = observer.OneShotObserverList()
57 fileutil.make_dirs(os.path.join(self.basedir, "private"), 0700)
58 open(os.path.join(self.basedir, "private", "README"), "w").write(PRIV_README)
59 certfile = os.path.join(self.basedir, "private", self.CERTFILE)
60 self.tub = Tub(certFile=certfile)
61 self.tub.setOption("logLocalFailures", True)
62 self.tub.setOption("logRemoteFailures", True)
63 self.nodeid = b32decode(self.tub.tubID.upper()) # binary format
64 self.write_config("my_nodeid", b32encode(self.nodeid).lower() + "\n")
65 self.short_nodeid = b32encode(self.nodeid).lower()[:8] # ready for printing
66 assert self.PORTNUMFILE, "Your node.Node subclass must provide PORTNUMFILE"
67 self._portnumfile = os.path.join(self.basedir, self.PORTNUMFILE)
69 portnum = int(open(self._portnumfile, "rU").read())
70 except (EnvironmentError, ValueError):
72 self.tub.listenOn("tcp:%d" % portnum)
73 # we must wait until our service has started before we can find out
74 # our IP address and thus do tub.setLocation, and we can't register
75 # any services with the Tub until after that point
76 self.tub.setServiceParent(self)
79 AUTHKEYSFILEBASE = "authorized_keys."
80 for f in os.listdir(self.basedir):
81 if f.startswith(AUTHKEYSFILEBASE):
82 keyfile = os.path.join(self.basedir, f)
83 portnum = int(f[len(AUTHKEYSFILEBASE):])
84 from allmydata import manhole
85 m = manhole.AuthorizedKeysManhole(portnum, keyfile)
86 m.setServiceParent(self)
87 self.log("AuthorizedKeysManhole listening on %d" % portnum)
90 self.log("Node constructed. " + get_package_versions_string())
91 iputil.increase_rlimits()
93 def get_config(self, name, required=False):
94 """Get the (string) contents of a config file, or None if the file
95 did not exist. If required=True, raise an exception rather than
96 returning None. Any leading or trailing whitespace will be stripped
98 fn = os.path.join(self.basedir, name)
100 return open(fn, "r").read().strip()
101 except EnvironmentError:
106 def write_private_config(self, name, value):
107 """Write the (string) contents of a private config file (which is a
108 config file that resides within the subdirectory named 'private'), and
109 return it. Any leading or trailing whitespace will be stripped from
112 privname = os.path.join(self.basedir, "private", name)
113 open(privname, "w").write(value.strip())
115 def get_or_create_private_config(self, name, default):
116 """Try to get the (string) contents of a private config file (which
117 is a config file that resides within the subdirectory named
118 'private'), and return it. Any leading or trailing whitespace will be
119 stripped from the data.
121 If the file does not exist, try to create it using default, and
122 then return the value that was written. If 'default' is a string,
123 use it as a default value. If not, treat it as a 0-argument callable
124 which is expected to return a string.
126 privname = os.path.join("private", name)
127 value = self.get_config(privname)
129 if isinstance(default, (str, unicode)):
133 fn = os.path.join(self.basedir, privname)
135 open(fn, "w").write(value)
136 except EnvironmentError, e:
137 self.log("Unable to write config file '%s'" % fn)
139 value = value.strip()
142 def write_config(self, name, value, mode="w"):
143 """Write a string to a config file."""
144 fn = os.path.join(self.basedir, name)
146 open(fn, mode).write(value)
147 except EnvironmentError, e:
148 self.log("Unable to write config file '%s'" % fn)
151 def startService(self):
152 # Note: this class can be started and stopped at most once.
153 self.log("Node.startService")
154 # Delay until the reactor is running.
155 eventual.eventually(self._startService)
157 def _startService(self):
158 precondition(reactor.running)
159 self.log("Node._startService")
161 service.MultiService.startService(self)
162 d = defer.succeed(None)
163 d.addCallback(lambda res: iputil.get_local_addresses_async())
164 d.addCallback(self._setup_tub)
165 d.addCallback(lambda res: self.tub_ready())
167 self.log("%s running" % self.NODETYPE)
168 self._tub_ready_observerlist.fire(self)
170 d.addCallback(_ready)
172 self.log('_startService() failed')
174 #reactor.stop() # for unknown reasons, reactor.stop() isn't working. [ ] TODO
175 self.log('calling os.abort()')
179 def stopService(self):
180 self.log("Node.stopService")
181 d = self._tub_ready_observerlist.when_fired()
182 def _really_stopService(ignored):
183 self.log("Node._really_stopService")
184 return service.MultiService.stopService(self)
185 d.addCallback(_really_stopService)
189 """Shut down the node. Returns a Deferred that fires (with None) when
190 it finally stops kicking."""
191 self.log("Node.shutdown")
192 return self.stopService()
194 def setup_logging(self):
195 # we replace the formatTime() method of the log observer that twistd
196 # set up for us, with a method that uses better timestamps.
197 for o in log.theLogPublisher.observers:
198 # o might be a FileLogObserver's .emit method
199 if type(o) is type(self.setup_logging): # bound method
201 if isinstance(ob, log.FileLogObserver):
202 newmeth = types.UnboundMethodType(formatTimeTahoeStyle, ob, ob.__class__)
203 ob.formatTime = newmeth
204 # TODO: twisted >2.5.0 offers maxRotatedFiles=50
206 self.tub.setOption("logport-furlfile",
207 os.path.join(self.basedir, "logport.furl"))
208 self.tub.setOption("log-gatherer-furlfile",
209 os.path.join(self.basedir, "log_gatherer.furl"))
211 def log(self, msg, src="", args=(), **kw):
215 logsrc = self.logSource
218 msg = msg % tuple(map(humanreadable.hr, args))
220 msg = "ERROR: output string '%s' contained invalid %% expansion, error: %s, args: %s\n" % (`msg`, e, `args`)
221 msg = self.short_nodeid + ": " + humanreadable.hr(msg)
222 return log.callWithContext({"system":logsrc},
223 tahoe_log.msg, msg, **kw)
225 def _setup_tub(self, local_addresses):
226 # we can't get a dynamically-assigned portnum until our Tub is
227 # running, which means after startService.
228 l = self.tub.getListeners()[0]
229 portnum = l.getPortnum()
230 # record which port we're listening on, so we can grab the same one next time
231 open(self._portnumfile, "w").write("%d\n" % portnum)
233 local_addresses = [ "%s:%d" % (addr, portnum,) for addr in local_addresses ]
237 for addrline in open(os.path.join(self.basedir, self.LOCAL_IP_FILE), "rU"):
238 mo = ADDR_RE.search(addrline)
240 (addr, dummy, aportnum,) = mo.groups()
243 addresses.append("%s:%d" % (addr, int(aportnum),))
244 except EnvironmentError:
247 addresses.extend(local_addresses)
249 location = ",".join(addresses)
250 self.log("Tub location set to %s" % location)
251 self.tub.setLocation(location)
255 # called when the Tub is available for registerReference
258 def when_tub_ready(self):
259 return self._tub_ready_observerlist.when_fired()
261 def add_service(self, s):
262 s.setServiceParent(self)