]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/node.py
decentralized directories: integration and testing
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / node.py
1
2 import datetime, os.path, re, types
3 from base64 import b32decode, b32encode
4
5 import twisted
6 from twisted.python import log
7 from twisted.application import service
8 from twisted.internet import defer, reactor
9 from foolscap import Tub, eventual
10 from allmydata.util import iputil, observer, humanreadable
11 from allmydata.util.assertutil import precondition
12 from allmydata.logpublisher import LogPublisher
13
14 # Just to get their versions:
15 import allmydata, foolscap, pycryptopp, zfec
16
17 # group 1 will be addr (dotted quad string), group 3 if any will be portnum (string)
18 ADDR_RE=re.compile("^([1-9][0-9]*\.[1-9][0-9]*\.[1-9][0-9]*\.[1-9][0-9]*)(:([1-9][0-9]*))?$")
19
20
21 def formatTimeTahoeStyle(self, when):
22     # we want UTC timestamps that look like:
23     #  2007-10-12 00:26:28.566Z [Client] rnp752lz: 'client running'
24     d = datetime.datetime.utcfromtimestamp(when)
25     if d.microsecond:
26         return d.isoformat(" ")[:-3]+"Z"
27     else:
28         return d.isoformat(" ") + ".000Z"
29
30 class Node(service.MultiService):
31     # this implements common functionality of both Client nodes and Introducer
32     # nodes.
33     NODETYPE = "unknown NODETYPE"
34     PORTNUMFILE = None
35     CERTFILE = "node.pem"
36     LOCAL_IP_FILE = "advertised_ip_addresses"
37
38     def __init__(self, basedir="."):
39         service.MultiService.__init__(self)
40         self.basedir = os.path.abspath(basedir)
41         self._tub_ready_observerlist = observer.OneShotObserverList()
42         certfile = os.path.join(self.basedir, self.CERTFILE)
43         self.tub = Tub(certFile=certfile)
44         os.chmod(certfile, 0600)
45         self.tub.setOption("logLocalFailures", True)
46         self.tub.setOption("logRemoteFailures", True)
47         self.nodeid = b32decode(self.tub.tubID.upper()) # binary format
48         self.write_config("my_nodeid", b32encode(self.nodeid).lower() + "\n")
49         self.short_nodeid = b32encode(self.nodeid).lower()[:8] # ready for printing
50         assert self.PORTNUMFILE, "Your node.Node subclass must provide PORTNUMFILE"
51         self._portnumfile = os.path.join(self.basedir, self.PORTNUMFILE)
52         try:
53             portnum = int(open(self._portnumfile, "rU").read())
54         except (EnvironmentError, ValueError):
55             portnum = 0
56         self.tub.listenOn("tcp:%d" % portnum)
57         # we must wait until our service has started before we can find out
58         # our IP address and thus do tub.setLocation, and we can't register
59         # any services with the Tub until after that point
60         self.tub.setServiceParent(self)
61         self.logSource="Node"
62
63         AUTHKEYSFILEBASE = "authorized_keys."
64         for f in os.listdir(self.basedir):
65             if f.startswith(AUTHKEYSFILEBASE):
66                 keyfile = os.path.join(self.basedir, f)
67                 portnum = int(f[len(AUTHKEYSFILEBASE):])
68                 from allmydata import manhole
69                 m = manhole.AuthorizedKeysManhole(portnum, keyfile)
70                 m.setServiceParent(self)
71                 self.log("AuthorizedKeysManhole listening on %d" % portnum)
72
73         self.setup_logging()
74         self.log("Node constructed.  tahoe version: %s, foolscap: %s,"
75                  " twisted: %s, zfec: %s"
76                  % (allmydata.__version__, foolscap.__version__,
77                     twisted.__version__, zfec.__version__,))
78
79     def get_config(self, name, mode="r", required=False):
80         """Get the (string) contents of a config file, or None if the file
81         did not exist. If required=True, raise an exception rather than
82         returning None. Any leading or trailing whitespace will be stripped
83         from the data."""
84         fn = os.path.join(self.basedir, name)
85         try:
86             return open(fn, mode).read().strip()
87         except EnvironmentError:
88             if not required:
89                 return None
90             raise
91
92     def get_or_create_config(self, name, default_fn, mode="w", filemode=None):
93         """Try to get the (string) contents of a config file, and return it.
94         Any leading or trailing whitespace will be stripped from the data.
95
96         If the file does not exist, try to create it using default_fn, and
97         then return the value that was written. If 'default_fn' is a string,
98         use it as a default value. If not, treat it as a 0-argument callable
99         which is expected to return a string.
100         """
101         value = self.get_config(name)
102         if value is None:
103             if isinstance(default_fn, (str, unicode)):
104                 value = default_fn
105             else:
106                 value = default_fn()
107             fn = os.path.join(self.basedir, name)
108             try:
109                 f = open(fn, mode)
110                 f.write(value)
111                 f.close()
112                 if filemode is not None:
113                     os.chmod(fn, filemode)
114             except EnvironmentError, e:
115                 self.log("Unable to write config file '%s'" % fn)
116                 self.log(e)
117             value = value.strip()
118         return value
119
120     def write_config(self, name, value, mode="w"):
121         """Write a string to a config file."""
122         fn = os.path.join(self.basedir, name)
123         try:
124             open(fn, mode).write(value)
125         except EnvironmentError, e:
126             self.log("Unable to write config file '%s'" % fn)
127             self.log(e)
128
129     def get_versions(self):
130         return {'allmydata': allmydata.__version__,
131                 'foolscap': foolscap.__version__,
132                 'twisted': twisted.__version__,
133                 'zfec': zfec.__version__,
134                 'pycryptopp': pycryptopp.__version__,
135                 }
136
137     def startService(self):
138         # Note: this class can be started and stopped at most once.
139         self.log("Node.startService")
140         # Delay until the reactor is running.
141         eventual.eventually(self._startService)
142
143     def _startService(self):
144         precondition(reactor.running)
145         self.log("Node._startService")
146
147         service.MultiService.startService(self)
148         d = defer.succeed(None)
149         d.addCallback(lambda res: iputil.get_local_addresses_async())
150         d.addCallback(self._setup_tub)
151         d.addCallback(lambda res: self.tub_ready())
152         def _ready(res):
153             self.log("%s running" % self.NODETYPE)
154             self._tub_ready_observerlist.fire(self)
155             return self
156         d.addCallback(_ready)
157         def _die(failure):
158             self.log('_startService() failed')
159             log.err(failure)
160             #reactor.stop() # for unknown reasons, reactor.stop() isn't working.  [ ] TODO
161             self.log('calling os.abort()')
162             os.abort()
163         d.addErrback(_die)
164
165     def stopService(self):
166         self.log("Node.stopService")
167         d = self._tub_ready_observerlist.when_fired()
168         def _really_stopService(ignored):
169             self.log("Node._really_stopService")
170             return service.MultiService.stopService(self)
171         d.addCallback(_really_stopService)
172         return d
173
174     def shutdown(self):
175         """Shut down the node. Returns a Deferred that fires (with None) when
176         it finally stops kicking."""
177         self.log("Node.shutdown")
178         return self.stopService()
179
180     def setup_logging(self):
181         # we replace the formatTime() method of the log observer that twistd
182         # set up for us, with a method that uses better timestamps.
183         for o in log.theLogPublisher.observers:
184             # o might be a FileLogObserver's .emit method
185             if type(o) is type(self.setup_logging): # bound method
186                 ob = o.im_self
187                 if isinstance(ob, log.FileLogObserver):
188                     newmeth = types.UnboundMethodType(formatTimeTahoeStyle, ob, ob.__class__)
189                     ob.formatTime = newmeth
190         # TODO: twisted >2.5.0 offers maxRotatedFiles=50
191
192     def log(self, msg, src="", args=()):
193         if src:
194             logsrc = src
195         else:
196             logsrc = self.logSource
197         if args:
198             try:
199                 msg = msg % tuple(map(humanreadable.hr, args))
200             except TypeError, e:
201                 msg = "ERROR: output string '%s' contained invalid %% expansion, error: %s, args: %s\n" % (`msg`, e, `args`)
202
203         log.callWithContext({"system":logsrc},
204                             log.msg,
205                             (self.short_nodeid + ": " + humanreadable.hr(msg)))
206
207     def _setup_tub(self, local_addresses):
208         # we can't get a dynamically-assigned portnum until our Tub is
209         # running, which means after startService.
210         l = self.tub.getListeners()[0]
211         portnum = l.getPortnum()
212         # record which port we're listening on, so we can grab the same one next time
213         open(self._portnumfile, "w").write("%d\n" % portnum)
214
215         local_addresses = [ "%s:%d" % (addr, portnum,) for addr in local_addresses ]
216
217         addresses = []
218         try:
219             for addrline in open(os.path.join(self.basedir, self.LOCAL_IP_FILE), "rU"):
220                 mo = ADDR_RE.search(addrline)
221                 if mo:
222                     (addr, dummy, aportnum,) = mo.groups()
223                     if aportnum is None:
224                         aportnum = portnum
225                     addresses.append("%s:%d" % (addr, int(aportnum),))
226         except EnvironmentError:
227             pass
228
229         addresses.extend(local_addresses)
230
231         location = ",".join(addresses)
232         self.log("Tub location set to %s" % location)
233         self.tub.setLocation(location)
234         return self.tub
235
236     def tub_ready(self):
237         # called when the Tub is available for registerReference
238         self.add_service(LogPublisher())
239         log_gatherer_furl = self.get_config("log_gatherer.furl")
240         if log_gatherer_furl:
241             self.tub.connectTo(log_gatherer_furl, self._log_gatherer_connected)
242
243     def _log_gatherer_connected(self, rref):
244         rref.callRemote("logport",
245                         self.nodeid, self.getServiceNamed("log_publisher"))
246
247     def when_tub_ready(self):
248         return self._tub_ready_observerlist.when_fired()
249
250     def add_service(self, s):
251         s.setServiceParent(self)
252         return s
253