From: Brian Warner Date: Sun, 3 Dec 2006 02:37:31 +0000 (-0700) Subject: rearrange node startup again, allowing Tub.registerReference to happen in startServic... X-Git-Tag: tahoe_v0.1.0-0-UNSTABLE~491 X-Git-Url: https://git.rkrishnan.org/(%5B%5E?a=commitdiff_plain;h=79c45f5f4d7d311e005054bec582158e93f00722;p=tahoe-lafs%2Ftahoe-lafs.git rearrange node startup again, allowing Tub.registerReference to happen in startService rather than in some later turn. Also add a 'local_ip' file with which you can override the published IP address of the node --- diff --git a/allmydata/client.py b/allmydata/client.py index df17ac72..641b2e43 100644 --- a/allmydata/client.py +++ b/allmydata/client.py @@ -7,12 +7,7 @@ from zope.interface import implements from allmydata.interfaces import RIClient from allmydata import node -from twisted.internet import defer, reactor -# this BlockingResolver is because otherwise unit tests must sometimes deal -# with a leftover DNS lookup thread. I'd prefer to not do this, and use the -# default ThreadedResolver -from twisted.internet.base import BlockingResolver -reactor.installResolver(BlockingResolver()) +from twisted.internet import defer from allmydata.storageserver import StorageServer from allmydata.upload import Uploader @@ -34,7 +29,10 @@ class Client(node.Node, Referenceable): self.add_service(Uploader()) self.queen_pburl = None self.queen_connector = None - self.my_pburl = None + + def tub_ready(self): + self.my_pburl = self.tub.registerReference(self) + self.maybe_connect_to_queen() def set_queen_pburl(self, queen_pburl): self.queen_pburl = queen_pburl @@ -53,10 +51,6 @@ class Client(node.Node, Referenceable): self.queen_connector = self.tub.connectTo(self.queen_pburl, self._got_queen) - def tub_ready(self, tub): - self.my_pburl = self.tub.registerReference(self) - self.maybe_connect_to_queen() - def stopService(self): if self.queen_connector: self.queen_connector.stopConnecting() diff --git a/allmydata/node.py b/allmydata/node.py index 201b7540..7a0d28ad 100644 --- a/allmydata/node.py +++ b/allmydata/node.py @@ -12,6 +12,7 @@ class Node(service.MultiService): NODETYPE = "unknown NODETYPE" PORTNUMFILE = None CERTFILE = None + LOCAL_IP_FILE = "local_ip" def __init__(self, basedir="."): service.MultiService.__init__(self) @@ -50,8 +51,15 @@ class Node(service.MultiService): log.msg("AuthorizedKeysManhole listening on %d" % portnum) def _setup_tub(self, local_ip): + # we can't get a dynamically-assigned portnum until our Tub is + # running, which means after startService. l = self.tub.getListeners()[0] portnum = l.getPortnum() + local_ip_filename = os.path.join(self.basedir, self.LOCAL_IP_FILE) + if os.path.exists(local_ip_filename): + f = open(local_ip_filename, "r") + local_ip = f.read() + f.close() self.tub.setLocation("%s:%d" % (local_ip, portnum)) if not os.path.exists(self._portnumfile): # record which port we're listening on, so we can grab the same @@ -62,8 +70,8 @@ class Node(service.MultiService): self.tub.setLocation("%s:%d" % (local_ip, l.getPortnum())) return self.tub - def tub_ready(self, tub): - # this is called when the Tub has a location + def tub_ready(self): + # called when the Tub is available for registerReference pass def add_service(self, s): @@ -73,8 +81,7 @@ class Node(service.MultiService): def startService(self): # note: this class can only be started and stopped once. service.MultiService.startService(self) - d = get_local_ip_for() - d.addCallback(self._setup_tub) - d.addCallback(self.tub_ready) - d.addCallback(lambda res: log.msg("%s running" % self.NODETYPE)) - + local_ip = get_local_ip_for() + self._setup_tub(local_ip) + self.tub_ready() + log.msg("%s running" % self.NODETYPE) diff --git a/allmydata/queen.py b/allmydata/queen.py index 5db05788..dfb299d0 100644 --- a/allmydata/queen.py +++ b/allmydata/queen.py @@ -1,4 +1,5 @@ +import os.path from foolscap import Referenceable from foolscap.eventual import eventually from twisted.application import service @@ -53,8 +54,12 @@ class Queen(node.Node): node.Node.__init__(self, basedir) self.urls = {} - def tub_ready(self, tub): + def tub_ready(self): r = self.add_service(Roster()) self.urls["roster"] = self.tub.registerReference(r, "roster") log.msg(" roster is at %s" % self.urls["roster"]) + f = open(os.path.join(self.basedir, "roster_pburl"), "w") + f.write(self.urls["roster"] + "\n") + f.close() + diff --git a/allmydata/test/test_system.py b/allmydata/test/test_system.py index 41a236a2..060ed5a7 100644 --- a/allmydata/test/test_system.py +++ b/allmydata/test/test_system.py @@ -1,21 +1,25 @@ from twisted.trial import unittest +from twisted.internet import defer, reactor from twisted.application import service from allmydata import upload, client, queen import os +from foolscap.eventual import flushEventualQueue class SystemTest(unittest.TestCase): def setUp(self): self.sparent = service.MultiService() self.sparent.startService() def tearDown(self): - return self.sparent.stopService() + d = self.sparent.stopService() + d.addCallback(lambda res: flushEventualQueue()) + return d def addService(self, s): s.setServiceParent(self.sparent) return s - def test_it(self): + def setUpNodes(self): os.mkdir("queen") q = self.addService(queen.Queen(basedir="queen")) clients = [] @@ -23,5 +27,17 @@ class SystemTest(unittest.TestCase): for i in range(NUMCLIENTS): basedir = "client%d" % i os.mkdir(basedir) - clients.append(self.addService(client.Client(basedir=basedir))) + c = self.addService(client.Client(basedir=basedir)) + clients.append(c) + + def waitForConnections(self): + # the cheap way: time + d = defer.Deferred() + reactor.callLater(1, d.callback, None) + return d + + def test_it(self): + self.setUpNodes() + d = self.waitForConnections() + return d diff --git a/allmydata/util/iputil.py b/allmydata/util/iputil.py index 602d5f1a..9b87b1c0 100644 --- a/allmydata/util/iputil.py +++ b/allmydata/util/iputil.py @@ -1,48 +1,21 @@ # adapted from nattraverso.ipdiscover +import socket from twisted.internet import reactor from twisted.internet.protocol import DatagramProtocol -#from twisted.internet.error import CannotListenError -#from twisted.internet.interfaces import IReactorMulticast -#from amdlib.util.nattraverso.utils import is_rfc1918_ip, is_bogus_ip def get_local_ip_for(target='A.ROOT-SERVERS.NET'): """Find out what our IP address is for use by a given target. - Returns a Deferred which will be fired with a string that holds the IP - address which could be used by 'target' to connect to us. It might work - for them, it might not. - - The reactor must be running before you can call this, because we must - perform a DNS lookup on the target. - - """ - d = reactor.resolve(target) - def _resolved(target_ipaddr): - udpprot = DatagramProtocol() - port = reactor.listenUDP(0, udpprot) - udpprot.transport.connect(target_ipaddr, 7) - localip = udpprot.transport.getHost().host - port.stopListening() - return localip - d.addCallback(_resolved) - return d - - - -def BROKEN_get_local_ip_for(target_ipaddr): - """Find out what our IP address is for use by a given target. - - Returns a Deferred which will be fired with a string that holds the IP - address which could be used by 'target' to connect to us. It might work - for them, it might not. 'target' must be an IP address. - + Returns a string that holds the IP address which could be used by + 'target' to connect to us. It might work for them, it might not. """ + target_ipaddr = socket.gethostbyname(target) udpprot = DatagramProtocol() port = reactor.listenUDP(0, udpprot) udpprot.transport.connect(target_ipaddr, 7) localip = udpprot.transport.getHost().host - port.stopListening() - + port.stopListening() # note, this returns a Deferred return localip +