From: Brian Warner Date: Fri, 25 Jul 2008 22:33:49 +0000 (-0700) Subject: test_system.py: factor SystemTestMixin out of SystemTest X-Git-Url: https://git.rkrishnan.org/pf/content/en/seg/index.php?a=commitdiff_plain;h=48c64b7a2951cf91ba4d21d1bd673abcb8a3351c;p=tahoe-lafs%2Ftahoe-lafs.git test_system.py: factor SystemTestMixin out of SystemTest --- diff --git a/src/allmydata/test/common.py b/src/allmydata/test/common.py index 852475b9..d8d17d54 100644 --- a/src/allmydata/test/common.py +++ b/src/allmydata/test/common.py @@ -4,7 +4,8 @@ from zope.interface import implements from twisted.internet import defer from twisted.python import failure from twisted.application import service -from foolscap.eventual import flushEventualQueue +from foolscap import Tub +from foolscap.eventual import flushEventualQueue, fireEventually from allmydata import uri, dirnode, client from allmydata.introducer.server import IntroducerNode from allmydata.interfaces import IURI, IMutableFileNode, IFileNode, \ @@ -12,6 +13,9 @@ from allmydata.interfaces import IURI, IMutableFileNode, IFileNode, \ from allmydata.immutable import checker from allmydata.immutable.encode import NotEnoughSharesError from allmydata.util import log, testutil, fileutil +from allmydata.stats import PickleStatsGatherer +from allmydata.key_generator import KeyGeneratorService + def flush_but_dont_ignore(res): d = flushEventualQueue() @@ -196,6 +200,12 @@ class SystemTestMixin(testutil.SignalMixin, testutil.PollMixin, def setUp(self): self.sparent = service.MultiService() self.sparent.startService() + + self.stats_gatherer = None + self.stats_gatherer_furl = None + self.key_generator_svc = None + self.key_generator_furl = None + def tearDown(self): log.msg("shutting down SystemTest services") d = self.sparent.stopService() @@ -209,7 +219,8 @@ class SystemTestMixin(testutil.SignalMixin, testutil.PollMixin, s.setServiceParent(self.sparent) return s - def set_up_nodes(self, NUMCLIENTS=5): + def set_up_nodes(self, NUMCLIENTS=5, + use_stats_gatherer=False, use_key_generator=False): self.numclients = NUMCLIENTS iv_dir = self.getdir("introducer") if not os.path.isdir(iv_dir): @@ -220,7 +231,51 @@ class SystemTestMixin(testutil.SignalMixin, testutil.PollMixin, iv = IntroducerNode(basedir=iv_dir) self.introducer = self.add_service(iv) d = self.introducer.when_tub_ready() + d.addCallback(self._get_introducer_web) + if use_stats_gatherer: + d.addCallback(self._set_up_stats_gatherer) + if use_key_generator: + d.addCallback(self._set_up_key_generator) d.addCallback(self._set_up_nodes_2) + if use_stats_gatherer: + d.addCallback(self._grab_stats) + return d + + def _get_introducer_web(self, res): + f = open(os.path.join(self.getdir("introducer"), "node.url"), "r") + self.introweb_url = f.read().strip() + f.close() + + def _set_up_stats_gatherer(self, res): + statsdir = self.getdir("stats_gatherer") + fileutil.make_dirs(statsdir) + t = Tub() + self.add_service(t) + l = t.listenOn("tcp:0") + p = l.getPortnum() + t.setLocation("localhost:%d" % p) + + self.stats_gatherer = PickleStatsGatherer(t, statsdir, False) + self.add_service(self.stats_gatherer) + self.stats_gatherer_furl = self.stats_gatherer.get_furl() + + def _set_up_key_generator(self, res): + kgsdir = self.getdir("key_generator") + fileutil.make_dirs(kgsdir) + + self.key_generator_svc = KeyGeneratorService(kgsdir, display_furl=False) + self.key_generator_svc.key_generator.pool_size = 4 + self.key_generator_svc.key_generator.pool_refresh_delay = 60 + self.add_service(self.key_generator_svc) + + d = fireEventually() + def check_for_furl(): + return os.path.exists(os.path.join(kgsdir, 'key_generator.furl')) + d.addCallback(lambda junk: self.poll(check_for_furl, timeout=30)) + def get_furl(junk): + kgf = os.path.join(kgsdir, 'key_generator.furl') + self.key_generator_furl = file(kgf, 'rb').read().strip() + d.addCallback(get_furl) return d def _set_up_nodes_2(self, res): @@ -235,8 +290,17 @@ class SystemTestMixin(testutil.SignalMixin, testutil.PollMixin, if i == 0: # client[0] runs a webserver and a helper, no key_generator open(os.path.join(basedir, "webport"), "w").write("tcp:0:interface=127.0.0.1") + open(os.path.join(basedir, "run_helper"), "w").write("yes\n") open(os.path.join(basedir, "sizelimit"), "w").write("10GB\n") + if i == 3: + # client[3] runs a webserver and uses a helper, uses key_generator + open(os.path.join(basedir, "webport"), "w").write("tcp:0:interface=127.0.0.1") + if self.key_generator_furl: + kgf = "%s\n" % (self.key_generator_furl,) + open(os.path.join(basedir, "key_generator.furl"), "w").write(kgf) open(os.path.join(basedir, "introducer.furl"), "w").write(self.introducer_furl) + if self.stats_gatherer_furl: + open(os.path.join(basedir, "stats_gatherer.furl"), "w").write(self.stats_gatherer_furl) # start client[0], wait for it's tub to be ready (at which point it # will have registered the helper furl). @@ -244,6 +308,14 @@ class SystemTestMixin(testutil.SignalMixin, testutil.PollMixin, self.clients.append(c) d = c.when_tub_ready() def _ready(res): + f = open(os.path.join(basedirs[0],"private","helper.furl"), "r") + helper_furl = f.read() + f.close() + self.helper_furl = helper_furl + f = open(os.path.join(basedirs[3],"helper.furl"), "w") + f.write(helper_furl) + f.close() + # this starts the rest of the clients for i in range(1, self.numclients): c = self.add_service(client.Client(basedir=basedirs[i])) @@ -257,9 +329,66 @@ class SystemTestMixin(testutil.SignalMixin, testutil.PollMixin, l = self.clients[0].getServiceNamed("webish").listener port = l._port.getHost().port self.webish_url = "http://localhost:%d/" % port + # and the helper-using webport + l = self.clients[3].getServiceNamed("webish").listener + port = l._port.getHost().port + self.helper_webish_url = "http://localhost:%d/" % port d.addCallback(_connected) return d + def _grab_stats(self, res): + d = self.stats_gatherer.poll() + return d + + def bounce_client(self, num): + c = self.clients[num] + d = c.disownServiceParent() + # I think windows requires a moment to let the connection really stop + # and the port number made available for re-use. TODO: examine the + # behavior, see if this is really the problem, see if we can do + # better than blindly waiting for a second. + d.addCallback(self.stall, 1.0) + def _stopped(res): + new_c = client.Client(basedir=self.getdir("client%d" % num)) + self.clients[num] = new_c + self.add_service(new_c) + return new_c.when_tub_ready() + d.addCallback(_stopped) + d.addCallback(lambda res: self.wait_for_connections()) + def _maybe_get_webport(res): + if num == 0: + # now find out where the web port was + l = self.clients[0].getServiceNamed("webish").listener + port = l._port.getHost().port + self.webish_url = "http://localhost:%d/" % port + d.addCallback(_maybe_get_webport) + return d + + def add_extra_node(self, client_num, helper_furl=None, + add_to_sparent=False): + # usually this node is *not* parented to our self.sparent, so we can + # shut it down separately from the rest, to exercise the + # connection-lost code + basedir = self.getdir("client%d" % client_num) + if not os.path.isdir(basedir): + fileutil.make_dirs(basedir) + open(os.path.join(basedir, "introducer.furl"), "w").write(self.introducer_furl) + if helper_furl: + f = open(os.path.join(basedir, "helper.furl") ,"w") + f.write(helper_furl+"\n") + f.close() + + c = client.Client(basedir=basedir) + self.clients.append(c) + self.numclients += 1 + if add_to_sparent: + c.setServiceParent(self.sparent) + else: + c.startService() + d = self.wait_for_connections() + d.addCallback(lambda res: c) + return d + def _check_connections(self): for c in self.clients: ic = c.introducer_client diff --git a/src/allmydata/test/test_system.py b/src/allmydata/test/test_system.py index 397002e9..797826c6 100644 --- a/src/allmydata/test/test_system.py +++ b/src/allmydata/test/test_system.py @@ -6,26 +6,21 @@ from twisted.trial import unittest from twisted.internet import defer from twisted.internet import threads # CLI tests use deferToThread from twisted.internet.error import ConnectionDone, ConnectionLost -from twisted.application import service import allmydata -from allmydata import client, uri, storage, offloaded +from allmydata import uri, storage, offloaded from allmydata.immutable import download, upload, filenode -from allmydata.introducer.server import IntroducerNode -from allmydata.util import fileutil, idlib, mathutil, testutil +from allmydata.util import idlib, mathutil from allmydata.util import log, base32 from allmydata.scripts import runner, cli from allmydata.interfaces import IDirectoryNode, IFileNode, IFileURI from allmydata.mutable.common import NotMutableError from allmydata.mutable import layout as mutable_layout -from allmydata.stats import PickleStatsGatherer -from allmydata.key_generator import KeyGeneratorService -from foolscap.eventual import fireEventually -from foolscap import DeadReferenceError, Tub +from foolscap import DeadReferenceError from twisted.python.failure import Failure from twisted.web.client import getPage from twisted.web.error import Error -from allmydata.test.common import SystemTestMixin, flush_but_dont_ignore +from allmydata.test.common import SystemTestMixin LARGE_DATA = """ This is some data to publish to the virtual drive, which needs to be large @@ -46,215 +41,7 @@ class CountingDataUploadable(upload.Data): return upload.Data.read(self, length) -class SystemTest(testutil.SignalMixin, testutil.PollMixin, testutil.StallMixin, - unittest.TestCase): - - def setUp(self): - self.sparent = service.MultiService() - self.sparent.startService() - - self.stats_gatherer = None - self.stats_gatherer_furl = None - self.key_generator_svc = None - self.key_generator_furl = None - - def tearDown(self): - log.msg("shutting down SystemTest services") - d = self.sparent.stopService() - d.addBoth(flush_but_dont_ignore) - return d - - def getdir(self, subdir): - return os.path.join(self.basedir, subdir) - - def add_service(self, s): - s.setServiceParent(self.sparent) - return s - - def set_up_nodes(self, NUMCLIENTS=5, - use_stats_gatherer=False, use_key_generator=False): - self.numclients = NUMCLIENTS - iv_dir = self.getdir("introducer") - if not os.path.isdir(iv_dir): - fileutil.make_dirs(iv_dir) - f = open(os.path.join(iv_dir, "webport"), "w") - f.write("tcp:0:interface=127.0.0.1\n") - f.close() - iv = IntroducerNode(basedir=iv_dir) - self.introducer = self.add_service(iv) - d = self.introducer.when_tub_ready() - d.addCallback(self._get_introducer_web) - if use_stats_gatherer: - d.addCallback(self._set_up_stats_gatherer) - if use_key_generator: - d.addCallback(self._set_up_key_generator) - d.addCallback(self._set_up_nodes_2) - if use_stats_gatherer: - d.addCallback(self._grab_stats) - return d - - def _get_introducer_web(self, res): - f = open(os.path.join(self.getdir("introducer"), "node.url"), "r") - self.introweb_url = f.read().strip() - f.close() - - def _set_up_stats_gatherer(self, res): - statsdir = self.getdir("stats_gatherer") - fileutil.make_dirs(statsdir) - t = Tub() - self.add_service(t) - l = t.listenOn("tcp:0") - p = l.getPortnum() - t.setLocation("localhost:%d" % p) - - self.stats_gatherer = PickleStatsGatherer(t, statsdir, False) - self.add_service(self.stats_gatherer) - self.stats_gatherer_furl = self.stats_gatherer.get_furl() - - def _set_up_key_generator(self, res): - kgsdir = self.getdir("key_generator") - fileutil.make_dirs(kgsdir) - - self.key_generator_svc = KeyGeneratorService(kgsdir, display_furl=False) - self.key_generator_svc.key_generator.pool_size = 4 - self.key_generator_svc.key_generator.pool_refresh_delay = 60 - self.add_service(self.key_generator_svc) - - d = fireEventually() - def check_for_furl(): - return os.path.exists(os.path.join(kgsdir, 'key_generator.furl')) - d.addCallback(lambda junk: self.poll(check_for_furl, timeout=30)) - def get_furl(junk): - kgf = os.path.join(kgsdir, 'key_generator.furl') - self.key_generator_furl = file(kgf, 'rb').read().strip() - d.addCallback(get_furl) - return d - - def _set_up_nodes_2(self, res): - q = self.introducer - self.introducer_furl = q.introducer_url - self.clients = [] - basedirs = [] - for i in range(self.numclients): - basedir = self.getdir("client%d" % i) - basedirs.append(basedir) - fileutil.make_dirs(basedir) - if i == 0: - # client[0] runs a webserver and a helper, no key_generator - open(os.path.join(basedir, "webport"), "w").write("tcp:0:interface=127.0.0.1") - open(os.path.join(basedir, "run_helper"), "w").write("yes\n") - open(os.path.join(basedir, "sizelimit"), "w").write("10GB\n") - if i == 3: - # client[3] runs a webserver and uses a helper, uses key_generator - open(os.path.join(basedir, "webport"), "w").write("tcp:0:interface=127.0.0.1") - if self.key_generator_furl: - kgf = "%s\n" % (self.key_generator_furl,) - open(os.path.join(basedir, "key_generator.furl"), "w").write(kgf) - open(os.path.join(basedir, "introducer.furl"), "w").write(self.introducer_furl) - if self.stats_gatherer_furl: - open(os.path.join(basedir, "stats_gatherer.furl"), "w").write(self.stats_gatherer_furl) - - # start client[0], wait for it's tub to be ready (at which point it - # will have registered the helper furl). - c = self.add_service(client.Client(basedir=basedirs[0])) - self.clients.append(c) - d = c.when_tub_ready() - def _ready(res): - f = open(os.path.join(basedirs[0],"private","helper.furl"), "r") - helper_furl = f.read() - f.close() - self.helper_furl = helper_furl - f = open(os.path.join(basedirs[3],"helper.furl"), "w") - f.write(helper_furl) - f.close() - - # this starts the rest of the clients - for i in range(1, self.numclients): - c = self.add_service(client.Client(basedir=basedirs[i])) - self.clients.append(c) - log.msg("STARTING") - return self.wait_for_connections() - d.addCallback(_ready) - def _connected(res): - log.msg("CONNECTED") - # now find out where the web port was - l = self.clients[0].getServiceNamed("webish").listener - port = l._port.getHost().port - self.webish_url = "http://localhost:%d/" % port - # and the helper-using webport - l = self.clients[3].getServiceNamed("webish").listener - port = l._port.getHost().port - self.helper_webish_url = "http://localhost:%d/" % port - d.addCallback(_connected) - return d - - def _grab_stats(self, res): - d = self.stats_gatherer.poll() - return d - - def bounce_client(self, num): - c = self.clients[num] - d = c.disownServiceParent() - # I think windows requires a moment to let the connection really stop - # and the port number made available for re-use. TODO: examine the - # behavior, see if this is really the problem, see if we can do - # better than blindly waiting for a second. - d.addCallback(self.stall, 1.0) - def _stopped(res): - new_c = client.Client(basedir=self.getdir("client%d" % num)) - self.clients[num] = new_c - self.add_service(new_c) - return new_c.when_tub_ready() - d.addCallback(_stopped) - d.addCallback(lambda res: self.wait_for_connections()) - def _maybe_get_webport(res): - if num == 0: - # now find out where the web port was - l = self.clients[0].getServiceNamed("webish").listener - port = l._port.getHost().port - self.webish_url = "http://localhost:%d/" % port - d.addCallback(_maybe_get_webport) - return d - - def add_extra_node(self, client_num, helper_furl=None, - add_to_sparent=False): - # usually this node is *not* parented to our self.sparent, so we can - # shut it down separately from the rest, to exercise the - # connection-lost code - basedir = self.getdir("client%d" % client_num) - if not os.path.isdir(basedir): - fileutil.make_dirs(basedir) - open(os.path.join(basedir, "introducer.furl"), "w").write(self.introducer_furl) - if helper_furl: - f = open(os.path.join(basedir, "helper.furl") ,"w") - f.write(helper_furl+"\n") - f.close() - - c = client.Client(basedir=basedir) - self.clients.append(c) - self.numclients += 1 - if add_to_sparent: - c.setServiceParent(self.sparent) - else: - c.startService() - d = self.wait_for_connections() - d.addCallback(lambda res: c) - return d - - def _check_connections(self): - for c in self.clients: - ic = c.introducer_client - if not ic.connected_to_introducer(): - return False - if len(ic.get_all_peerids()) != self.numclients: - return False - return True - - def wait_for_connections(self, ignored=None): - # TODO: replace this with something that takes a list of peerids and - # fires when they've all been heard from, instead of using a count - # and a threshold - return self.poll(self._check_connections, timeout=200) +class SystemTest(SystemTestMixin, unittest.TestCase): def test_connections(self): self.basedir = "system/SystemTest/test_connections"