2 import datetime, os.path, re, types
3 from base64 import b32decode, b32encode
5 from twisted.python import log
6 from twisted.application import service
7 from twisted.internet import defer, reactor
8 from foolscap import Tub, eventual
9 from allmydata import get_package_versions_string
10 from allmydata.util import log as tahoe_log
11 from allmydata.util import iputil, observer, humanreadable
12 from allmydata.util.assertutil import precondition
14 # Just to get their versions:
15 import allmydata, pycryptopp, zfec
17 from foolscap.logging.publish import LogPublisher
18 # Add our application versions to the data that Foolscap's
19 # LogPublisher reports. Our __version__ attributes are actually
20 # instances of allmydata.util.version_class.Version, so convert them
22 LogPublisher.versions['allmydata'] = str(allmydata.__version__)
23 LogPublisher.versions['zfec'] = str(zfec.__version__)
24 LogPublisher.versions['pycryptopp'] = str(pycryptopp.__version__)
26 # group 1 will be addr (dotted quad string), group 3 if any will be portnum (string)
27 ADDR_RE=re.compile("^([1-9][0-9]*\.[1-9][0-9]*\.[1-9][0-9]*\.[1-9][0-9]*)(:([1-9][0-9]*))?$")
30 def formatTimeTahoeStyle(self, when):
31 # we want UTC timestamps that look like:
32 # 2007-10-12 00:26:28.566Z [Client] rnp752lz: 'client running'
33 d = datetime.datetime.utcfromtimestamp(when)
35 return d.isoformat(" ")[:-3]+"Z"
37 return d.isoformat(" ") + ".000Z"
39 class Node(service.MultiService):
40 # this implements common functionality of both Client nodes and Introducer
42 NODETYPE = "unknown NODETYPE"
45 LOCAL_IP_FILE = "advertised_ip_addresses"
47 def __init__(self, basedir="."):
48 service.MultiService.__init__(self)
49 self.basedir = os.path.abspath(basedir)
50 self._tub_ready_observerlist = observer.OneShotObserverList()
51 certfile = os.path.join(self.basedir, self.CERTFILE)
52 self.tub = Tub(certFile=certfile)
53 os.chmod(certfile, 0600)
54 self.tub.setOption("logLocalFailures", True)
55 self.tub.setOption("logRemoteFailures", True)
56 self.nodeid = b32decode(self.tub.tubID.upper()) # binary format
57 self.write_config("my_nodeid", b32encode(self.nodeid).lower() + "\n")
58 self.short_nodeid = b32encode(self.nodeid).lower()[:8] # ready for printing
59 assert self.PORTNUMFILE, "Your node.Node subclass must provide PORTNUMFILE"
60 self._portnumfile = os.path.join(self.basedir, self.PORTNUMFILE)
62 portnum = int(open(self._portnumfile, "rU").read())
63 except (EnvironmentError, ValueError):
65 self.tub.listenOn("tcp:%d" % portnum)
66 # we must wait until our service has started before we can find out
67 # our IP address and thus do tub.setLocation, and we can't register
68 # any services with the Tub until after that point
69 self.tub.setServiceParent(self)
72 AUTHKEYSFILEBASE = "authorized_keys."
73 for f in os.listdir(self.basedir):
74 if f.startswith(AUTHKEYSFILEBASE):
75 keyfile = os.path.join(self.basedir, f)
76 portnum = int(f[len(AUTHKEYSFILEBASE):])
77 from allmydata import manhole
78 m = manhole.AuthorizedKeysManhole(portnum, keyfile)
79 m.setServiceParent(self)
80 self.log("AuthorizedKeysManhole listening on %d" % portnum)
83 self.log("Node constructed. " + get_package_versions_string())
84 iputil.increase_rlimits()
86 def get_config(self, name, mode="r", required=False):
87 """Get the (string) contents of a config file, or None if the file
88 did not exist. If required=True, raise an exception rather than
89 returning None. Any leading or trailing whitespace will be stripped
91 fn = os.path.join(self.basedir, name)
93 return open(fn, mode).read().strip()
94 except EnvironmentError:
99 def get_or_create_config(self, name, default_fn, mode="w", filemode=None):
100 """Try to get the (string) contents of a config file, and return it.
101 Any leading or trailing whitespace will be stripped from the data.
103 If the file does not exist, try to create it using default_fn, and
104 then return the value that was written. If 'default_fn' is a string,
105 use it as a default value. If not, treat it as a 0-argument callable
106 which is expected to return a string.
108 value = self.get_config(name)
110 if isinstance(default_fn, (str, unicode)):
114 fn = os.path.join(self.basedir, name)
119 if filemode is not None:
120 os.chmod(fn, filemode)
121 except EnvironmentError, e:
122 self.log("Unable to write config file '%s'" % fn)
124 value = value.strip()
127 def write_config(self, name, value, mode="w"):
128 """Write a string to a config file."""
129 fn = os.path.join(self.basedir, name)
131 open(fn, mode).write(value)
132 except EnvironmentError, e:
133 self.log("Unable to write config file '%s'" % fn)
136 def startService(self):
137 # Note: this class can be started and stopped at most once.
138 self.log("Node.startService")
139 # Delay until the reactor is running.
140 eventual.eventually(self._startService)
142 def _startService(self):
143 precondition(reactor.running)
144 self.log("Node._startService")
146 service.MultiService.startService(self)
147 d = defer.succeed(None)
148 d.addCallback(lambda res: iputil.get_local_addresses_async())
149 d.addCallback(self._setup_tub)
150 d.addCallback(lambda res: self.tub_ready())
152 self.log("%s running" % self.NODETYPE)
153 self._tub_ready_observerlist.fire(self)
155 d.addCallback(_ready)
157 self.log('_startService() failed')
159 #reactor.stop() # for unknown reasons, reactor.stop() isn't working. [ ] TODO
160 self.log('calling os.abort()')
164 def stopService(self):
165 self.log("Node.stopService")
166 d = self._tub_ready_observerlist.when_fired()
167 def _really_stopService(ignored):
168 self.log("Node._really_stopService")
169 return service.MultiService.stopService(self)
170 d.addCallback(_really_stopService)
174 """Shut down the node. Returns a Deferred that fires (with None) when
175 it finally stops kicking."""
176 self.log("Node.shutdown")
177 return self.stopService()
179 def setup_logging(self):
180 # we replace the formatTime() method of the log observer that twistd
181 # set up for us, with a method that uses better timestamps.
182 for o in log.theLogPublisher.observers:
183 # o might be a FileLogObserver's .emit method
184 if type(o) is type(self.setup_logging): # bound method
186 if isinstance(ob, log.FileLogObserver):
187 newmeth = types.UnboundMethodType(formatTimeTahoeStyle, ob, ob.__class__)
188 ob.formatTime = newmeth
189 # TODO: twisted >2.5.0 offers maxRotatedFiles=50
191 self.tub.setOption("logport-furlfile",
192 os.path.join(self.basedir, "logport.furl"))
193 self.tub.setOption("log-gatherer-furlfile",
194 os.path.join(self.basedir, "log_gatherer.furl"))
196 def log(self, msg, src="", args=(), **kw):
200 logsrc = self.logSource
203 msg = msg % tuple(map(humanreadable.hr, args))
205 msg = "ERROR: output string '%s' contained invalid %% expansion, error: %s, args: %s\n" % (`msg`, e, `args`)
206 msg = self.short_nodeid + ": " + humanreadable.hr(msg)
207 return log.callWithContext({"system":logsrc},
208 tahoe_log.msg, msg, **kw)
210 def _setup_tub(self, local_addresses):
211 # we can't get a dynamically-assigned portnum until our Tub is
212 # running, which means after startService.
213 l = self.tub.getListeners()[0]
214 portnum = l.getPortnum()
215 # record which port we're listening on, so we can grab the same one next time
216 open(self._portnumfile, "w").write("%d\n" % portnum)
218 local_addresses = [ "%s:%d" % (addr, portnum,) for addr in local_addresses ]
222 for addrline in open(os.path.join(self.basedir, self.LOCAL_IP_FILE), "rU"):
223 mo = ADDR_RE.search(addrline)
225 (addr, dummy, aportnum,) = mo.groups()
228 addresses.append("%s:%d" % (addr, int(aportnum),))
229 except EnvironmentError:
232 addresses.extend(local_addresses)
234 location = ",".join(addresses)
235 self.log("Tub location set to %s" % location)
236 self.tub.setLocation(location)
240 # called when the Tub is available for registerReference
243 def when_tub_ready(self):
244 return self._tub_ready_observerlist.when_fired()
246 def add_service(self, s):
247 s.setServiceParent(self)