]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/node.py
a slightly nicer method of computing our timestamp format
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / node.py
1
2 import datetime, new, os.path, re
3 from base64 import b32decode, b32encode
4
5 import twisted
6 from twisted.python import log, logfile
7 from twisted.application import service
8 from twisted.internet import defer, reactor
9 from foolscap import Tub, eventual
10 from allmydata.util import iputil, observer, humanreadable, fileutil
11 from allmydata.util.assertutil import precondition
12
13 # Just to get their versions:
14 import allmydata
15 import zfec
16 import foolscap
17
# Matches one line of the advertised-addresses file: a dotted quad with an
# optional ":portnum" suffix.
# group 1 will be addr (dotted quad string), group 3 if any will be portnum (string)
# Each octet is either a lone "0" or a number without a leading zero, so
# addresses that contain a zero octet (10.0.0.1, 192.168.0.1) are accepted.
ADDR_RE = re.compile(r"^((?:0|[1-9][0-9]*)(?:\.(?:0|[1-9][0-9]*)){3})(:([1-9][0-9]*))?$")
20
21
def formatTimeTahoeStyle(self, when):
    """Render the epoch timestamp 'when' as a UTC string with millisecond
    precision and a trailing 'Z', for log lines that look like:

      2007-10-12 00:26:28.566Z [Client] rnp752lz: 'client running'

    'self' is not used; it is in the signature so that this function can be
    bound to an object as its formatTime method.
    """
    stamp = datetime.datetime.utcfromtimestamp(when)
    # Format the whole-second part on its own, then append the microseconds
    # truncated (not rounded) to three digits.
    whole_seconds = stamp.replace(microsecond=0).isoformat(" ")
    return "%s.%03dZ" % (whole_seconds, stamp.microsecond // 1000)
30
31 class Node(service.MultiService):
    # This implements functionality common to Client nodes, Introducer
    # nodes, and Vdrive nodes.
34     NODETYPE = "unknown NODETYPE"
35     PORTNUMFILE = None
36     CERTFILE = "node.pem"
37     LOCAL_IP_FILE = "advertised_ip_addresses"
38
39     def __init__(self, basedir="."):
40         service.MultiService.__init__(self)
41         self.basedir = os.path.abspath(basedir)
42         self._tub_ready_observerlist = observer.OneShotObserverList()
43         certfile = os.path.join(self.basedir, self.CERTFILE)
44         self.tub = Tub(certFile=certfile)
45         os.chmod(certfile, 0600)
46         self.tub.setOption("logLocalFailures", True)
47         self.tub.setOption("logRemoteFailures", True)
48         self.nodeid = b32decode(self.tub.tubID.upper()) # binary format
49         self.write_config("my_nodeid", b32encode(self.nodeid).lower() + "\n")
50         self.short_nodeid = b32encode(self.nodeid).lower()[:8] # ready for printing
51         assert self.PORTNUMFILE, "Your node.Node subclass must provide PORTNUMFILE"
52         self._portnumfile = os.path.join(self.basedir, self.PORTNUMFILE)
53         try:
54             portnum = int(open(self._portnumfile, "rU").read())
55         except (EnvironmentError, ValueError):
56             portnum = 0
57         self.tub.listenOn("tcp:%d" % portnum)
58         # we must wait until our service has started before we can find out
59         # our IP address and thus do tub.setLocation, and we can't register
60         # any services with the Tub until after that point
61         self.tub.setServiceParent(self)
62         self.logSource="Node"
63
64         AUTHKEYSFILEBASE = "authorized_keys."
65         for f in os.listdir(self.basedir):
66             if f.startswith(AUTHKEYSFILEBASE):
67                 keyfile = os.path.join(self.basedir, f)
68                 portnum = int(f[len(AUTHKEYSFILEBASE):])
69                 from allmydata import manhole
70                 m = manhole.AuthorizedKeysManhole(portnum, keyfile)
71                 m.setServiceParent(self)
72                 self.log("AuthorizedKeysManhole listening on %d" % portnum)
73
74         self.setup_logging()
75         self.log("Node constructed.  tahoe version: %s, foolscap: %s,"
76                  " twisted: %s, zfec: %s"
77                  % (allmydata.__version__, foolscap.__version__,
78                     twisted.__version__, zfec.__version__,))
79
80     def get_config(self, name, mode="r", required=False):
81         """Get the (string) contents of a config file, or None if the file
82         did not exist. If required=True, raise an exception rather than
83         returning None. Any leading or trailing whitespace will be stripped
84         from the data."""
85         fn = os.path.join(self.basedir, name)
86         try:
87             return open(fn, mode).read().strip()
88         except EnvironmentError:
89             if not required:
90                 return None
91             raise
92
93     def get_or_create_config(self, name, default_fn, mode="w", filemode=None):
94         """Try to get the (string) contents of a config file, and return it.
95         Any leading or trailing whitespace will be stripped from the data.
96
97         If the file does not exist, try to create it using default_fn, and
98         then return the value that was written. If 'default_fn' is a string,
99         use it as a default value. If not, treat it as a 0-argument callable
100         which is expected to return a string.
101         """
102         value = self.get_config(name)
103         if value is None:
104             if isinstance(default_fn, (str, unicode)):
105                 value = default_fn
106             else:
107                 value = default_fn()
108             fn = os.path.join(self.basedir, name)
109             try:
110                 f = open(fn, mode)
111                 f.write(value)
112                 f.close()
113                 if filemode is not None:
114                     os.chmod(fn, filemode)
115             except EnvironmentError, e:
116                 self.log("Unable to write config file '%s'" % fn)
117                 self.log(e)
118             value = value.strip()
119         return value
120
121     def write_config(self, name, value, mode="w"):
122         """Write a string to a config file."""
123         fn = os.path.join(self.basedir, name)
124         try:
125             open(fn, mode).write(value)
126         except EnvironmentError, e:
127             self.log("Unable to write config file '%s'" % fn)
128             self.log(e)
129
130     def get_versions(self):
131         return {'allmydata': allmydata.__version__,
132                 'foolscap': foolscap.__version__,
133                 'twisted': twisted.__version__,
134                 'zfec': zfec.__version__,
135                 }
136
137     def startService(self):
138         # note: this class can only be started and stopped once.
139         self.log("Node.startService")
140         eventual.eventually(self._startService)
141
142     def _startService(self):
143         precondition(reactor.running)
144         self.log("Node._startService")
145
146         service.MultiService.startService(self)
147         d = defer.succeed(None)
148         d.addCallback(lambda res: iputil.get_local_addresses_async())
149         d.addCallback(self._setup_tub)
150         d.addCallback(lambda res: self.tub_ready())
151         def _ready(res):
152             self.log("%s running" % self.NODETYPE)
153             self._tub_ready_observerlist.fire(self)
154             return self
155         d.addCallback(_ready)
156         def _die(failure):
157             self.log('_startService() failed')
158             log.err(failure)
159             #reactor.stop() # for unknown reasons, reactor.stop() isn't working.  [ ] TODO
160             self.log('calling os.abort()')
161             os.abort()
162         d.addErrback(_die)
163
164     def stopService(self):
165         self.log("Node.stopService")
166         d = self._tub_ready_observerlist.when_fired()
167         def _really_stopService(ignored):
168             self.log("Node._really_stopService")
169             return service.MultiService.stopService(self)
170         d.addCallback(_really_stopService)
171         return d
172
173     def shutdown(self):
174         """Shut down the node. Returns a Deferred that fires (with None) when
175         it finally stops kicking."""
176         self.log("Node.shutdown")
177         return self.stopService()
178
179     def setup_logging(self):
180         # we replace the log observer that twistd set up for us, with a form
181         # that uses better timestamps. First, shut down all existing
182         # file-based observers (leaving trial's error-watching observers in
183         # place).
184         for o in log.theLogPublisher.observers:
185             # o might be a FileLogObserver's .emit method
186             if type(o) is type(self.setup_logging): # bound method
187                 ob = o.im_self
188                 if isinstance(ob, log.FileLogObserver):
189                     newmeth = new.instancemethod(formatTimeTahoeStyle, ob, ob.__class__)
190                     ob.formatTime = newmeth
191         # TODO: twisted >2.5.0 offers maxRotatedFiles=50
192
193     def log(self, msg, src="", args=()):
194         if src:
195             logsrc = src
196         else:
197             logsrc = self.logSource
198         if args:
199             try:
200                 msg = msg % tuple(map(humanreadable.hr, args))
201             except TypeError, e:
202                 msg = "ERROR: output string '%s' contained invalid %% expansion, error: %s, args: %s\n" % (`msg`, e, `args`)
203
204         log.callWithContext({"system":logsrc},
205                             log.msg,
206                             (self.short_nodeid + ": " + humanreadable.hr(msg)))
207
208     def _setup_tub(self, local_addresses):
209         # we can't get a dynamically-assigned portnum until our Tub is
210         # running, which means after startService.
211         l = self.tub.getListeners()[0]
212         portnum = l.getPortnum()
213         # record which port we're listening on, so we can grab the same one next time
214         open(self._portnumfile, "w").write("%d\n" % portnum)
215
216         local_addresses = [ "%s:%d" % (addr, portnum,) for addr in local_addresses ]
217
218         addresses = []
219         try:
220             for addrline in open(os.path.join(self.basedir, self.LOCAL_IP_FILE), "rU"):
221                 mo = ADDR_RE.search(addrline)
222                 if mo:
223                     (addr, dummy, aportnum,) = mo.groups()
224                     if aportnum is None:
225                         aportnum = portnum
226                     addresses.append("%s:%d" % (addr, int(aportnum),))
227         except EnvironmentError:
228             pass
229
230         addresses.extend(local_addresses)
231
232         location = ",".join(addresses)
233         self.log("Tub location set to %s" % location)
234         self.tub.setLocation(location)
235         return self.tub
236
237     def tub_ready(self):
238         # called when the Tub is available for registerReference
239         pass
240
241     def when_tub_ready(self):
242         return self._tub_ready_observerlist.when_fired()
243
244     def add_service(self, s):
245         s.setServiceParent(self)
246         return s
247