]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/node.py
trailing-whitespace eradication, no functional changes
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / node.py
1
2 import datetime, os.path, re, types
3 from base64 import b32decode, b32encode
4
5 import twisted
6 from twisted.python import log
7 from twisted.application import service
8 from twisted.internet import defer, reactor
9 from foolscap import Tub, eventual
10 from allmydata.util import iputil, observer, humanreadable
11 from allmydata.util.assertutil import precondition
12
13 # Just to get their versions:
14 import allmydata
15 import zfec
16 import foolscap
17
18 # group 1 will be addr (dotted quad string), group 3 if any will be portnum (string)
19 ADDR_RE=re.compile("^([1-9][0-9]*\.[1-9][0-9]*\.[1-9][0-9]*\.[1-9][0-9]*)(:([1-9][0-9]*))?$")
20
21
22 def formatTimeTahoeStyle(self, when):
23     # we want UTC timestamps that look like:
24     #  2007-10-12 00:26:28.566Z [Client] rnp752lz: 'client running'
25     d = datetime.datetime.utcfromtimestamp(when)
26     if d.microsecond:
27         return d.isoformat(" ")[:-3]+"Z"
28     else:
29         return d.isoformat(" ") + ".000Z"
30
31 class Node(service.MultiService):
32     # this implements common functionality of both Client nodes, Introducer
33     # nodes, and Vdrive nodes
34     NODETYPE = "unknown NODETYPE"
35     PORTNUMFILE = None
36     CERTFILE = "node.pem"
37     LOCAL_IP_FILE = "advertised_ip_addresses"
38
39     def __init__(self, basedir="."):
40         service.MultiService.__init__(self)
41         self.basedir = os.path.abspath(basedir)
42         self._tub_ready_observerlist = observer.OneShotObserverList()
43         certfile = os.path.join(self.basedir, self.CERTFILE)
44         self.tub = Tub(certFile=certfile)
45         os.chmod(certfile, 0600)
46         self.tub.setOption("logLocalFailures", True)
47         self.tub.setOption("logRemoteFailures", True)
48         self.nodeid = b32decode(self.tub.tubID.upper()) # binary format
49         self.write_config("my_nodeid", b32encode(self.nodeid).lower() + "\n")
50         self.short_nodeid = b32encode(self.nodeid).lower()[:8] # ready for printing
51         assert self.PORTNUMFILE, "Your node.Node subclass must provide PORTNUMFILE"
52         self._portnumfile = os.path.join(self.basedir, self.PORTNUMFILE)
53         try:
54             portnum = int(open(self._portnumfile, "rU").read())
55         except (EnvironmentError, ValueError):
56             portnum = 0
57         self.tub.listenOn("tcp:%d" % portnum)
58         # we must wait until our service has started before we can find out
59         # our IP address and thus do tub.setLocation, and we can't register
60         # any services with the Tub until after that point
61         self.tub.setServiceParent(self)
62         self.logSource="Node"
63
64         AUTHKEYSFILEBASE = "authorized_keys."
65         for f in os.listdir(self.basedir):
66             if f.startswith(AUTHKEYSFILEBASE):
67                 keyfile = os.path.join(self.basedir, f)
68                 portnum = int(f[len(AUTHKEYSFILEBASE):])
69                 from allmydata import manhole
70                 m = manhole.AuthorizedKeysManhole(portnum, keyfile)
71                 m.setServiceParent(self)
72                 self.log("AuthorizedKeysManhole listening on %d" % portnum)
73
74         self.setup_logging()
75         self.log("Node constructed.  tahoe version: %s, foolscap: %s,"
76                  " twisted: %s, zfec: %s"
77                  % (allmydata.__version__, foolscap.__version__,
78                     twisted.__version__, zfec.__version__,))
79
80     def get_config(self, name, mode="r", required=False):
81         """Get the (string) contents of a config file, or None if the file
82         did not exist. If required=True, raise an exception rather than
83         returning None. Any leading or trailing whitespace will be stripped
84         from the data."""
85         fn = os.path.join(self.basedir, name)
86         try:
87             return open(fn, mode).read().strip()
88         except EnvironmentError:
89             if not required:
90                 return None
91             raise
92
93     def get_or_create_config(self, name, default_fn, mode="w", filemode=None):
94         """Try to get the (string) contents of a config file, and return it.
95         Any leading or trailing whitespace will be stripped from the data.
96
97         If the file does not exist, try to create it using default_fn, and
98         then return the value that was written. If 'default_fn' is a string,
99         use it as a default value. If not, treat it as a 0-argument callable
100         which is expected to return a string.
101         """
102         value = self.get_config(name)
103         if value is None:
104             if isinstance(default_fn, (str, unicode)):
105                 value = default_fn
106             else:
107                 value = default_fn()
108             fn = os.path.join(self.basedir, name)
109             try:
110                 f = open(fn, mode)
111                 f.write(value)
112                 f.close()
113                 if filemode is not None:
114                     os.chmod(fn, filemode)
115             except EnvironmentError, e:
116                 self.log("Unable to write config file '%s'" % fn)
117                 self.log(e)
118             value = value.strip()
119         return value
120
121     def write_config(self, name, value, mode="w"):
122         """Write a string to a config file."""
123         fn = os.path.join(self.basedir, name)
124         try:
125             open(fn, mode).write(value)
126         except EnvironmentError, e:
127             self.log("Unable to write config file '%s'" % fn)
128             self.log(e)
129
130     def get_versions(self):
131         return {'allmydata': allmydata.__version__,
132                 'foolscap': foolscap.__version__,
133                 'twisted': twisted.__version__,
134                 'zfec': zfec.__version__,
135                 }
136
137     def startService(self):
138         # Note: this class can be started and stopped at most once.
139         self.log("Node.startService")
140         eventual.eventually(self._startService)
141
142     def _startService(self):
143         precondition(reactor.running)
144         self.log("Node._startService")
145
146         service.MultiService.startService(self)
147         d = defer.succeed(None)
148         d.addCallback(lambda res: iputil.get_local_addresses_async())
149         d.addCallback(self._setup_tub)
150         d.addCallback(lambda res: self.tub_ready())
151         def _ready(res):
152             self.log("%s running" % self.NODETYPE)
153             self._tub_ready_observerlist.fire(self)
154             return self
155         d.addCallback(_ready)
156         def _die(failure):
157             self.log('_startService() failed')
158             log.err(failure)
159             #reactor.stop() # for unknown reasons, reactor.stop() isn't working.  [ ] TODO
160             self.log('calling os.abort()')
161             os.abort()
162         d.addErrback(_die)
163
164     def stopService(self):
165         self.log("Node.stopService")
166         d = self._tub_ready_observerlist.when_fired()
167         def _really_stopService(ignored):
168             self.log("Node._really_stopService")
169             return service.MultiService.stopService(self)
170         d.addCallback(_really_stopService)
171         return d
172
173     def shutdown(self):
174         """Shut down the node. Returns a Deferred that fires (with None) when
175         it finally stops kicking."""
176         self.log("Node.shutdown")
177         return self.stopService()
178
179     def setup_logging(self):
180         # we replace the formatTime() method of the log observer that twistd
181         # set up for us, with a method that uses better timestamps.
182         for o in log.theLogPublisher.observers:
183             # o might be a FileLogObserver's .emit method
184             if type(o) is type(self.setup_logging): # bound method
185                 ob = o.im_self
186                 if isinstance(ob, log.FileLogObserver):
187                     newmeth = types.UnboundMethodType(formatTimeTahoeStyle, ob, ob.__class__)
188                     ob.formatTime = newmeth
189         # TODO: twisted >2.5.0 offers maxRotatedFiles=50
190
191     def log(self, msg, src="", args=()):
192         if src:
193             logsrc = src
194         else:
195             logsrc = self.logSource
196         if args:
197             try:
198                 msg = msg % tuple(map(humanreadable.hr, args))
199             except TypeError, e:
200                 msg = "ERROR: output string '%s' contained invalid %% expansion, error: %s, args: %s\n" % (`msg`, e, `args`)
201
202         log.callWithContext({"system":logsrc},
203                             log.msg,
204                             (self.short_nodeid + ": " + humanreadable.hr(msg)))
205
206     def _setup_tub(self, local_addresses):
207         # we can't get a dynamically-assigned portnum until our Tub is
208         # running, which means after startService.
209         l = self.tub.getListeners()[0]
210         portnum = l.getPortnum()
211         # record which port we're listening on, so we can grab the same one next time
212         open(self._portnumfile, "w").write("%d\n" % portnum)
213
214         local_addresses = [ "%s:%d" % (addr, portnum,) for addr in local_addresses ]
215
216         addresses = []
217         try:
218             for addrline in open(os.path.join(self.basedir, self.LOCAL_IP_FILE), "rU"):
219                 mo = ADDR_RE.search(addrline)
220                 if mo:
221                     (addr, dummy, aportnum,) = mo.groups()
222                     if aportnum is None:
223                         aportnum = portnum
224                     addresses.append("%s:%d" % (addr, int(aportnum),))
225         except EnvironmentError:
226             pass
227
228         addresses.extend(local_addresses)
229
230         location = ",".join(addresses)
231         self.log("Tub location set to %s" % location)
232         self.tub.setLocation(location)
233         return self.tub
234
235     def tub_ready(self):
236         # called when the Tub is available for registerReference
237         pass
238
239     def when_tub_ready(self):
240         return self._tub_ready_observerlist.when_fired()
241
242     def add_service(self, s):
243         s.setServiceParent(self)
244         return s
245