2 import datetime, os.path, re, types, resource
3 from base64 import b32decode, b32encode
6 from twisted.python import log
7 from twisted.application import service
8 from twisted.internet import defer, reactor
9 from foolscap import Tub, eventual
10 from allmydata.util import log as tahoe_log
11 from allmydata.util import iputil, observer, humanreadable
12 from allmydata.util.assertutil import precondition
13 from allmydata.logpublisher import LogPublisher
15 # Just to get their versions:
16 import allmydata, foolscap, pycryptopp, zfec
18 # group 1 will be addr (dotted quad string), group 3 if any will be portnum (string)
19 ADDR_RE=re.compile("^([1-9][0-9]*\.[1-9][0-9]*\.[1-9][0-9]*\.[1-9][0-9]*)(:([1-9][0-9]*))?$")
22 def formatTimeTahoeStyle(self, when):
23 # we want UTC timestamps that look like:
24 # 2007-10-12 00:26:28.566Z [Client] rnp752lz: 'client running'
25 d = datetime.datetime.utcfromtimestamp(when)
27 return d.isoformat(" ")[:-3]+"Z"
29 return d.isoformat(" ") + ".000Z"
31 class Node(service.MultiService):
32 # this implements common functionality of both Client nodes and Introducer
34 NODETYPE = "unknown NODETYPE"
37 LOCAL_IP_FILE = "advertised_ip_addresses"
39 def __init__(self, basedir="."):
40 service.MultiService.__init__(self)
41 self.basedir = os.path.abspath(basedir)
42 self._tub_ready_observerlist = observer.OneShotObserverList()
43 certfile = os.path.join(self.basedir, self.CERTFILE)
44 self.tub = Tub(certFile=certfile)
45 os.chmod(certfile, 0600)
46 self.tub.setOption("logLocalFailures", True)
47 self.tub.setOption("logRemoteFailures", True)
48 self.nodeid = b32decode(self.tub.tubID.upper()) # binary format
49 self.write_config("my_nodeid", b32encode(self.nodeid).lower() + "\n")
50 self.short_nodeid = b32encode(self.nodeid).lower()[:8] # ready for printing
51 assert self.PORTNUMFILE, "Your node.Node subclass must provide PORTNUMFILE"
52 self._portnumfile = os.path.join(self.basedir, self.PORTNUMFILE)
54 portnum = int(open(self._portnumfile, "rU").read())
55 except (EnvironmentError, ValueError):
57 self.tub.listenOn("tcp:%d" % portnum)
58 # we must wait until our service has started before we can find out
59 # our IP address and thus do tub.setLocation, and we can't register
60 # any services with the Tub until after that point
61 self.tub.setServiceParent(self)
64 AUTHKEYSFILEBASE = "authorized_keys."
65 for f in os.listdir(self.basedir):
66 if f.startswith(AUTHKEYSFILEBASE):
67 keyfile = os.path.join(self.basedir, f)
68 portnum = int(f[len(AUTHKEYSFILEBASE):])
69 from allmydata import manhole
70 m = manhole.AuthorizedKeysManhole(portnum, keyfile)
71 m.setServiceParent(self)
72 self.log("AuthorizedKeysManhole listening on %d" % portnum)
75 self.log("Node constructed. tahoe version: %s, foolscap: %s,"
76 " twisted: %s, zfec: %s"
77 % (allmydata.__version__, foolscap.__version__,
78 twisted.__version__, zfec.__version__,))
79 self.increase_rlimits()
81 def increase_rlimits(self):
82 # We'd like to raise our soft resource.RLIMIT_NOFILE, since certain
83 # systems (OS-X, probably solaris) start with a relatively low limit
84 # (256), and some unit tests want to open up more sockets than this.
85 # Most linux systems start with both hard and soft limits at 1024,
88 # unfortunately the values to pass to setrlimit() vary widely from
89 # one system to another. OS-X reports (256, HUGE), but the real hard
90 # limit is 10240, and accepts (-1,-1) to mean raise it to the
91 # maximum. Cygwin reports (256, -1), then ignores a request of
92 # (-1,-1): instead you have to guess at the hard limit (it appears to
93 # be 3200), so using (3200,-1) seems to work. Linux reports a
94 # sensible (1024,1024), then rejects (-1,-1) as trying to raise the
95 # maximum limit, so you could set it to (1024,1024) but you might as
96 # well leave it alone.
99 current = resource.getrlimit(resource.RLIMIT_NOFILE)
100 except AttributeError:
101 # we're probably missing RLIMIT_NOFILE, maybe this is windows
104 if current[0] >= 1024:
105 # good enough, leave it alone
109 if current[1] > 0 and current[1] < 1000000:
110 # solaris reports (256, 65536)
111 resource.setrlimit(resource.RLIMIT_NOFILE,
112 (current[1], current[1]))
114 # this one works on OS-X (bsd), and gives us 10240, but
115 # it doesn't work on linux (on which both the hard and
116 # soft limits are set to 1024 by default).
117 resource.setrlimit(resource.RLIMIT_NOFILE, (-1,-1))
118 new = resource.getrlimit(resource.RLIMIT_NOFILE)
119 if new[0] == current[0]:
120 # probably cygwin, which ignores -1. Use a real value.
121 resource.setrlimit(resource.RLIMIT_NOFILE, (3200,-1))
124 self.log("unable to set RLIMIT_NOFILE: current value %s"
125 % (resource.getrlimit(resource.RLIMIT_NOFILE),))
127 # who knows what. It isn't very important, so log it and continue
131 def get_config(self, name, mode="r", required=False):
132 """Get the (string) contents of a config file, or None if the file
133 did not exist. If required=True, raise an exception rather than
134 returning None. Any leading or trailing whitespace will be stripped
136 fn = os.path.join(self.basedir, name)
138 return open(fn, mode).read().strip()
139 except EnvironmentError:
144 def get_or_create_config(self, name, default_fn, mode="w", filemode=None):
145 """Try to get the (string) contents of a config file, and return it.
146 Any leading or trailing whitespace will be stripped from the data.
148 If the file does not exist, try to create it using default_fn, and
149 then return the value that was written. If 'default_fn' is a string,
150 use it as a default value. If not, treat it as a 0-argument callable
151 which is expected to return a string.
153 value = self.get_config(name)
155 if isinstance(default_fn, (str, unicode)):
159 fn = os.path.join(self.basedir, name)
164 if filemode is not None:
165 os.chmod(fn, filemode)
166 except EnvironmentError, e:
167 self.log("Unable to write config file '%s'" % fn)
169 value = value.strip()
172 def write_config(self, name, value, mode="w"):
173 """Write a string to a config file."""
174 fn = os.path.join(self.basedir, name)
176 open(fn, mode).write(value)
177 except EnvironmentError, e:
178 self.log("Unable to write config file '%s'" % fn)
181 def get_versions(self):
182 return {'allmydata': allmydata.__version__,
183 'foolscap': foolscap.__version__,
184 'twisted': twisted.__version__,
185 'zfec': zfec.__version__,
186 'pycryptopp': pycryptopp.__version__,
189 def startService(self):
190 # Note: this class can be started and stopped at most once.
191 self.log("Node.startService")
192 # Delay until the reactor is running.
193 eventual.eventually(self._startService)
195 def _startService(self):
196 precondition(reactor.running)
197 self.log("Node._startService")
199 service.MultiService.startService(self)
200 d = defer.succeed(None)
201 d.addCallback(lambda res: iputil.get_local_addresses_async())
202 d.addCallback(self._setup_tub)
203 d.addCallback(lambda res: self.tub_ready())
205 self.log("%s running" % self.NODETYPE)
206 self._tub_ready_observerlist.fire(self)
208 d.addCallback(_ready)
210 self.log('_startService() failed')
212 #reactor.stop() # for unknown reasons, reactor.stop() isn't working. [ ] TODO
213 self.log('calling os.abort()')
217 def stopService(self):
218 self.log("Node.stopService")
219 d = self._tub_ready_observerlist.when_fired()
220 def _really_stopService(ignored):
221 self.log("Node._really_stopService")
222 return service.MultiService.stopService(self)
223 d.addCallback(_really_stopService)
227 """Shut down the node. Returns a Deferred that fires (with None) when
228 it finally stops kicking."""
229 self.log("Node.shutdown")
230 return self.stopService()
232 def setup_logging(self):
233 # we replace the formatTime() method of the log observer that twistd
234 # set up for us, with a method that uses better timestamps.
235 for o in log.theLogPublisher.observers:
236 # o might be a FileLogObserver's .emit method
237 if type(o) is type(self.setup_logging): # bound method
239 if isinstance(ob, log.FileLogObserver):
240 newmeth = types.UnboundMethodType(formatTimeTahoeStyle, ob, ob.__class__)
241 ob.formatTime = newmeth
242 # TODO: twisted >2.5.0 offers maxRotatedFiles=50
244 def log(self, msg, src="", args=(), **kw):
248 logsrc = self.logSource
251 msg = msg % tuple(map(humanreadable.hr, args))
253 msg = "ERROR: output string '%s' contained invalid %% expansion, error: %s, args: %s\n" % (`msg`, e, `args`)
254 msg = self.short_nodeid + ": " + humanreadable.hr(msg)
255 return log.callWithContext({"system":logsrc},
256 tahoe_log.msg, msg, **kw)
258 def _setup_tub(self, local_addresses):
259 # we can't get a dynamically-assigned portnum until our Tub is
260 # running, which means after startService.
261 l = self.tub.getListeners()[0]
262 portnum = l.getPortnum()
263 # record which port we're listening on, so we can grab the same one next time
264 open(self._portnumfile, "w").write("%d\n" % portnum)
266 local_addresses = [ "%s:%d" % (addr, portnum,) for addr in local_addresses ]
270 for addrline in open(os.path.join(self.basedir, self.LOCAL_IP_FILE), "rU"):
271 mo = ADDR_RE.search(addrline)
273 (addr, dummy, aportnum,) = mo.groups()
276 addresses.append("%s:%d" % (addr, int(aportnum),))
277 except EnvironmentError:
280 addresses.extend(local_addresses)
282 location = ",".join(addresses)
283 self.log("Tub location set to %s" % location)
284 self.tub.setLocation(location)
288 # called when the Tub is available for registerReference
289 self.add_service(LogPublisher())
290 log_gatherer_furl = self.get_config("log_gatherer.furl")
291 if log_gatherer_furl:
292 self.tub.connectTo(log_gatherer_furl, self._log_gatherer_connected)
294 def _log_gatherer_connected(self, rref):
295 rref.callRemote("logport",
296 self.nodeid, self.getServiceNamed("log_publisher"))
298 def when_tub_ready(self):
299 return self._tub_ready_observerlist.when_fired()
301 def add_service(self, s):
302 s.setServiceParent(self)