3 from base64 import b32decode, b32encode
6 from twisted.python import log
7 from twisted.application import service
8 from twisted.internet import defer, reactor
9 from foolscap import Tub, eventual
10 from allmydata.util import iputil, observer, humanreadable
11 from allmydata.util.assertutil import precondition
13 # Just to get their versions:
# Matches one line of the advertised-addresses file: a dotted quad with an
# optional ":port" suffix.
# group 1 will be addr (dotted quad string), group 3 if any will be portnum (string)
# Each octet is either a lone "0" or a number without leading zeros, so
# addresses such as 10.0.0.1 and 127.0.0.1 are accepted.
ADDR_RE = re.compile(r"^((?:0|[1-9][0-9]*)(?:\.(?:0|[1-9][0-9]*)){3})(:([1-9][0-9]*))?$")
class Node(service.MultiService):
    # Shared base class: the functionality common to Client nodes,
    # Introducer nodes, and Vdrive nodes lives here.
    # (__init__ additionally reads self.CERTFILE and self.PORTNUMFILE.)

    # label used in the "<NODETYPE> running" log message
    NODETYPE = "unknown NODETYPE"
    # file inside basedir whose lines name extra addresses to advertise
    LOCAL_IP_FILE = "advertised_ip_addresses"
29 def __init__(self, basedir="."):
30 service.MultiService.__init__(self)
31 self.basedir = os.path.abspath(basedir)
32 self._tub_ready_observerlist = observer.OneShotObserverList()
33 certfile = os.path.join(self.basedir, self.CERTFILE)
34 self.tub = Tub(certFile=certfile)
35 os.chmod(certfile, 0600)
36 self.tub.setOption("logLocalFailures", True)
37 self.tub.setOption("logRemoteFailures", True)
38 self.nodeid = b32decode(self.tub.tubID.upper()) # binary format
39 self.write_config("my_nodeid", b32encode(self.nodeid).lower() + "\n")
40 self.short_nodeid = b32encode(self.nodeid).lower()[:8] # ready for printing
41 assert self.PORTNUMFILE, "Your node.Node subclass must provide PORTNUMFILE"
42 self._portnumfile = os.path.join(self.basedir, self.PORTNUMFILE)
44 portnum = int(open(self._portnumfile, "rU").read())
45 except (EnvironmentError, ValueError):
47 self.tub.listenOn("tcp:%d" % portnum)
48 # we must wait until our service has started before we can find out
49 # our IP address and thus do tub.setLocation, and we can't register
50 # any services with the Tub until after that point
51 self.tub.setServiceParent(self)
54 AUTHKEYSFILEBASE = "authorized_keys."
55 for f in os.listdir(self.basedir):
56 if f.startswith(AUTHKEYSFILEBASE):
57 keyfile = os.path.join(self.basedir, f)
58 portnum = int(f[len(AUTHKEYSFILEBASE):])
59 from allmydata import manhole
60 m = manhole.AuthorizedKeysManhole(portnum, keyfile)
61 m.setServiceParent(self)
62 self.log("AuthorizedKeysManhole listening on %d" % portnum)
64 self.log("Node constructed. tahoe version: %s, foolscap: %s,"
65 " twisted: %s, zfec: %s"
66 % (allmydata.__version__, foolscap.__version__,
67 twisted.__version__, zfec.__version__,))
def get_config(self, name, mode="r", required=False):
    """Get the (string) contents of a config file, or None if the file
    did not exist. If required=True, raise an exception rather than
    returning None. Any leading or trailing whitespace will be stripped
    from the data.

    name is relative to self.basedir; mode is passed to open().
    """
    fn = os.path.join(self.basedir, name)
    try:
        f = open(fn, mode)
        try:
            return f.read().strip()
        finally:
            # close explicitly instead of relying on garbage collection
            f.close()
    except EnvironmentError:
        if required:
            raise
        return None
82 def get_or_create_config(self, name, default_fn, mode="w", filemode=None):
83 """Try to get the (string) contents of a config file, and return it.
84 Any leading or trailing whitespace will be stripped from the data.
86 If the file does not exist, try to create it using default_fn, and
87 then return the value that was written. If 'default_fn' is a string,
88 use it as a default value. If not, treat it as a 0-argument callable
89 which is expected to return a string.
91 value = self.get_config(name)
93 if isinstance(default_fn, (str, unicode)):
97 fn = os.path.join(self.basedir, name)
102 if filemode is not None:
103 os.chmod(fn, filemode)
104 except EnvironmentError, e:
105 self.log("Unable to write config file '%s'" % fn)
107 value = value.strip()
110 def write_config(self, name, value, mode="w"):
111 """Write a string to a config file."""
112 fn = os.path.join(self.basedir, name)
114 open(fn, mode).write(value)
115 except EnvironmentError, e:
116 self.log("Unable to write config file '%s'" % fn)
def get_versions(self):
    """Return a dict mapping each component name ('allmydata', 'foolscap',
    'twisted', 'zfec') to that module's __version__."""
    components = (
        ('allmydata', allmydata),
        ('foolscap', foolscap),
        ('twisted', twisted),
        ('zfec', zfec),
    )
    return dict([(label, module.__version__) for (label, module) in components])
def startService(self):
    """Log the request and hand the real startup work (_startService) to
    foolscap's eventual.eventually() instead of running it inline."""
    # note: this class can only be started and stopped once.
    self.log("Node.startService")
    deferred_start = self._startService
    eventual.eventually(deferred_start)
def _startService(self):
    """Start the child services, then set up the Tub and announce readiness.

    The steps run as a Deferred chain: look up the local addresses, call
    _setup_tub with them, call tub_ready(), then fire the tub-ready
    observer list with this node. Any failure along the chain is logged
    and the process is aborted.
    """
    precondition(reactor.running)
    self.log("Node._startService")

    service.MultiService.startService(self)
    d = defer.succeed(None)
    d.addCallback(lambda res: iputil.get_local_addresses_async())
    d.addCallback(self._setup_tub)
    d.addCallback(lambda res: self.tub_ready())
    def _ready(res):
        self.log("%s running" % self.NODETYPE)
        self._tub_ready_observerlist.fire(self)
    d.addCallback(_ready)
    # NOTE(review): the handler's header and registration lines are missing
    # from the listing; its name and the log.err() call are reconstructed.
    def _die(failure):
        self.log('_startService() failed')
        # record the traceback before the process goes away
        log.err(failure)
        #reactor.stop() # for unknown reasons, reactor.stop() isn't working. [ ] TODO
        self.log('calling os.abort()')
        os.abort()
    d.addErrback(_die)
def stopService(self):
    """Stop the node's services once startup has completed.

    The real MultiService.stopService call is chained onto the tub-ready
    observer, so it does not run until that observer has fired. Returns
    the resulting Deferred so callers (e.g. shutdown) can wait on it.
    """
    self.log("Node.stopService")
    def _really_stopService(ignored):
        self.log("Node._really_stopService")
        return service.MultiService.stopService(self)
    d = self._tub_ready_observerlist.when_fired()
    d.addCallback(_really_stopService)
    return d
def shutdown(self):
    """Shut down the node. Returns a Deferred that fires (with None) when
    it finally stops kicking."""
    # thin wrapper: logs, then delegates to stopService()
    self.log("Node.shutdown")
    return self.stopService()
168 def log(self, msg, src="", args=()):
172 logsrc=self.logSource
175 msg = msg % tuple(map(humanreadable.hr, args))
177 msg = "ERROR: output string '%s' contained invalid %% expansion, error: %s, args: %s\n" % (`msg`, e, `args`)
178 # TODO: modify the timestamp to include milliseconds
179 # TODO: modify it to be in UTC instead of localtime
180 # (see twisted/python/log.py:FileLogObserver.formatTime line 362)
181 log.FileLogObserver.timeFormat="%Y-%m-%d %H:%M:%S"
182 log.callWithContext({"system":logsrc},log.msg,(self.short_nodeid + ": " + humanreadable.hr(msg)))
def _setup_tub(self, local_addresses):
    """Save the listening port and set the Tub's location.

    local_addresses is a sequence of address strings (without port). The
    location is the comma-joined list of "addr:port" entries: first the
    addresses read from the LOCAL_IP_FILE in basedir (if that file can be
    read), then local_addresses paired with the Tub's listening port.
    """
    # we can't get a dynamically-assigned portnum until our Tub is
    # running, which means after startService.
    listener = self.tub.getListeners()[0]
    portnum = listener.getPortnum()
    # record which port we're listening on, so we can grab the same one next time
    portfile = open(self._portnumfile, "w")
    try:
        portfile.write("%d\n" % portnum)
    finally:
        portfile.close()

    local_addresses = ["%s:%d" % (addr, portnum) for addr in local_addresses]

    addresses = []
    try:
        addrfile = open(os.path.join(self.basedir, self.LOCAL_IP_FILE), "rU")
        try:
            for addrline in addrfile:
                mo = ADDR_RE.search(addrline)
                if mo:
                    (addr, dummy, aportnum) = mo.groups()
                    if aportnum is None:
                        # NOTE(review): fallback lines missing from the
                        # listing; a line without ":port" is assumed to
                        # use the Tub's own port -- confirm.
                        aportnum = portnum
                    addresses.append("%s:%d" % (addr, int(aportnum)))
        finally:
            addrfile.close()
    except EnvironmentError:
        # an unreadable or absent file just means no extra addresses
        pass

    addresses.extend(local_addresses)

    location = ",".join(addresses)
    self.log("Tub location set to %s" % location)
    self.tub.setLocation(location)
def tub_ready(self):
    # called when the Tub is available for registerReference
    # (_startService invokes this right after _setup_tub has run).
    # NOTE(review): the def line and body are missing from the listing;
    # a do-nothing hook is assumed here.
    pass
def when_tub_ready(self):
    """Return when_fired() of the tub-ready observer list, which
    _startService fires (with this node) once the Tub has been set up."""
    ready_observers = self._tub_ready_observerlist
    return ready_observers.when_fired()
def add_service(self, s):
    # attach service s as a child of this node (a MultiService)
    s.setServiceParent(self)