from twisted.internet import defer
from twisted.python import failure
from twisted.application import service
-from foolscap.eventual import flushEventualQueue
+from foolscap import Tub
+from foolscap.eventual import flushEventualQueue, fireEventually
from allmydata import uri, dirnode, client
from allmydata.introducer.server import IntroducerNode
from allmydata.interfaces import IURI, IMutableFileNode, IFileNode, \
from allmydata.immutable import checker
from allmydata.immutable.encode import NotEnoughSharesError
from allmydata.util import log, testutil, fileutil
+from allmydata.stats import PickleStatsGatherer
+from allmydata.key_generator import KeyGeneratorService
+
def flush_but_dont_ignore(res):
d = flushEventualQueue()
def setUp(self):
self.sparent = service.MultiService()
self.sparent.startService()
+
+ self.stats_gatherer = None
+ self.stats_gatherer_furl = None
+ self.key_generator_svc = None
+ self.key_generator_furl = None
+
def tearDown(self):
log.msg("shutting down SystemTest services")
d = self.sparent.stopService()
s.setServiceParent(self.sparent)
return s
- def set_up_nodes(self, NUMCLIENTS=5):
+ def set_up_nodes(self, NUMCLIENTS=5,
+ use_stats_gatherer=False, use_key_generator=False):
self.numclients = NUMCLIENTS
iv_dir = self.getdir("introducer")
if not os.path.isdir(iv_dir):
iv = IntroducerNode(basedir=iv_dir)
self.introducer = self.add_service(iv)
d = self.introducer.when_tub_ready()
+ d.addCallback(self._get_introducer_web)
+ if use_stats_gatherer:
+ d.addCallback(self._set_up_stats_gatherer)
+ if use_key_generator:
+ d.addCallback(self._set_up_key_generator)
d.addCallback(self._set_up_nodes_2)
+ if use_stats_gatherer:
+ d.addCallback(self._grab_stats)
+ return d
+
+ def _get_introducer_web(self, res):
+ f = open(os.path.join(self.getdir("introducer"), "node.url"), "r")
+ self.introweb_url = f.read().strip()
+ f.close()
+
+ def _set_up_stats_gatherer(self, res):
+ statsdir = self.getdir("stats_gatherer")
+ fileutil.make_dirs(statsdir)
+ t = Tub()
+ self.add_service(t)
+ l = t.listenOn("tcp:0")
+ p = l.getPortnum()
+ t.setLocation("localhost:%d" % p)
+
+ self.stats_gatherer = PickleStatsGatherer(t, statsdir, False)
+ self.add_service(self.stats_gatherer)
+ self.stats_gatherer_furl = self.stats_gatherer.get_furl()
+
+ def _set_up_key_generator(self, res):
+ kgsdir = self.getdir("key_generator")
+ fileutil.make_dirs(kgsdir)
+
+ self.key_generator_svc = KeyGeneratorService(kgsdir, display_furl=False)
+ self.key_generator_svc.key_generator.pool_size = 4
+ self.key_generator_svc.key_generator.pool_refresh_delay = 60
+ self.add_service(self.key_generator_svc)
+
+ d = fireEventually()
+ def check_for_furl():
+ return os.path.exists(os.path.join(kgsdir, 'key_generator.furl'))
+ d.addCallback(lambda junk: self.poll(check_for_furl, timeout=30))
+ def get_furl(junk):
+ kgf = os.path.join(kgsdir, 'key_generator.furl')
+ self.key_generator_furl = file(kgf, 'rb').read().strip()
+ d.addCallback(get_furl)
return d
def _set_up_nodes_2(self, res):
if i == 0:
# client[0] runs a webserver and a helper, no key_generator
open(os.path.join(basedir, "webport"), "w").write("tcp:0:interface=127.0.0.1")
+ open(os.path.join(basedir, "run_helper"), "w").write("yes\n")
open(os.path.join(basedir, "sizelimit"), "w").write("10GB\n")
+ if i == 3:
+ # client[3] runs a webserver and uses a helper, uses key_generator
+ open(os.path.join(basedir, "webport"), "w").write("tcp:0:interface=127.0.0.1")
+ if self.key_generator_furl:
+ kgf = "%s\n" % (self.key_generator_furl,)
+ open(os.path.join(basedir, "key_generator.furl"), "w").write(kgf)
open(os.path.join(basedir, "introducer.furl"), "w").write(self.introducer_furl)
+ if self.stats_gatherer_furl:
+ open(os.path.join(basedir, "stats_gatherer.furl"), "w").write(self.stats_gatherer_furl)
    # start client[0], wait for its tub to be ready (at which point it
# will have registered the helper furl).
self.clients.append(c)
d = c.when_tub_ready()
def _ready(res):
+ f = open(os.path.join(basedirs[0],"private","helper.furl"), "r")
+ helper_furl = f.read()
+ f.close()
+ self.helper_furl = helper_furl
+ f = open(os.path.join(basedirs[3],"helper.furl"), "w")
+ f.write(helper_furl)
+ f.close()
+
# this starts the rest of the clients
for i in range(1, self.numclients):
c = self.add_service(client.Client(basedir=basedirs[i]))
l = self.clients[0].getServiceNamed("webish").listener
port = l._port.getHost().port
self.webish_url = "http://localhost:%d/" % port
+ # and the helper-using webport
+ l = self.clients[3].getServiceNamed("webish").listener
+ port = l._port.getHost().port
+ self.helper_webish_url = "http://localhost:%d/" % port
d.addCallback(_connected)
return d
+ def _grab_stats(self, res):
+ d = self.stats_gatherer.poll()
+ return d
+
+ def bounce_client(self, num):
+ c = self.clients[num]
+ d = c.disownServiceParent()
+ # I think windows requires a moment to let the connection really stop
+ # and the port number made available for re-use. TODO: examine the
+ # behavior, see if this is really the problem, see if we can do
+ # better than blindly waiting for a second.
+ d.addCallback(self.stall, 1.0)
+ def _stopped(res):
+ new_c = client.Client(basedir=self.getdir("client%d" % num))
+ self.clients[num] = new_c
+ self.add_service(new_c)
+ return new_c.when_tub_ready()
+ d.addCallback(_stopped)
+ d.addCallback(lambda res: self.wait_for_connections())
+ def _maybe_get_webport(res):
+ if num == 0:
+ # now find out where the web port was
+ l = self.clients[0].getServiceNamed("webish").listener
+ port = l._port.getHost().port
+ self.webish_url = "http://localhost:%d/" % port
+ d.addCallback(_maybe_get_webport)
+ return d
+
+ def add_extra_node(self, client_num, helper_furl=None,
+ add_to_sparent=False):
+ # usually this node is *not* parented to our self.sparent, so we can
+ # shut it down separately from the rest, to exercise the
+ # connection-lost code
+ basedir = self.getdir("client%d" % client_num)
+ if not os.path.isdir(basedir):
+ fileutil.make_dirs(basedir)
+ open(os.path.join(basedir, "introducer.furl"), "w").write(self.introducer_furl)
+ if helper_furl:
+ f = open(os.path.join(basedir, "helper.furl") ,"w")
+ f.write(helper_furl+"\n")
+ f.close()
+
+ c = client.Client(basedir=basedir)
+ self.clients.append(c)
+ self.numclients += 1
+ if add_to_sparent:
+ c.setServiceParent(self.sparent)
+ else:
+ c.startService()
+ d = self.wait_for_connections()
+ d.addCallback(lambda res: c)
+ return d
+
def _check_connections(self):
for c in self.clients:
ic = c.introducer_client
from twisted.internet import defer
from twisted.internet import threads # CLI tests use deferToThread
from twisted.internet.error import ConnectionDone, ConnectionLost
-from twisted.application import service
import allmydata
-from allmydata import client, uri, storage, offloaded
+from allmydata import uri, storage, offloaded
from allmydata.immutable import download, upload, filenode
-from allmydata.introducer.server import IntroducerNode
-from allmydata.util import fileutil, idlib, mathutil, testutil
+from allmydata.util import idlib, mathutil
from allmydata.util import log, base32
from allmydata.scripts import runner, cli
from allmydata.interfaces import IDirectoryNode, IFileNode, IFileURI
from allmydata.mutable.common import NotMutableError
from allmydata.mutable import layout as mutable_layout
-from allmydata.stats import PickleStatsGatherer
-from allmydata.key_generator import KeyGeneratorService
-from foolscap.eventual import fireEventually
-from foolscap import DeadReferenceError, Tub
+from foolscap import DeadReferenceError
from twisted.python.failure import Failure
from twisted.web.client import getPage
from twisted.web.error import Error
-from allmydata.test.common import SystemTestMixin, flush_but_dont_ignore
+from allmydata.test.common import SystemTestMixin
LARGE_DATA = """
This is some data to publish to the virtual drive, which needs to be large
return upload.Data.read(self, length)
-class SystemTest(testutil.SignalMixin, testutil.PollMixin, testutil.StallMixin,
- unittest.TestCase):
-
- def setUp(self):
- self.sparent = service.MultiService()
- self.sparent.startService()
-
- self.stats_gatherer = None
- self.stats_gatherer_furl = None
- self.key_generator_svc = None
- self.key_generator_furl = None
-
- def tearDown(self):
- log.msg("shutting down SystemTest services")
- d = self.sparent.stopService()
- d.addBoth(flush_but_dont_ignore)
- return d
-
- def getdir(self, subdir):
- return os.path.join(self.basedir, subdir)
-
- def add_service(self, s):
- s.setServiceParent(self.sparent)
- return s
-
- def set_up_nodes(self, NUMCLIENTS=5,
- use_stats_gatherer=False, use_key_generator=False):
- self.numclients = NUMCLIENTS
- iv_dir = self.getdir("introducer")
- if not os.path.isdir(iv_dir):
- fileutil.make_dirs(iv_dir)
- f = open(os.path.join(iv_dir, "webport"), "w")
- f.write("tcp:0:interface=127.0.0.1\n")
- f.close()
- iv = IntroducerNode(basedir=iv_dir)
- self.introducer = self.add_service(iv)
- d = self.introducer.when_tub_ready()
- d.addCallback(self._get_introducer_web)
- if use_stats_gatherer:
- d.addCallback(self._set_up_stats_gatherer)
- if use_key_generator:
- d.addCallback(self._set_up_key_generator)
- d.addCallback(self._set_up_nodes_2)
- if use_stats_gatherer:
- d.addCallback(self._grab_stats)
- return d
-
- def _get_introducer_web(self, res):
- f = open(os.path.join(self.getdir("introducer"), "node.url"), "r")
- self.introweb_url = f.read().strip()
- f.close()
-
- def _set_up_stats_gatherer(self, res):
- statsdir = self.getdir("stats_gatherer")
- fileutil.make_dirs(statsdir)
- t = Tub()
- self.add_service(t)
- l = t.listenOn("tcp:0")
- p = l.getPortnum()
- t.setLocation("localhost:%d" % p)
-
- self.stats_gatherer = PickleStatsGatherer(t, statsdir, False)
- self.add_service(self.stats_gatherer)
- self.stats_gatherer_furl = self.stats_gatherer.get_furl()
-
- def _set_up_key_generator(self, res):
- kgsdir = self.getdir("key_generator")
- fileutil.make_dirs(kgsdir)
-
- self.key_generator_svc = KeyGeneratorService(kgsdir, display_furl=False)
- self.key_generator_svc.key_generator.pool_size = 4
- self.key_generator_svc.key_generator.pool_refresh_delay = 60
- self.add_service(self.key_generator_svc)
-
- d = fireEventually()
- def check_for_furl():
- return os.path.exists(os.path.join(kgsdir, 'key_generator.furl'))
- d.addCallback(lambda junk: self.poll(check_for_furl, timeout=30))
- def get_furl(junk):
- kgf = os.path.join(kgsdir, 'key_generator.furl')
- self.key_generator_furl = file(kgf, 'rb').read().strip()
- d.addCallback(get_furl)
- return d
-
- def _set_up_nodes_2(self, res):
- q = self.introducer
- self.introducer_furl = q.introducer_url
- self.clients = []
- basedirs = []
- for i in range(self.numclients):
- basedir = self.getdir("client%d" % i)
- basedirs.append(basedir)
- fileutil.make_dirs(basedir)
- if i == 0:
- # client[0] runs a webserver and a helper, no key_generator
- open(os.path.join(basedir, "webport"), "w").write("tcp:0:interface=127.0.0.1")
- open(os.path.join(basedir, "run_helper"), "w").write("yes\n")
- open(os.path.join(basedir, "sizelimit"), "w").write("10GB\n")
- if i == 3:
- # client[3] runs a webserver and uses a helper, uses key_generator
- open(os.path.join(basedir, "webport"), "w").write("tcp:0:interface=127.0.0.1")
- if self.key_generator_furl:
- kgf = "%s\n" % (self.key_generator_furl,)
- open(os.path.join(basedir, "key_generator.furl"), "w").write(kgf)
- open(os.path.join(basedir, "introducer.furl"), "w").write(self.introducer_furl)
- if self.stats_gatherer_furl:
- open(os.path.join(basedir, "stats_gatherer.furl"), "w").write(self.stats_gatherer_furl)
-
-    # start client[0], wait for its tub to be ready (at which point it
- # will have registered the helper furl).
- c = self.add_service(client.Client(basedir=basedirs[0]))
- self.clients.append(c)
- d = c.when_tub_ready()
- def _ready(res):
- f = open(os.path.join(basedirs[0],"private","helper.furl"), "r")
- helper_furl = f.read()
- f.close()
- self.helper_furl = helper_furl
- f = open(os.path.join(basedirs[3],"helper.furl"), "w")
- f.write(helper_furl)
- f.close()
-
- # this starts the rest of the clients
- for i in range(1, self.numclients):
- c = self.add_service(client.Client(basedir=basedirs[i]))
- self.clients.append(c)
- log.msg("STARTING")
- return self.wait_for_connections()
- d.addCallback(_ready)
- def _connected(res):
- log.msg("CONNECTED")
- # now find out where the web port was
- l = self.clients[0].getServiceNamed("webish").listener
- port = l._port.getHost().port
- self.webish_url = "http://localhost:%d/" % port
- # and the helper-using webport
- l = self.clients[3].getServiceNamed("webish").listener
- port = l._port.getHost().port
- self.helper_webish_url = "http://localhost:%d/" % port
- d.addCallback(_connected)
- return d
-
- def _grab_stats(self, res):
- d = self.stats_gatherer.poll()
- return d
-
- def bounce_client(self, num):
- c = self.clients[num]
- d = c.disownServiceParent()
- # I think windows requires a moment to let the connection really stop
- # and the port number made available for re-use. TODO: examine the
- # behavior, see if this is really the problem, see if we can do
- # better than blindly waiting for a second.
- d.addCallback(self.stall, 1.0)
- def _stopped(res):
- new_c = client.Client(basedir=self.getdir("client%d" % num))
- self.clients[num] = new_c
- self.add_service(new_c)
- return new_c.when_tub_ready()
- d.addCallback(_stopped)
- d.addCallback(lambda res: self.wait_for_connections())
- def _maybe_get_webport(res):
- if num == 0:
- # now find out where the web port was
- l = self.clients[0].getServiceNamed("webish").listener
- port = l._port.getHost().port
- self.webish_url = "http://localhost:%d/" % port
- d.addCallback(_maybe_get_webport)
- return d
-
- def add_extra_node(self, client_num, helper_furl=None,
- add_to_sparent=False):
- # usually this node is *not* parented to our self.sparent, so we can
- # shut it down separately from the rest, to exercise the
- # connection-lost code
- basedir = self.getdir("client%d" % client_num)
- if not os.path.isdir(basedir):
- fileutil.make_dirs(basedir)
- open(os.path.join(basedir, "introducer.furl"), "w").write(self.introducer_furl)
- if helper_furl:
- f = open(os.path.join(basedir, "helper.furl") ,"w")
- f.write(helper_furl+"\n")
- f.close()
-
- c = client.Client(basedir=basedir)
- self.clients.append(c)
- self.numclients += 1
- if add_to_sparent:
- c.setServiceParent(self.sparent)
- else:
- c.startService()
- d = self.wait_for_connections()
- d.addCallback(lambda res: c)
- return d
-
- def _check_connections(self):
- for c in self.clients:
- ic = c.introducer_client
- if not ic.connected_to_introducer():
- return False
- if len(ic.get_all_peerids()) != self.numclients:
- return False
- return True
-
- def wait_for_connections(self, ignored=None):
- # TODO: replace this with something that takes a list of peerids and
- # fires when they've all been heard from, instead of using a count
- # and a threshold
- return self.poll(self._check_connections, timeout=200)
+class SystemTest(SystemTestMixin, unittest.TestCase):
def test_connections(self):
self.basedir = "system/SystemTest/test_connections"