]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/node.py
node.py: don't append 'Z' to the timestamp, since it's really localtime. We need...
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / node.py
1
2 import os.path, re
3 from base64 import b32decode, b32encode
4
5 import twisted
6 from twisted.python import log
7 from twisted.application import service
8 from twisted.internet import defer, reactor
9 from foolscap import Tub, eventual
10 from allmydata.util import iputil, observer, humanreadable
11 from allmydata.util.assertutil import precondition
12
13 # Just to get their versions:
14 import allmydata
15 import zfec
16 import foolscap
17
18 # group 1 will be addr (dotted quad string), group 3 if any will be portnum (string)
19 ADDR_RE=re.compile("^([1-9][0-9]*\.[1-9][0-9]*\.[1-9][0-9]*\.[1-9][0-9]*)(:([1-9][0-9]*))?$")
20
21 class Node(service.MultiService):
22     # this implements common functionality of both Client nodes, Introducer 
23     # nodes, and Vdrive nodes
24     NODETYPE = "unknown NODETYPE"
25     PORTNUMFILE = None
26     CERTFILE = "node.pem"
27     LOCAL_IP_FILE = "advertised_ip_addresses"
28
29     def __init__(self, basedir="."):
30         service.MultiService.__init__(self)
31         self.basedir = os.path.abspath(basedir)
32         self._tub_ready_observerlist = observer.OneShotObserverList()
33         certfile = os.path.join(self.basedir, self.CERTFILE)
34         self.tub = Tub(certFile=certfile)
35         os.chmod(certfile, 0600)
36         self.tub.setOption("logLocalFailures", True)
37         self.tub.setOption("logRemoteFailures", True)
38         self.nodeid = b32decode(self.tub.tubID.upper()) # binary format
39         self.write_config("my_nodeid", b32encode(self.nodeid).lower() + "\n")
40         self.short_nodeid = b32encode(self.nodeid).lower()[:8] # ready for printing
41         assert self.PORTNUMFILE, "Your node.Node subclass must provide PORTNUMFILE"
42         self._portnumfile = os.path.join(self.basedir, self.PORTNUMFILE)
43         try:
44             portnum = int(open(self._portnumfile, "rU").read())
45         except (EnvironmentError, ValueError):
46             portnum = 0
47         self.tub.listenOn("tcp:%d" % portnum)
48         # we must wait until our service has started before we can find out
49         # our IP address and thus do tub.setLocation, and we can't register
50         # any services with the Tub until after that point
51         self.tub.setServiceParent(self)
52         self.logSource="Node"
53
54         AUTHKEYSFILEBASE = "authorized_keys."
55         for f in os.listdir(self.basedir):
56             if f.startswith(AUTHKEYSFILEBASE):
57                 keyfile = os.path.join(self.basedir, f)
58                 portnum = int(f[len(AUTHKEYSFILEBASE):])
59                 from allmydata import manhole
60                 m = manhole.AuthorizedKeysManhole(portnum, keyfile)
61                 m.setServiceParent(self)
62                 self.log("AuthorizedKeysManhole listening on %d" % portnum)
63
64         self.log("Node constructed.  tahoe version: %s, foolscap: %s,"
65                  " twisted: %s, zfec: %s"
66                  % (allmydata.__version__, foolscap.__version__,
67                     twisted.__version__, zfec.__version__,))
68
69     def get_config(self, name, mode="r", required=False):
70         """Get the (string) contents of a config file, or None if the file
71         did not exist. If required=True, raise an exception rather than
72         returning None. Any leading or trailing whitespace will be stripped
73         from the data."""
74         fn = os.path.join(self.basedir, name)
75         try:
76             return open(fn, mode).read().strip()
77         except EnvironmentError:
78             if not required:
79                 return None
80             raise
81
82     def get_or_create_config(self, name, default_fn, mode="w", filemode=None):
83         """Try to get the (string) contents of a config file, and return it.
84         Any leading or trailing whitespace will be stripped from the data.
85
86         If the file does not exist, try to create it using default_fn, and
87         then return the value that was written. If 'default_fn' is a string,
88         use it as a default value. If not, treat it as a 0-argument callable
89         which is expected to return a string.
90         """
91         value = self.get_config(name)
92         if value is None:
93             if isinstance(default_fn, (str, unicode)):
94                 value = default_fn
95             else:
96                 value = default_fn()
97             fn = os.path.join(self.basedir, name)
98             try:
99                 f = open(fn, mode)
100                 f.write(value)
101                 f.close()
102                 if filemode is not None:
103                     os.chmod(fn, filemode)
104             except EnvironmentError, e:
105                 self.log("Unable to write config file '%s'" % fn)
106                 self.log(e)
107             value = value.strip()
108         return value
109
110     def write_config(self, name, value, mode="w"):
111         """Write a string to a config file."""
112         fn = os.path.join(self.basedir, name)
113         try:
114             open(fn, mode).write(value)
115         except EnvironmentError, e:
116             self.log("Unable to write config file '%s'" % fn)
117             self.log(e)
118
119     def get_versions(self):
120         return {'allmydata': allmydata.__version__,
121                 'foolscap': foolscap.__version__,
122                 'twisted': twisted.__version__,
123                 'zfec': zfec.__version__,
124                 }
125
126     def startService(self):
127         # note: this class can only be started and stopped once.
128         self.log("Node.startService")
129         eventual.eventually(self._startService)
130
131     def _startService(self):
132         precondition(reactor.running)
133         self.log("Node._startService")
134
135         service.MultiService.startService(self)
136         d = defer.succeed(None)
137         d.addCallback(lambda res: iputil.get_local_addresses_async())
138         d.addCallback(self._setup_tub)
139         d.addCallback(lambda res: self.tub_ready())
140         def _ready(res):
141             self.log("%s running" % self.NODETYPE)
142             self._tub_ready_observerlist.fire(self)
143             return self
144         d.addCallback(_ready)
145         def _die(failure):
146             self.log('_startService() failed')
147             log.err(failure)
148             #reactor.stop() # for unknown reasons, reactor.stop() isn't working.  [ ] TODO
149             self.log('calling os.abort()')
150             os.abort()
151         d.addErrback(_die)
152
153     def stopService(self):
154         self.log("Node.stopService")
155         d = self._tub_ready_observerlist.when_fired()
156         def _really_stopService(ignored):
157             self.log("Node._really_stopService")
158             return service.MultiService.stopService(self)
159         d.addCallback(_really_stopService)
160         return d
161
162     def shutdown(self):
163         """Shut down the node. Returns a Deferred that fires (with None) when
164         it finally stops kicking."""
165         self.log("Node.shutdown")
166         return self.stopService()
167
168     def log(self, msg, src="", args=()):
169         if src:
170             logsrc = src
171         else:
172             logsrc=self.logSource
173         if args:
174             try:
175                 msg = msg % tuple(map(humanreadable.hr, args))
176             except TypeError, e:
177                 msg = "ERROR: output string '%s' contained invalid %% expansion, error: %s, args: %s\n" % (`msg`, e, `args`)
178         # TODO: modify the timestamp to include milliseconds
179         # TODO: modify it to be in UTC instead of localtime
180         #  (see twisted/python/log.py:FileLogObserver.formatTime line 362)
181         log.FileLogObserver.timeFormat="%Y-%m-%d %H:%M:%S"
182         log.callWithContext({"system":logsrc},log.msg,(self.short_nodeid + ": " + humanreadable.hr(msg)))
183
184     def _setup_tub(self, local_addresses):
185         # we can't get a dynamically-assigned portnum until our Tub is
186         # running, which means after startService.
187         l = self.tub.getListeners()[0]
188         portnum = l.getPortnum()
189         # record which port we're listening on, so we can grab the same one next time
190         open(self._portnumfile, "w").write("%d\n" % portnum)
191
192         local_addresses = [ "%s:%d" % (addr, portnum,) for addr in local_addresses ]
193
194         addresses = []
195         try:
196             for addrline in open(os.path.join(self.basedir, self.LOCAL_IP_FILE), "rU"):
197                 mo = ADDR_RE.search(addrline)
198                 if mo:
199                     (addr, dummy, aportnum,) = mo.groups()
200                     if aportnum is None:
201                         aportnum = portnum
202                     addresses.append("%s:%d" % (addr, int(aportnum),))
203         except EnvironmentError:
204             pass
205
206         addresses.extend(local_addresses)
207
208         location = ",".join(addresses)
209         self.log("Tub location set to %s" % location)
210         self.tub.setLocation(location)
211         return self.tub
212
213     def tub_ready(self):
214         # called when the Tub is available for registerReference
215         pass
216
217     def when_tub_ready(self):
218         return self._tub_ready_observerlist.when_fired()
219
220     def add_service(self, s):
221         s.setServiceParent(self)
222         return s
223