]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
check_memory: getting closer, now we have memusage numbers for uploads of 10kB and...
authorBrian Warner <warner@lothar.com>
Wed, 30 May 2007 00:39:39 +0000 (17:39 -0700)
committerBrian Warner <warner@lothar.com>
Wed, 30 May 2007 00:39:39 +0000 (17:39 -0700)
src/allmydata/client.py
src/allmydata/control.py
src/allmydata/interfaces.py
src/allmydata/test/check_memory.py
src/allmydata/test/test_introducer.py
src/allmydata/util/testutil.py

index 272a770f29bb3e8f5919f1533c8e2f9a39c750f9..f6a00861a52898b157e74900660a5df26def7929 100644 (file)
@@ -62,7 +62,7 @@ class Client(node.Node, Referenceable):
         hotline_file = os.path.join(self.basedir,
                                     self.SUICIDE_PREVENTION_HOTLINE_FILE)
         if os.path.exists(hotline_file):
-            hotline = TimerService(5.0, self._check_hotline, hotline_file)
+            hotline = TimerService(1.0, self._check_hotline, hotline_file)
             hotline.setServiceParent(self)
 
     def _check_hotline(self, hotline_file):
index d2d756daf376cf44a4ab91b6448b3dab5c339c8a..f98c5a6bafbb89ff03c56d7b89b1dd520f5c5c1e 100644 (file)
@@ -3,11 +3,20 @@ from zope.interface import implements
 from twisted.application import service
 from foolscap import Referenceable
 from allmydata.interfaces import RIControlClient
+from allmydata.util import testutil
 
 
-class ControlServer(Referenceable, service.Service):
+class ControlServer(Referenceable, service.Service, testutil.PollMixin):
     implements(RIControlClient)
 
+    def remote_wait_for_client_connections(self, num_clients):
+        def _check():
+            current_clients = list(self.parent.get_all_peerids())
+            return len(current_clients) >= num_clients
+        d = self.poll(_check, 0.5)
+        d.addCallback(lambda res: None)
+        return d
+
     def remote_upload_from_file_to_uri(self, filename):
         uploader = self.parent.getServiceNamed("uploader")
         d = uploader.upload_filename(filename)
index 7765f34cc8783148538dbd4a975ac4cbdb74871c..73b53673600803121792dc97ac7f5131d375b363 100644 (file)
@@ -617,6 +617,12 @@ class NotCapableError(Exception):
     """You have tried to write to a read-only node."""
 
 class RIControlClient(RemoteInterface):
+
+    def wait_for_client_connections(num_clients=int):
+        """Do not return until we have connections to at least NUM_CLIENTS
+        storage servers.
+        """
+
     def upload_from_file_to_uri(filename=str):
         """Upload a file to the grid. This accepts a filename (which must be
         absolute) that points to a file on the node's local disk. The node
index 549a7b9b42cfa4be7d5521550bec0f04afd90ac1..0a159f3b67f795e3fa1baf49fdac03fe3854b836 100644 (file)
@@ -6,10 +6,12 @@ from twisted.internet import defer, reactor, protocol, error
 from twisted.application import service, internet
 from allmydata import client, introducer_and_vdrive
 from allmydata.scripts import runner
-from foolscap.eventual import eventually, flushEventualQueue
+from allmydata.util import testutil
+import foolscap
+from foolscap import eventual
 from twisted.python import log
 
-class SystemFramework:
+class SystemFramework(testutil.PollMixin):
     numnodes = 5
 
     def __init__(self, basedir):
@@ -21,35 +23,59 @@ class SystemFramework:
         os.mkdir(basedir)
         self.sparent = service.MultiService()
         self.sparent.startService()
+        self.proc = None
+        self.tub = foolscap.Tub()
+        self.tub.setServiceParent(self.sparent)
 
     def run(self):
-        log.startLogging(open(os.path.join(self.basedir, "log"), "w"))
-        d = defer.Deferred()
-        eventually(d.callback, None)
-        d.addCallback(lambda res: self.start())
-        d.addErrback(log.err)
+        log.startLogging(open(os.path.join(self.basedir, "log"), "w"),
+                         setStdout=False)
+        #logfile = open(os.path.join(self.basedir, "log"), "w")
+        #flo = log.FileLogObserver(logfile)
+        #log.startLoggingWithObserver(flo.emit, setStdout=False)
+        d = eventual.fireEventually()
+        d.addCallback(lambda res: self.setUp())
+        d.addCallback(lambda res: self.do_test())
+        d.addBoth(self.tearDown)
+        def _err(err):
+            log.err(err)
+            print err
+        d.addErrback(_err)
+        d.addBoth(lambda res: reactor.stop())
         reactor.run()
 
-    def start(self):
+    def setUp(self):
         print "STARTING"
         d = self.make_introducer_and_vdrive()
         def _more(res):
             self.make_nodes()
-            self.start_client()
+            return self.start_client()
         d.addCallback(_more)
+        def _record_control_furl(control_furl):
+            self.control_furl = control_furl
+            print "OBTAINING '%s'" % (control_furl,)
+            return self.tub.getReference(self.control_furl)
+        d.addCallback(_record_control_furl)
+        def _record_control(control_rref):
+            self.control_rref = control_rref
+            return control_rref.callRemote("wait_for_client_connections",
+                                           self.numnodes+1)
+        d.addCallback(_record_control)
+        def _ready(res):
+            print "CLIENT READY"
+        d.addCallback(_ready)
         return d
 
-    def tearDown(self):
-        os.remove(os.path.join(self.clientdir, "suicide_prevention_hotline"))
+    def tearDown(self, passthrough):
         # the client node will shut down in a few seconds
+        #os.remove(os.path.join(self.clientdir, "suicide_prevention_hotline"))
         log.msg("shutting down SystemTest services")
-        d = self.sparent.stopService()
-        d.addCallback(lambda res: flushEventualQueue())
-        def _done(res):
-            d1 = defer.Deferred()
-            reactor.callLater(self.DISCONNECT_DELAY, d1.callback, None)
-            return d1
-        d.addCallback(_done)
+        d = defer.succeed(None)
+        if self.proc:
+            d.addCallback(lambda res: self.kill_client())
+        d.addCallback(lambda res: self.sparent.stopService())
+        d.addCallback(lambda res: eventual.flushEventualQueue())
+        d.addCallback(lambda res: passthrough)
         return d
 
     def add_service(self, s):
@@ -95,9 +121,10 @@ this file are ignored.
         f.close()
 
     def start_client(self):
+        # this returns a Deferred that fires with the client's control.furl
         log.msg("MAKING CLIENT")
         clientdir = self.clientdir = os.path.join(self.basedir, "client")
-        config = {'basedir': clientdir, 'quiet': False}
+        config = {'basedir': clientdir, 'quiet': True}
         runner.create_client(config)
         log.msg("DONE MAKING CLIENT")
         f = open(os.path.join(clientdir, "introducer.furl"), "w")
@@ -114,16 +141,104 @@ this file are ignored.
         ts.setServiceParent(self.sparent)
 
         pp = ClientWatcher()
+        self.proc_done = pp.d = defer.Deferred()
         cmd = ["twistd", "-y", "client.tac"]
         env = os.environ.copy()
         self.proc = reactor.spawnProcess(pp, cmd[0], cmd, env, path=clientdir)
         log.msg("CLIENT STARTED")
 
+        # now we wait for the client to get started. we're looking for the
+        # control.furl file to appear.
+        furl_file = os.path.join(clientdir, "control.furl")
+        def _check():
+            return os.path.exists(furl_file)
+        d = self.poll(_check, 0.1)
+        # once it exists, wait a moment before we read from it, just in case
+        # it hasn't finished writing the whole thing. Ideally control.furl
+        # would be created in some atomic fashion, or made non-readable until
+        # it's ready, but I can't think of an easy way to do that, and I
+        # think the chances that we'll observe a half-write are pretty low.
+        def _stall(res):
+            d2 = defer.Deferred()
+            reactor.callLater(0.1, d2.callback, None)
+            return d2
+        d.addCallback(_stall)
+        def _read(res):
+            f = open(furl_file, "r")
+            furl = f.read()
+            return furl.strip()
+        d.addCallback(_read)
+        return d
+
+
     def kill_client(self):
+        # returns a Deferred that fires when the process exits. This may only
+        # be called once.
         try:
             self.proc.signalProcess("KILL")
         except error.ProcessExitedAlready:
             pass
+        return self.proc_done
+
+
+    def create_data(self, name, size):
+        filename = os.path.join(self.basedir, name + ".data")
+        f = open(filename, "wb")
+        block = "a" * 8192
+        while size > 0:
+            l = min(size, 8192)
+            f.write(block[:l])
+            size -= l
+        return filename
+
+    def do_test(self):
+        print "CLIENT STARTED"
+        print "FURL", self.control_furl
+        print "RREF", self.control_rref
+        print
+        kB = 1000; MB = 1000*1000
+        files = {}
+        uris = {}
+        control = self.control_rref
+
+        def _print_usage(res=None):
+            d = control.callRemote("get_memory_usage")
+            def _print(stats):
+                print "VmSize: %9d  VmPeak: %9d" % (stats["VmSize"],
+                                                    stats["VmPeak"])
+            d.addCallback(_print)
+            return d
+
+        def _do_upload(res, size):
+            name = '%d' % size
+            files[name] = self.create_data(name, size)
+            d = control.callRemote("upload_from_file_to_uri", files[name])
+            def _done(uri):
+                uris[name] = uri
+                print "uploaded %s" % name
+            d.addCallback(_done)
+            return d
+
+        d = _print_usage()
+
+        for i in range(10):
+            d.addCallback(_do_upload, size=10*kB+i)
+            d.addCallback(_print_usage)
+
+        for i in range(10):
+            d.addCallback(_do_upload, size=10*MB+i)
+            d.addCallback(_print_usage)
+
+        #d.addCallback(self.stall)
+        def _done(res):
+            print "FINISHING"
+        d.addCallback(_done)
+        return d
+
+    def stall(self, res):
+        d = defer.Deferred()
+        reactor.callLater(5, d.callback, None)
+        return d
 
 
 class ClientWatcher(protocol.ProcessProtocol):
@@ -131,12 +246,11 @@ class ClientWatcher(protocol.ProcessProtocol):
         print "OUT:", data
     def errReceived(self, data):
         print "ERR:", data
+    def processEnded(self, reason):
+        self.d.callback(None)
 
 
 if __name__ == '__main__':
     sf = SystemFramework("_test_memory")
     sf.run()
 
-
-# add a config option that looks for a keepalive file, and if it disappears,
-# shut down the node.
index 012e3d31e9b99860f71c522e614db804257889df..5154f4e734d819ed6a220abc72e31b86ae932f22 100644 (file)
@@ -7,7 +7,7 @@ from foolscap import Tub, Referenceable
 from foolscap.eventual import flushEventualQueue
 from twisted.application import service
 from allmydata.introducer import IntroducerClient, Introducer
-from allmydata.util import idlib
+from allmydata.util import idlib, testutil
 
 class MyNode(Referenceable):
     pass
@@ -16,7 +16,7 @@ class LoggingMultiService(service.MultiService):
     def log(self, msg):
         pass
 
-class TestIntroducer(unittest.TestCase):
+class TestIntroducer(unittest.TestCase, testutil.PollMixin):
     def setUp(self):
         self.parent = LoggingMultiService()
         self.parent.startService()
@@ -28,22 +28,6 @@ class TestIntroducer(unittest.TestCase):
         return d
 
 
-    def poll(self, check_f, pollinterval=0.01):
-        # Return a Deferred, then call check_f periodically until it returns
-        # True, at which point the Deferred will fire.. If check_f raises an
-        # exception, the Deferred will errback.
-        d = defer.maybeDeferred(self._poll, None, check_f, pollinterval)
-        return d
-
-    def _poll(self, res, check_f, pollinterval):
-        if check_f():
-            return True
-        d = defer.Deferred()
-        d.addCallback(self._poll, check_f, pollinterval)
-        reactor.callLater(pollinterval, d.callback, None)
-        return d
-
-
     def test_create(self):
         ic = IntroducerClient(None, "introducer", "myfurl")
         def _ignore(nodeid, rref):
index 8193d2ef4225c01aee23156e939937cf98161c5d..0025b4e31629fab8a8b777bf44a2e992ab3455a9 100644 (file)
@@ -1,6 +1,6 @@
 import os, signal, time
 
-from twisted.internet import reactor
+from twisted.internet import reactor, defer
 
 class SignalMixin:
     # This class is necessary for any code which wants to use Processes
@@ -20,6 +20,24 @@ class SignalMixin:
         if self.sigchldHandler:
             signal.signal(signal.SIGCHLD, self.sigchldHandler)
 
+class PollMixin:
+
+    def poll(self, check_f, pollinterval=0.01):
+        # Return a Deferred, then call check_f periodically until it returns
+        # True, at which point the Deferred will fire.. If check_f raises an
+        # exception, the Deferred will errback.
+        d = defer.maybeDeferred(self._poll, None, check_f, pollinterval)
+        return d
+
+    def _poll(self, res, check_f, pollinterval):
+        if check_f():
+            return True
+        d = defer.Deferred()
+        d.addCallback(self._poll, check_f, pollinterval)
+        reactor.callLater(pollinterval, d.callback, None)
+        return d
+
+
 class TestMixin(SignalMixin):
     def setUp(self, repeatable=False):
         """