]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/node.py
remove logpublisher, use the Foolscap version now that this functionality has been...
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / node.py
1
2 import datetime, os.path, re, types
3 from base64 import b32decode, b32encode
4
5 from twisted.python import log
6 from twisted.application import service
7 from twisted.internet import defer, reactor
8 from foolscap import Tub, eventual
9 from allmydata import get_package_versions_string
10 from allmydata.util import log as tahoe_log
11 from allmydata.util import iputil, observer, humanreadable
12 from allmydata.util.assertutil import precondition
13
14 # Just to get their versions:
15 import allmydata, pycryptopp, zfec
16
17 from foolscap.logging.publish import LogPublisher
# Foolscap's LogPublisher reports a table of component versions. Register
# our own application versions in that table. Each __version__ attribute
# is an instance of allmydata.util.version_class.Version rather than a
# string, so it is converted with str() before being stored.
for _label, _module in (('allmydata', allmydata),
                        ('zfec', zfec),
                        ('pycryptopp', pycryptopp)):
    LogPublisher.versions[_label] = str(_module.__version__)
# Do not leave the loop temporaries behind as module attributes.
del _label, _module
25
# Matches one "advertised address" line: a dotted quad, optionally followed
# by ":portnum". Group 1 will be addr (dotted quad string), group 3 if any
# will be portnum (string); group 2 is the ":portnum" suffix including the
# colon. Each octet is either a bare "0" or a number without leading zeros,
# so addresses such as 10.0.0.1 and 127.0.0.1 are accepted while ambiguous
# forms such as 010.1.1.1 are still rejected. "$" also matches just before a
# trailing newline, so lines read straight from a file match as well.
ADDR_RE = re.compile(
    r"^((?:0|[1-9][0-9]*)(?:\.(?:0|[1-9][0-9]*)){3})(:([1-9][0-9]*))?$")
28
29
def formatTimeTahoeStyle(self, when):
    """Render the timestamp 'when' as a UTC string with millisecond precision.

    The result looks like the leading timestamp of this log line:
      2007-10-12 00:26:28.566Z [Client] rnp752lz: 'client running'

    'when' is a number of seconds since the epoch. 'self' is unused; the
    parameter exists because Node.setup_logging() installs this function as
    a method on a log observer. Sub-millisecond digits are truncated, not
    rounded.
    """
    stamp = datetime.datetime.utcfromtimestamp(when)
    milliseconds = stamp.microsecond // 1000
    # Format the whole-second part on its own so the fractional part always
    # has exactly three digits, whether or not there were any microseconds.
    whole_seconds = stamp.replace(microsecond=0).isoformat(" ")
    return "%s.%03dZ" % (whole_seconds, milliseconds)
38
39 class Node(service.MultiService):
40     # this implements common functionality of both Client nodes and Introducer
41     # nodes.
42     NODETYPE = "unknown NODETYPE"
43     PORTNUMFILE = None
44     CERTFILE = "node.pem"
45     LOCAL_IP_FILE = "advertised_ip_addresses"
46
47     def __init__(self, basedir="."):
48         service.MultiService.__init__(self)
49         self.basedir = os.path.abspath(basedir)
50         self._tub_ready_observerlist = observer.OneShotObserverList()
51         certfile = os.path.join(self.basedir, self.CERTFILE)
52         self.tub = Tub(certFile=certfile)
53         os.chmod(certfile, 0600)
54         self.tub.setOption("logLocalFailures", True)
55         self.tub.setOption("logRemoteFailures", True)
56         self.nodeid = b32decode(self.tub.tubID.upper()) # binary format
57         self.write_config("my_nodeid", b32encode(self.nodeid).lower() + "\n")
58         self.short_nodeid = b32encode(self.nodeid).lower()[:8] # ready for printing
59         assert self.PORTNUMFILE, "Your node.Node subclass must provide PORTNUMFILE"
60         self._portnumfile = os.path.join(self.basedir, self.PORTNUMFILE)
61         try:
62             portnum = int(open(self._portnumfile, "rU").read())
63         except (EnvironmentError, ValueError):
64             portnum = 0
65         self.tub.listenOn("tcp:%d" % portnum)
66         # we must wait until our service has started before we can find out
67         # our IP address and thus do tub.setLocation, and we can't register
68         # any services with the Tub until after that point
69         self.tub.setServiceParent(self)
70         self.logSource="Node"
71
72         AUTHKEYSFILEBASE = "authorized_keys."
73         for f in os.listdir(self.basedir):
74             if f.startswith(AUTHKEYSFILEBASE):
75                 keyfile = os.path.join(self.basedir, f)
76                 portnum = int(f[len(AUTHKEYSFILEBASE):])
77                 from allmydata import manhole
78                 m = manhole.AuthorizedKeysManhole(portnum, keyfile)
79                 m.setServiceParent(self)
80                 self.log("AuthorizedKeysManhole listening on %d" % portnum)
81
82         self.setup_logging()
83         self.log("Node constructed. " + get_package_versions_string())
84         iputil.increase_rlimits()
85
86     def get_config(self, name, mode="r", required=False):
87         """Get the (string) contents of a config file, or None if the file
88         did not exist. If required=True, raise an exception rather than
89         returning None. Any leading or trailing whitespace will be stripped
90         from the data."""
91         fn = os.path.join(self.basedir, name)
92         try:
93             return open(fn, mode).read().strip()
94         except EnvironmentError:
95             if not required:
96                 return None
97             raise
98
99     def get_or_create_config(self, name, default_fn, mode="w", filemode=None):
100         """Try to get the (string) contents of a config file, and return it.
101         Any leading or trailing whitespace will be stripped from the data.
102
103         If the file does not exist, try to create it using default_fn, and
104         then return the value that was written. If 'default_fn' is a string,
105         use it as a default value. If not, treat it as a 0-argument callable
106         which is expected to return a string.
107         """
108         value = self.get_config(name)
109         if value is None:
110             if isinstance(default_fn, (str, unicode)):
111                 value = default_fn
112             else:
113                 value = default_fn()
114             fn = os.path.join(self.basedir, name)
115             try:
116                 f = open(fn, mode)
117                 f.write(value)
118                 f.close()
119                 if filemode is not None:
120                     os.chmod(fn, filemode)
121             except EnvironmentError, e:
122                 self.log("Unable to write config file '%s'" % fn)
123                 self.log(e)
124             value = value.strip()
125         return value
126
127     def write_config(self, name, value, mode="w"):
128         """Write a string to a config file."""
129         fn = os.path.join(self.basedir, name)
130         try:
131             open(fn, mode).write(value)
132         except EnvironmentError, e:
133             self.log("Unable to write config file '%s'" % fn)
134             self.log(e)
135
136     def startService(self):
137         # Note: this class can be started and stopped at most once.
138         self.log("Node.startService")
139         # Delay until the reactor is running.
140         eventual.eventually(self._startService)
141
142     def _startService(self):
143         precondition(reactor.running)
144         self.log("Node._startService")
145
146         service.MultiService.startService(self)
147         d = defer.succeed(None)
148         d.addCallback(lambda res: iputil.get_local_addresses_async())
149         d.addCallback(self._setup_tub)
150         d.addCallback(lambda res: self.tub_ready())
151         def _ready(res):
152             self.log("%s running" % self.NODETYPE)
153             self._tub_ready_observerlist.fire(self)
154             return self
155         d.addCallback(_ready)
156         def _die(failure):
157             self.log('_startService() failed')
158             log.err(failure)
159             #reactor.stop() # for unknown reasons, reactor.stop() isn't working.  [ ] TODO
160             self.log('calling os.abort()')
161             os.abort()
162         d.addErrback(_die)
163
164     def stopService(self):
165         self.log("Node.stopService")
166         d = self._tub_ready_observerlist.when_fired()
167         def _really_stopService(ignored):
168             self.log("Node._really_stopService")
169             return service.MultiService.stopService(self)
170         d.addCallback(_really_stopService)
171         return d
172
173     def shutdown(self):
174         """Shut down the node. Returns a Deferred that fires (with None) when
175         it finally stops kicking."""
176         self.log("Node.shutdown")
177         return self.stopService()
178
179     def setup_logging(self):
180         # we replace the formatTime() method of the log observer that twistd
181         # set up for us, with a method that uses better timestamps.
182         for o in log.theLogPublisher.observers:
183             # o might be a FileLogObserver's .emit method
184             if type(o) is type(self.setup_logging): # bound method
185                 ob = o.im_self
186                 if isinstance(ob, log.FileLogObserver):
187                     newmeth = types.UnboundMethodType(formatTimeTahoeStyle, ob, ob.__class__)
188                     ob.formatTime = newmeth
189         # TODO: twisted >2.5.0 offers maxRotatedFiles=50
190
191         self.tub.setOption("logport-furlfile",
192                            os.path.join(self.basedir, "logport.furl"))
193         self.tub.setOption("log-gatherer-furlfile",
194                            os.path.join(self.basedir, "log_gatherer.furl"))
195
196     def log(self, msg, src="", args=(), **kw):
197         if src:
198             logsrc = src
199         else:
200             logsrc = self.logSource
201         if args:
202             try:
203                 msg = msg % tuple(map(humanreadable.hr, args))
204             except TypeError, e:
205                 msg = "ERROR: output string '%s' contained invalid %% expansion, error: %s, args: %s\n" % (`msg`, e, `args`)
206         msg = self.short_nodeid + ": " + humanreadable.hr(msg)
207         return log.callWithContext({"system":logsrc},
208                                    tahoe_log.msg, msg, **kw)
209
210     def _setup_tub(self, local_addresses):
211         # we can't get a dynamically-assigned portnum until our Tub is
212         # running, which means after startService.
213         l = self.tub.getListeners()[0]
214         portnum = l.getPortnum()
215         # record which port we're listening on, so we can grab the same one next time
216         open(self._portnumfile, "w").write("%d\n" % portnum)
217
218         local_addresses = [ "%s:%d" % (addr, portnum,) for addr in local_addresses ]
219
220         addresses = []
221         try:
222             for addrline in open(os.path.join(self.basedir, self.LOCAL_IP_FILE), "rU"):
223                 mo = ADDR_RE.search(addrline)
224                 if mo:
225                     (addr, dummy, aportnum,) = mo.groups()
226                     if aportnum is None:
227                         aportnum = portnum
228                     addresses.append("%s:%d" % (addr, int(aportnum),))
229         except EnvironmentError:
230             pass
231
232         addresses.extend(local_addresses)
233
234         location = ",".join(addresses)
235         self.log("Tub location set to %s" % location)
236         self.tub.setLocation(location)
237         return self.tub
238
239     def tub_ready(self):
240         # called when the Tub is available for registerReference
241         pass
242
243     def when_tub_ready(self):
244         return self._tub_ready_observerlist.when_fired()
245
246     def add_service(self, s):
247         s.setServiceParent(self)
248         return s
249