]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/node.py
hierarchical logging: add numbered messages and parent= args
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / node.py
1
2 import datetime, os.path, re, types, resource
3 from base64 import b32decode, b32encode
4
5 import twisted
6 from twisted.python import log
7 from twisted.application import service
8 from twisted.internet import defer, reactor
9 from foolscap import Tub, eventual
10 from allmydata.util import log as tahoe_log
11 from allmydata.util import iputil, observer, humanreadable
12 from allmydata.util.assertutil import precondition
13 from allmydata.logpublisher import LogPublisher
14
15 # Just to get their versions:
16 import allmydata, foolscap, pycryptopp, zfec
17
18 # group 1 will be addr (dotted quad string), group 3 if any will be portnum (string)
19 ADDR_RE=re.compile("^([1-9][0-9]*\.[1-9][0-9]*\.[1-9][0-9]*\.[1-9][0-9]*)(:([1-9][0-9]*))?$")
20
21
22 def formatTimeTahoeStyle(self, when):
23     # we want UTC timestamps that look like:
24     #  2007-10-12 00:26:28.566Z [Client] rnp752lz: 'client running'
25     d = datetime.datetime.utcfromtimestamp(when)
26     if d.microsecond:
27         return d.isoformat(" ")[:-3]+"Z"
28     else:
29         return d.isoformat(" ") + ".000Z"
30
31 class Node(service.MultiService):
32     # this implements common functionality of both Client nodes and Introducer
33     # nodes.
34     NODETYPE = "unknown NODETYPE"
35     PORTNUMFILE = None
36     CERTFILE = "node.pem"
37     LOCAL_IP_FILE = "advertised_ip_addresses"
38
39     def __init__(self, basedir="."):
40         service.MultiService.__init__(self)
41         self.basedir = os.path.abspath(basedir)
42         self._tub_ready_observerlist = observer.OneShotObserverList()
43         certfile = os.path.join(self.basedir, self.CERTFILE)
44         self.tub = Tub(certFile=certfile)
45         os.chmod(certfile, 0600)
46         self.tub.setOption("logLocalFailures", True)
47         self.tub.setOption("logRemoteFailures", True)
48         self.nodeid = b32decode(self.tub.tubID.upper()) # binary format
49         self.write_config("my_nodeid", b32encode(self.nodeid).lower() + "\n")
50         self.short_nodeid = b32encode(self.nodeid).lower()[:8] # ready for printing
51         assert self.PORTNUMFILE, "Your node.Node subclass must provide PORTNUMFILE"
52         self._portnumfile = os.path.join(self.basedir, self.PORTNUMFILE)
53         try:
54             portnum = int(open(self._portnumfile, "rU").read())
55         except (EnvironmentError, ValueError):
56             portnum = 0
57         self.tub.listenOn("tcp:%d" % portnum)
58         # we must wait until our service has started before we can find out
59         # our IP address and thus do tub.setLocation, and we can't register
60         # any services with the Tub until after that point
61         self.tub.setServiceParent(self)
62         self.logSource="Node"
63
64         AUTHKEYSFILEBASE = "authorized_keys."
65         for f in os.listdir(self.basedir):
66             if f.startswith(AUTHKEYSFILEBASE):
67                 keyfile = os.path.join(self.basedir, f)
68                 portnum = int(f[len(AUTHKEYSFILEBASE):])
69                 from allmydata import manhole
70                 m = manhole.AuthorizedKeysManhole(portnum, keyfile)
71                 m.setServiceParent(self)
72                 self.log("AuthorizedKeysManhole listening on %d" % portnum)
73
74         self.setup_logging()
75         self.log("Node constructed.  tahoe version: %s, foolscap: %s,"
76                  " twisted: %s, zfec: %s"
77                  % (allmydata.__version__, foolscap.__version__,
78                     twisted.__version__, zfec.__version__,))
79         self.increase_rlimits()
80
81     def increase_rlimits(self):
82         # We'd like to raise our soft resource.RLIMIT_NOFILE, since certain
83         # systems (OS-X, probably solaris) start with a relatively low limit
84         # (256), and some unit tests want to open up more sockets than this.
85         # Most linux systems start with both hard and soft limits at 1024,
86         # which is plenty.
87
88         # unfortunately the values to pass to setrlimit() vary widely from
89         # one system to another. OS-X reports (256, HUGE), but the real hard
90         # limit is 10240, and accepts (-1,-1) to mean raise it to the
91         # maximum. Cygwin reports (256, -1), then ignores a request of
92         # (-1,-1): instead you have to guess at the hard limit (it appears to
93         # be 3200), so using (3200,-1) seems to work. Linux reports a
94         # sensible (1024,1024), then rejects (-1,-1) as trying to raise the
95         # maximum limit, so you could set it to (1024,1024) but you might as
96         # well leave it alone.
97
98         try:
99             current = resource.getrlimit(resource.RLIMIT_NOFILE)
100         except AttributeError:
101             # we're probably missing RLIMIT_NOFILE, maybe this is windows
102             return
103
104         if current[0] >= 1024:
105             # good enough, leave it alone
106             return
107
108         try:
109             if current[1] > 0 and current[1] < 1000000:
110                 # solaris reports (256, 65536)
111                 resource.setrlimit(resource.RLIMIT_NOFILE,
112                                    (current[1], current[1]))
113             else:
114                 # this one works on OS-X (bsd), and gives us 10240, but
115                 # it doesn't work on linux (on which both the hard and
116                 # soft limits are set to 1024 by default).
117                 resource.setrlimit(resource.RLIMIT_NOFILE, (-1,-1))
118                 new = resource.getrlimit(resource.RLIMIT_NOFILE)
119                 if new[0] == current[0]:
120                     # probably cygwin, which ignores -1. Use a real value.
121                     resource.setrlimit(resource.RLIMIT_NOFILE, (3200,-1))
122
123         except ValueError:
124             self.log("unable to set RLIMIT_NOFILE: current value %s"
125                      % (resource.getrlimit(resource.RLIMIT_NOFILE),))
126         except:
127             # who knows what. It isn't very important, so log it and continue
128             log.err()
129
130
131     def get_config(self, name, mode="r", required=False):
132         """Get the (string) contents of a config file, or None if the file
133         did not exist. If required=True, raise an exception rather than
134         returning None. Any leading or trailing whitespace will be stripped
135         from the data."""
136         fn = os.path.join(self.basedir, name)
137         try:
138             return open(fn, mode).read().strip()
139         except EnvironmentError:
140             if not required:
141                 return None
142             raise
143
144     def get_or_create_config(self, name, default_fn, mode="w", filemode=None):
145         """Try to get the (string) contents of a config file, and return it.
146         Any leading or trailing whitespace will be stripped from the data.
147
148         If the file does not exist, try to create it using default_fn, and
149         then return the value that was written. If 'default_fn' is a string,
150         use it as a default value. If not, treat it as a 0-argument callable
151         which is expected to return a string.
152         """
153         value = self.get_config(name)
154         if value is None:
155             if isinstance(default_fn, (str, unicode)):
156                 value = default_fn
157             else:
158                 value = default_fn()
159             fn = os.path.join(self.basedir, name)
160             try:
161                 f = open(fn, mode)
162                 f.write(value)
163                 f.close()
164                 if filemode is not None:
165                     os.chmod(fn, filemode)
166             except EnvironmentError, e:
167                 self.log("Unable to write config file '%s'" % fn)
168                 self.log(e)
169             value = value.strip()
170         return value
171
172     def write_config(self, name, value, mode="w"):
173         """Write a string to a config file."""
174         fn = os.path.join(self.basedir, name)
175         try:
176             open(fn, mode).write(value)
177         except EnvironmentError, e:
178             self.log("Unable to write config file '%s'" % fn)
179             self.log(e)
180
181     def get_versions(self):
182         return {'allmydata': allmydata.__version__,
183                 'foolscap': foolscap.__version__,
184                 'twisted': twisted.__version__,
185                 'zfec': zfec.__version__,
186                 'pycryptopp': pycryptopp.__version__,
187                 }
188
189     def startService(self):
190         # Note: this class can be started and stopped at most once.
191         self.log("Node.startService")
192         # Delay until the reactor is running.
193         eventual.eventually(self._startService)
194
195     def _startService(self):
196         precondition(reactor.running)
197         self.log("Node._startService")
198
199         service.MultiService.startService(self)
200         d = defer.succeed(None)
201         d.addCallback(lambda res: iputil.get_local_addresses_async())
202         d.addCallback(self._setup_tub)
203         d.addCallback(lambda res: self.tub_ready())
204         def _ready(res):
205             self.log("%s running" % self.NODETYPE)
206             self._tub_ready_observerlist.fire(self)
207             return self
208         d.addCallback(_ready)
209         def _die(failure):
210             self.log('_startService() failed')
211             log.err(failure)
212             #reactor.stop() # for unknown reasons, reactor.stop() isn't working.  [ ] TODO
213             self.log('calling os.abort()')
214             os.abort()
215         d.addErrback(_die)
216
217     def stopService(self):
218         self.log("Node.stopService")
219         d = self._tub_ready_observerlist.when_fired()
220         def _really_stopService(ignored):
221             self.log("Node._really_stopService")
222             return service.MultiService.stopService(self)
223         d.addCallback(_really_stopService)
224         return d
225
226     def shutdown(self):
227         """Shut down the node. Returns a Deferred that fires (with None) when
228         it finally stops kicking."""
229         self.log("Node.shutdown")
230         return self.stopService()
231
232     def setup_logging(self):
233         # we replace the formatTime() method of the log observer that twistd
234         # set up for us, with a method that uses better timestamps.
235         for o in log.theLogPublisher.observers:
236             # o might be a FileLogObserver's .emit method
237             if type(o) is type(self.setup_logging): # bound method
238                 ob = o.im_self
239                 if isinstance(ob, log.FileLogObserver):
240                     newmeth = types.UnboundMethodType(formatTimeTahoeStyle, ob, ob.__class__)
241                     ob.formatTime = newmeth
242         # TODO: twisted >2.5.0 offers maxRotatedFiles=50
243
244     def log(self, msg, src="", args=(), **kw):
245         if src:
246             logsrc = src
247         else:
248             logsrc = self.logSource
249         if args:
250             try:
251                 msg = msg % tuple(map(humanreadable.hr, args))
252             except TypeError, e:
253                 msg = "ERROR: output string '%s' contained invalid %% expansion, error: %s, args: %s\n" % (`msg`, e, `args`)
254         msg = self.short_nodeid + ": " + humanreadable.hr(msg)
255         return log.callWithContext({"system":logsrc},
256                                    tahoe_log.msg, msg, **kw)
257
258     def _setup_tub(self, local_addresses):
259         # we can't get a dynamically-assigned portnum until our Tub is
260         # running, which means after startService.
261         l = self.tub.getListeners()[0]
262         portnum = l.getPortnum()
263         # record which port we're listening on, so we can grab the same one next time
264         open(self._portnumfile, "w").write("%d\n" % portnum)
265
266         local_addresses = [ "%s:%d" % (addr, portnum,) for addr in local_addresses ]
267
268         addresses = []
269         try:
270             for addrline in open(os.path.join(self.basedir, self.LOCAL_IP_FILE), "rU"):
271                 mo = ADDR_RE.search(addrline)
272                 if mo:
273                     (addr, dummy, aportnum,) = mo.groups()
274                     if aportnum is None:
275                         aportnum = portnum
276                     addresses.append("%s:%d" % (addr, int(aportnum),))
277         except EnvironmentError:
278             pass
279
280         addresses.extend(local_addresses)
281
282         location = ",".join(addresses)
283         self.log("Tub location set to %s" % location)
284         self.tub.setLocation(location)
285         return self.tub
286
287     def tub_ready(self):
288         # called when the Tub is available for registerReference
289         self.add_service(LogPublisher())
290         log_gatherer_furl = self.get_config("log_gatherer.furl")
291         if log_gatherer_furl:
292             self.tub.connectTo(log_gatherer_furl, self._log_gatherer_connected)
293
294     def _log_gatherer_connected(self, rref):
295         rref.callRemote("logport",
296                         self.nodeid, self.getServiceNamed("log_publisher"))
297
298     def when_tub_ready(self):
299         return self._tub_ready_observerlist.when_fired()
300
301     def add_service(self, s):
302         s.setServiceParent(self)
303         return s
304