1 from base64 import b32encode
6 from twisted.python import log
7 from twisted.application import service
8 from twisted.internet import defer, reactor
9 from foolscap import Tub, eventual
10 from allmydata.util import iputil, observer
11 from allmydata.util.assertutil import precondition
14 # Just to get their versions:
19 # group 1 will be addr (dotted quad string), group 3 if any will be portnum (string)
20 ADDR_RE=re.compile("^([1-9][0-9]*\.[1-9][0-9]*\.[1-9][0-9]*\.[1-9][0-9]*)(:([1-9][0-9]*))?$")
22 class Node(service.MultiService):
23 # this implements common functionality of both Client nodes, Introducer
24 # nodes, and Vdrive nodes
25 NODETYPE = "unknown NODETYPE"
28 LOCAL_IP_FILE = "advertised_ip_addresses"
29 NODEIDFILE = "my_nodeid"
31 def __init__(self, basedir="."):
32 service.MultiService.__init__(self)
33 self.basedir = os.path.abspath(basedir)
34 self._tub_ready_observerlist = observer.OneShotObserverList()
35 certfile = os.path.join(self.basedir, self.CERTFILE)
36 self.tub = Tub(certFile=certfile)
37 self.tub.setOption("logLocalFailures", True)
38 self.tub.setOption("logRemoteFailures", True)
39 # I think self.nodeid is kind of whacked. Shouldn't it equal the
40 # fingerprint portion of our furl?
41 self.nodeid = b32encode(self.tub.tubID).lower()
42 f = open(os.path.join(self.basedir, self.NODEIDFILE), "w")
43 f.write(b32encode(self.nodeid).lower() + "\n")
45 self.short_nodeid = self.tub.tubID[:4] # ready for printing
46 assert self.PORTNUMFILE, "Your node.Node subclass must provide PORTNUMFILE"
47 self._portnumfile = os.path.join(self.basedir, self.PORTNUMFILE)
49 portnum = int(open(self._portnumfile, "rU").read())
50 except (EnvironmentError, ValueError):
52 self.tub.listenOn("tcp:%d" % portnum)
53 # we must wait until our service has started before we can find out
54 # our IP address and thus do tub.setLocation, and we can't register
55 # any services with the Tub until after that point
56 self.tub.setServiceParent(self)
58 AUTHKEYSFILEBASE = "authorized_keys."
59 for f in os.listdir(self.basedir):
60 if f.startswith(AUTHKEYSFILEBASE):
61 keyfile = os.path.join(self.basedir, f)
62 portnum = int(f[len(AUTHKEYSFILEBASE):])
63 from allmydata import manhole
64 m = manhole.AuthorizedKeysManhole(portnum, keyfile)
65 m.setServiceParent(self)
66 self.log("AuthorizedKeysManhole listening on %d" % portnum)
68 self.log("Node constructed. tahoe version: %s, foolscap: %s,"
69 " twisted: %s, zfec: %s"
70 % (allmydata.__version__, foolscap.__version__,
71 twisted.__version__, zfec.__version__,))
73 def get_versions(self):
74 return {'allmydata': allmydata.__version__,
75 'foolscap': foolscap.__version__,
76 'twisted': twisted.__version__,
77 'zfec': zfec.__version__,
80 def startService(self):
81 # note: this class can only be started and stopped once.
82 self.log("Node.startService")
83 eventual.eventually(self._startService)
85 def _startService(self):
86 precondition(reactor.running)
87 self.log("Node._startService")
89 service.MultiService.startService(self)
90 d = defer.succeed(None)
91 d.addCallback(lambda res: iputil.get_local_addresses_async())
92 d.addCallback(self._setup_tub)
93 d.addCallback(lambda res: self.tub_ready())
95 self.log("%s running" % self.NODETYPE)
96 self._tub_ready_observerlist.fire(self)
100 self.log('_startService() failed')
102 #reactor.stop() # for unknown reasons, reactor.stop() isn't working. [ ] TODO
103 self.log('calling os.abort()')
107 def stopService(self):
108 self.log("Node.stopService")
109 d = self._tub_ready_observerlist.when_fired()
110 def _really_stopService(ignored):
111 self.log("Node._really_stopService")
112 return service.MultiService.stopService(self)
113 d.addCallback(_really_stopService)
117 """Shut down the node. Returns a Deferred that fires (with None) when
118 it finally stops kicking."""
119 self.log("Node.shutdown")
120 return self.stopService()
123 log.msg(self.short_nodeid + ": " + msg)
125 def _setup_tub(self, local_addresses):
126 # we can't get a dynamically-assigned portnum until our Tub is
127 # running, which means after startService.
128 l = self.tub.getListeners()[0]
129 portnum = l.getPortnum()
130 # record which port we're listening on, so we can grab the same one next time
131 open(self._portnumfile, "w").write("%d\n" % portnum)
133 local_addresses = [ "%s:%d" % (addr, portnum,) for addr in local_addresses ]
137 for addrline in open(os.path.join(self.basedir, self.LOCAL_IP_FILE), "rU"):
138 mo = ADDR_RE.search(addrline)
140 (addr, dummy, aportnum,) = mo.groups()
143 addresses.append("%s:%d" % (addr, int(aportnum),))
144 except EnvironmentError:
147 addresses.extend(local_addresses)
149 location = ",".join(addresses)
150 self.log("Tub location set to %s" % location)
151 self.tub.setLocation(location)
155 # called when the Tub is available for registerReference
158 def when_tub_ready(self):
159 return self._tub_ready_observerlist.when_fired()
161 def add_service(self, s):
162 s.setServiceParent(self)