]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/node.py
node: change get_or_create_config to strip whitespace and accept a filemode= argument
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / node.py
1
2 import os.path, re
3 from base64 import b32decode, b32encode
4
5 import twisted
6 from twisted.python import log
7 from twisted.application import service
8 from twisted.internet import defer, reactor
9 from foolscap import Tub, eventual
10 from allmydata.util import iputil, observer, humanreadable
11 from allmydata.util.assertutil import precondition
12
13 # Just to get their versions:
14 import allmydata
15 import zfec
16 import foolscap
17
# Matches one "addr" or "addr:portnum" line. Group 1 is the address (dotted
# quad string); group 3, if present, is the portnum (string). Octets may be
# any run of digits, including "0", so addresses such as 10.0.0.1 or
# 192.168.0.1 are accepted. No 0-255 range check is performed. A port must
# not start with "0".
ADDR_RE = re.compile(r"^([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+)(:([1-9][0-9]*))?$")
20
21 class Node(service.MultiService):
22     # this implements common functionality of both Client nodes, Introducer 
23     # nodes, and Vdrive nodes
24     NODETYPE = "unknown NODETYPE"
25     PORTNUMFILE = None
26     CERTFILE = "node.pem"
27     LOCAL_IP_FILE = "advertised_ip_addresses"
28
29     def __init__(self, basedir="."):
30         service.MultiService.__init__(self)
31         self.basedir = os.path.abspath(basedir)
32         self._tub_ready_observerlist = observer.OneShotObserverList()
33         certfile = os.path.join(self.basedir, self.CERTFILE)
34         self.tub = Tub(certFile=certfile)
35         self.tub.setOption("logLocalFailures", True)
36         self.tub.setOption("logRemoteFailures", True)
37         self.nodeid = b32decode(self.tub.tubID.upper()) # binary format
38         self.write_config("my_nodeid", b32encode(self.nodeid).lower() + "\n")
39         self.short_nodeid = b32encode(self.nodeid).lower()[:8] # ready for printing
40         assert self.PORTNUMFILE, "Your node.Node subclass must provide PORTNUMFILE"
41         self._portnumfile = os.path.join(self.basedir, self.PORTNUMFILE)
42         try:
43             portnum = int(open(self._portnumfile, "rU").read())
44         except (EnvironmentError, ValueError):
45             portnum = 0
46         self.tub.listenOn("tcp:%d" % portnum)
47         # we must wait until our service has started before we can find out
48         # our IP address and thus do tub.setLocation, and we can't register
49         # any services with the Tub until after that point
50         self.tub.setServiceParent(self)
51         self.logSource="Node"
52
53         AUTHKEYSFILEBASE = "authorized_keys."
54         for f in os.listdir(self.basedir):
55             if f.startswith(AUTHKEYSFILEBASE):
56                 keyfile = os.path.join(self.basedir, f)
57                 portnum = int(f[len(AUTHKEYSFILEBASE):])
58                 from allmydata import manhole
59                 m = manhole.AuthorizedKeysManhole(portnum, keyfile)
60                 m.setServiceParent(self)
61                 self.log("AuthorizedKeysManhole listening on %d" % portnum)
62
63         self.log("Node constructed.  tahoe version: %s, foolscap: %s,"
64                  " twisted: %s, zfec: %s"
65                  % (allmydata.__version__, foolscap.__version__,
66                     twisted.__version__, zfec.__version__,))
67
68     def get_config(self, name, mode="r", required=False):
69         """Get the (string) contents of a config file, or None if the file
70         did not exist. If required=True, raise an exception rather than
71         returning None. Any leading or trailing whitespace will be stripped
72         from the data."""
73         fn = os.path.join(self.basedir, name)
74         try:
75             return open(fn, mode).read().strip()
76         except EnvironmentError:
77             if not required:
78                 return None
79             raise
80
81     def get_or_create_config(self, name, default_fn, mode="w", filemode=None):
82         """Try to get the (string) contents of a config file, and return it.
83         Any leading or trailing whitespace will be stripped from the data.
84
85         If the file does not exist, try to create it using default_fn, and
86         then return the value that was written. If 'default_fn' is a string,
87         use it as a default value. If not, treat it as a 0-argument callable
88         which is expected to return a string.
89         """
90         value = self.get_config(name)
91         if value is None:
92             if isinstance(default_fn, (str, unicode)):
93                 value = default_fn
94             else:
95                 value = default_fn()
96             fn = os.path.join(self.basedir, name)
97             try:
98                 f = open(fn, mode)
99                 f.write(value)
100                 f.close()
101                 if filemode is not None:
102                     os.chmod(fn, filemode)
103             except EnvironmentError, e:
104                 self.log("Unable to write config file '%s'" % fn)
105                 self.log(e)
106             value = value.strip()
107         return value
108
109     def write_config(self, name, value, mode="w"):
110         """Write a string to a config file."""
111         fn = os.path.join(self.basedir, name)
112         try:
113             open(fn, mode).write(value)
114         except EnvironmentError, e:
115             self.log("Unable to write config file '%s'" % fn)
116             self.log(e)
117
118     def get_versions(self):
119         return {'allmydata': allmydata.__version__,
120                 'foolscap': foolscap.__version__,
121                 'twisted': twisted.__version__,
122                 'zfec': zfec.__version__,
123                 }
124
125     def startService(self):
126         # note: this class can only be started and stopped once.
127         self.log("Node.startService")
128         eventual.eventually(self._startService)
129
130     def _startService(self):
131         precondition(reactor.running)
132         self.log("Node._startService")
133
134         service.MultiService.startService(self)
135         d = defer.succeed(None)
136         d.addCallback(lambda res: iputil.get_local_addresses_async())
137         d.addCallback(self._setup_tub)
138         d.addCallback(lambda res: self.tub_ready())
139         def _ready(res):
140             self.log("%s running" % self.NODETYPE)
141             self._tub_ready_observerlist.fire(self)
142             return self
143         d.addCallback(_ready)
144         def _die(failure):
145             self.log('_startService() failed')
146             log.err(failure)
147             #reactor.stop() # for unknown reasons, reactor.stop() isn't working.  [ ] TODO
148             self.log('calling os.abort()')
149             os.abort()
150         d.addErrback(_die)
151
152     def stopService(self):
153         self.log("Node.stopService")
154         d = self._tub_ready_observerlist.when_fired()
155         def _really_stopService(ignored):
156             self.log("Node._really_stopService")
157             return service.MultiService.stopService(self)
158         d.addCallback(_really_stopService)
159         return d
160
161     def shutdown(self):
162         """Shut down the node. Returns a Deferred that fires (with None) when
163         it finally stops kicking."""
164         self.log("Node.shutdown")
165         return self.stopService()
166
167     def log(self, msg, src="", args=()):
168         if src:
169             logsrc = src
170         else:
171             logsrc=self.logSource
172         if args:
173             try:
174                 msg = msg % tuple(map(humanreadable.hr, args))
175             except TypeError, e:
176                 msg = "ERROR: output string '%s' contained invalid %% expansion, error: %s, args: %s\n" % (`msg`, e, `args`)
177         log.FileLogObserver.timeFormat="%y%m%d-%H:%M:%S"
178         log.callWithContext({"system":logsrc},log.msg,(self.short_nodeid + ": " + humanreadable.hr(msg)))
179
180     def _setup_tub(self, local_addresses):
181         # we can't get a dynamically-assigned portnum until our Tub is
182         # running, which means after startService.
183         l = self.tub.getListeners()[0]
184         portnum = l.getPortnum()
185         # record which port we're listening on, so we can grab the same one next time
186         open(self._portnumfile, "w").write("%d\n" % portnum)
187
188         local_addresses = [ "%s:%d" % (addr, portnum,) for addr in local_addresses ]
189
190         addresses = []
191         try:
192             for addrline in open(os.path.join(self.basedir, self.LOCAL_IP_FILE), "rU"):
193                 mo = ADDR_RE.search(addrline)
194                 if mo:
195                     (addr, dummy, aportnum,) = mo.groups()
196                     if aportnum is None:
197                         aportnum = portnum
198                     addresses.append("%s:%d" % (addr, int(aportnum),))
199         except EnvironmentError:
200             pass
201
202         addresses.extend(local_addresses)
203
204         location = ",".join(addresses)
205         self.log("Tub location set to %s" % location)
206         self.tub.setLocation(location)
207         return self.tub
208
209     def tub_ready(self):
210         # called when the Tub is available for registerReference
211         pass
212
213     def when_tub_ready(self):
214         return self._tub_ready_observerlist.when_fired()
215
216     def add_service(self, s):
217         s.setServiceParent(self)
218         return s
219