From ea78b4b605568479bddd91bfd85b4db96a2c759e Mon Sep 17 00:00:00 2001 From: Brian Warner Date: Tue, 29 May 2007 17:39:39 -0700 Subject: [PATCH] check_memory: getting closer, now we have memusage numbers for uploads of 10kB and 10MB files --- src/allmydata/client.py | 2 +- src/allmydata/control.py | 11 +- src/allmydata/interfaces.py | 6 + src/allmydata/test/check_memory.py | 158 ++++++++++++++++++++++---- src/allmydata/test/test_introducer.py | 20 +--- src/allmydata/util/testutil.py | 20 +++- 6 files changed, 174 insertions(+), 43 deletions(-) diff --git a/src/allmydata/client.py b/src/allmydata/client.py index 272a770f..f6a00861 100644 --- a/src/allmydata/client.py +++ b/src/allmydata/client.py @@ -62,7 +62,7 @@ class Client(node.Node, Referenceable): hotline_file = os.path.join(self.basedir, self.SUICIDE_PREVENTION_HOTLINE_FILE) if os.path.exists(hotline_file): - hotline = TimerService(5.0, self._check_hotline, hotline_file) + hotline = TimerService(1.0, self._check_hotline, hotline_file) hotline.setServiceParent(self) def _check_hotline(self, hotline_file): diff --git a/src/allmydata/control.py b/src/allmydata/control.py index d2d756da..f98c5a6b 100644 --- a/src/allmydata/control.py +++ b/src/allmydata/control.py @@ -3,11 +3,20 @@ from zope.interface import implements from twisted.application import service from foolscap import Referenceable from allmydata.interfaces import RIControlClient +from allmydata.util import testutil -class ControlServer(Referenceable, service.Service): +class ControlServer(Referenceable, service.Service, testutil.PollMixin): implements(RIControlClient) + def remote_wait_for_client_connections(self, num_clients): + def _check(): + current_clients = list(self.parent.get_all_peerids()) + return len(current_clients) >= num_clients + d = self.poll(_check, 0.5) + d.addCallback(lambda res: None) + return d + def remote_upload_from_file_to_uri(self, filename): uploader = self.parent.getServiceNamed("uploader") d = uploader.upload_filename(filename) diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py index 7765f34c..73b53673 100644 --- a/src/allmydata/interfaces.py +++ b/src/allmydata/interfaces.py @@ -617,6 +617,12 @@ class NotCapableError(Exception): """You have tried to write to a read-only node.""" class RIControlClient(RemoteInterface): + + def wait_for_client_connections(num_clients=int): + """Do not return until we have connections to at least NUM_CLIENTS + storage servers. + """ + def upload_from_file_to_uri(filename=str): """Upload a file to the grid. This accepts a filename (which must be absolute) that points to a file on the node's local disk. The node diff --git a/src/allmydata/test/check_memory.py b/src/allmydata/test/check_memory.py index 549a7b9b..0a159f3b 100644 --- a/src/allmydata/test/check_memory.py +++ b/src/allmydata/test/check_memory.py @@ -6,10 +6,12 @@ from twisted.internet import defer, reactor, protocol, error from twisted.application import service, internet from allmydata import client, introducer_and_vdrive from allmydata.scripts import runner -from foolscap.eventual import eventually, flushEventualQueue +from allmydata.util import testutil +import foolscap +from foolscap import eventual from twisted.python import log -class SystemFramework: +class SystemFramework(testutil.PollMixin): numnodes = 5 def __init__(self, basedir): @@ -21,35 +23,59 @@ class SystemFramework: os.mkdir(basedir) self.sparent = service.MultiService() self.sparent.startService() + self.proc = None + self.tub = foolscap.Tub() + self.tub.setServiceParent(self.sparent) def run(self): - log.startLogging(open(os.path.join(self.basedir, "log"), "w")) - d = defer.Deferred() - eventually(d.callback, None) - d.addCallback(lambda res: self.start()) - d.addErrback(log.err) + log.startLogging(open(os.path.join(self.basedir, "log"), "w"), + setStdout=False) + #logfile = open(os.path.join(self.basedir, "log"), "w") + #flo = log.FileLogObserver(logfile) + #log.startLoggingWithObserver(flo.emit, setStdout=False) + d = eventual.fireEventually() + d.addCallback(lambda res: self.setUp()) + d.addCallback(lambda res: self.do_test()) + d.addBoth(self.tearDown) + def _err(err): + log.err(err) + print err + d.addErrback(_err) + d.addBoth(lambda res: reactor.stop()) reactor.run() - def start(self): + def setUp(self): print "STARTING" d = self.make_introducer_and_vdrive() def _more(res): self.make_nodes() - self.start_client() + return self.start_client() d.addCallback(_more) + def _record_control_furl(control_furl): + self.control_furl = control_furl + print "OBTAINING '%s'" % (control_furl,) + return self.tub.getReference(self.control_furl) + d.addCallback(_record_control_furl) + def _record_control(control_rref): + self.control_rref = control_rref + return control_rref.callRemote("wait_for_client_connections", + self.numnodes+1) + d.addCallback(_record_control) + def _ready(res): + print "CLIENT READY" + d.addCallback(_ready) return d - def tearDown(self): - os.remove(os.path.join(self.clientdir, "suicide_prevention_hotline")) + def tearDown(self, passthrough): # the client node will shut down in a few seconds + #os.remove(os.path.join(self.clientdir, "suicide_prevention_hotline")) log.msg("shutting down SystemTest services") - d = self.sparent.stopService() - d.addCallback(lambda res: flushEventualQueue()) - def _done(res): - d1 = defer.Deferred() - reactor.callLater(self.DISCONNECT_DELAY, d1.callback, None) - return d1 - d.addCallback(_done) + d = defer.succeed(None) + if self.proc: + d.addCallback(lambda res: self.kill_client()) + d.addCallback(lambda res: self.sparent.stopService()) + d.addCallback(lambda res: eventual.flushEventualQueue()) + d.addCallback(lambda res: passthrough) return d def add_service(self, s): @@ -95,9 +121,10 @@ this file are ignored. f.close() def start_client(self): + # this returns a Deferred that fires with the client's control.furl log.msg("MAKING CLIENT") clientdir = self.clientdir = os.path.join(self.basedir, "client") - config = {'basedir': clientdir, 'quiet': False} + config = {'basedir': clientdir, 'quiet': True} runner.create_client(config) log.msg("DONE MAKING CLIENT") f = open(os.path.join(clientdir, "introducer.furl"), "w") @@ -114,16 +141,104 @@ this file are ignored. ts.setServiceParent(self.sparent) pp = ClientWatcher() + self.proc_done = pp.d = defer.Deferred() cmd = ["twistd", "-y", "client.tac"] env = os.environ.copy() self.proc = reactor.spawnProcess(pp, cmd[0], cmd, env, path=clientdir) log.msg("CLIENT STARTED") + # now we wait for the client to get started. we're looking for the + # control.furl file to appear. + furl_file = os.path.join(clientdir, "control.furl") + def _check(): + return os.path.exists(furl_file) + d = self.poll(_check, 0.1) + # once it exists, wait a moment before we read from it, just in case + # it hasn't finished writing the whole thing. Ideally control.furl + # would be created in some atomic fashion, or made non-readable until + # it's ready, but I can't think of an easy way to do that, and I + # think the chances that we'll observe a half-write are pretty low. + def _stall(res): + d2 = defer.Deferred() + reactor.callLater(0.1, d2.callback, None) + return d2 + d.addCallback(_stall) + def _read(res): + f = open(furl_file, "r") + furl = f.read() + return furl.strip() + d.addCallback(_read) + return d + + def kill_client(self): + # returns a Deferred that fires when the process exits. This may only + # be called once. try: self.proc.signalProcess("KILL") except error.ProcessExitedAlready: pass + return self.proc_done + + + def create_data(self, name, size): + filename = os.path.join(self.basedir, name + ".data") + f = open(filename, "wb") + block = "a" * 8192 + while size > 0: + l = min(size, 8192) + f.write(block[:l]) + size -= l + return filename + + def do_test(self): + print "CLIENT STARTED" + print "FURL", self.control_furl + print "RREF", self.control_rref + print + kB = 1000; MB = 1000*1000 + files = {} + uris = {} + control = self.control_rref + + def _print_usage(res=None): + d = control.callRemote("get_memory_usage") + def _print(stats): + print "VmSize: %9d VmPeak: %9d" % (stats["VmSize"], + stats["VmPeak"]) + d.addCallback(_print) + return d + + def _do_upload(res, size): + name = '%d' % size + files[name] = self.create_data(name, size) + d = control.callRemote("upload_from_file_to_uri", files[name]) + def _done(uri): + uris[name] = uri + print "uploaded %s" % name + d.addCallback(_done) + return d + + d = _print_usage() + + for i in range(10): + d.addCallback(_do_upload, size=10*kB+i) + d.addCallback(_print_usage) + + for i in range(10): + d.addCallback(_do_upload, size=10*MB+i) + d.addCallback(_print_usage) + + #d.addCallback(self.stall) + def _done(res): + print "FINISHING" + d.addCallback(_done) + return d + + def stall(self, res): + d = defer.Deferred() + reactor.callLater(5, d.callback, None) + return d class ClientWatcher(protocol.ProcessProtocol): @@ -131,12 +246,11 @@ class ClientWatcher(protocol.ProcessProtocol): print "OUT:", data def errReceived(self, data): print "ERR:", data + def processEnded(self, reason): + self.d.callback(None) if __name__ == '__main__': sf = SystemFramework("_test_memory") sf.run() - -# add a config option that looks for a keepalive file, and if it disappears, -# shut down the node. diff --git a/src/allmydata/test/test_introducer.py b/src/allmydata/test/test_introducer.py index 012e3d31..5154f4e7 100644 --- a/src/allmydata/test/test_introducer.py +++ b/src/allmydata/test/test_introducer.py @@ -7,7 +7,7 @@ from foolscap import Tub, Referenceable from foolscap.eventual import flushEventualQueue from twisted.application import service from allmydata.introducer import IntroducerClient, Introducer -from allmydata.util import idlib +from allmydata.util import idlib, testutil class MyNode(Referenceable): pass @@ -16,7 +16,7 @@ class LoggingMultiService(service.MultiService): def log(self, msg): pass -class TestIntroducer(unittest.TestCase): +class TestIntroducer(unittest.TestCase, testutil.PollMixin): def setUp(self): self.parent = LoggingMultiService() self.parent.startService() @@ -28,22 +28,6 @@ class TestIntroducer(unittest.TestCase): return d - def poll(self, check_f, pollinterval=0.01): - # Return a Deferred, then call check_f periodically until it returns - # True, at which point the Deferred will fire.. If check_f raises an - # exception, the Deferred will errback. - d = defer.maybeDeferred(self._poll, None, check_f, pollinterval) - return d - - def _poll(self, res, check_f, pollinterval): - if check_f(): - return True - d = defer.Deferred() - d.addCallback(self._poll, check_f, pollinterval) - reactor.callLater(pollinterval, d.callback, None) - return d - - def test_create(self): ic = IntroducerClient(None, "introducer", "myfurl") def _ignore(nodeid, rref): diff --git a/src/allmydata/util/testutil.py b/src/allmydata/util/testutil.py index 8193d2ef..0025b4e3 100644 --- a/src/allmydata/util/testutil.py +++ b/src/allmydata/util/testutil.py @@ -1,6 +1,6 @@ import os, signal, time -from twisted.internet import reactor +from twisted.internet import reactor, defer class SignalMixin: # This class is necessary for any code which wants to use Processes @@ -20,6 +20,24 @@ class SignalMixin: if self.sigchldHandler: signal.signal(signal.SIGCHLD, self.sigchldHandler) +class PollMixin: + + def poll(self, check_f, pollinterval=0.01): + # Return a Deferred, then call check_f periodically until it returns + # True, at which point the Deferred will fire.. If check_f raises an + # exception, the Deferred will errback. + d = defer.maybeDeferred(self._poll, None, check_f, pollinterval) + return d + + def _poll(self, res, check_f, pollinterval): + if check_f(): + return True + d = defer.Deferred() + d.addCallback(self._poll, check_f, pollinterval) + reactor.callLater(pollinterval, d.callback, None) + return d + + class TestMixin(SignalMixin): def setUp(self, repeatable=False): """ -- 2.45.2