]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blobdiff - src/allmydata/test/check_memory.py
tests: fix check_memory test
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / check_memory.py
index a9f0de16a3ff2051ada13cce32cb9246f857a23c..fdce601e01bd0491a60eba1100fca2749d15b02e 100644 (file)
 #! /usr/bin/env python
 
-import os, shutil
-
+import os, shutil, sys, urllib, time, stat
+from cStringIO import StringIO
 from twisted.internet import defer, reactor, protocol, error
-from twisted.application import service
-from allmydata import client, introducer_and_vdrive
-from allmydata.scripts import runner
-from foolscap.eventual import eventually, flushEventualQueue
+from twisted.application import service, internet
+from twisted.web import client as tw_client
+from allmydata import client, introducer
+from allmydata.immutable import upload
+from allmydata.scripts import create_node
+from allmydata.util import fileutil, pollmixin
+from allmydata.util.fileutil import abspath_expanduser_unicode
+from allmydata.util.encodingutil import get_filesystem_encoding
+from foolscap.api import Tub, fireEventually, flushEventualQueue
 from twisted.python import log
 
-class SystemFramework:
-    numnodes = 5
+class StallableHTTPGetterDiscarder(tw_client.HTTPPageGetter):
+    full_speed_ahead = False
+    _bytes_so_far = 0
+    stalled = None
+    def handleResponsePart(self, data):
+        self._bytes_so_far += len(data)
+        if not self.factory.do_stall:
+            return
+        if self.full_speed_ahead:
+            return
+        if self._bytes_so_far > 1e6+100:
+            if not self.stalled:
+                print "STALLING"
+                self.transport.pauseProducing()
+                self.stalled = reactor.callLater(10.0, self._resume_speed)
+    def _resume_speed(self):
+        print "RESUME SPEED"
+        self.stalled = None
+        self.full_speed_ahead = True
+        self.transport.resumeProducing()
+    def handleResponseEnd(self):
+        if self.stalled:
+            print "CANCEL"
+            self.stalled.cancel()
+            self.stalled = None
+        return tw_client.HTTPPageGetter.handleResponseEnd(self)
+
+class StallableDiscardingHTTPClientFactory(tw_client.HTTPClientFactory):
+    protocol = StallableHTTPGetterDiscarder
+
+def discardPage(url, stall=False, *args, **kwargs):
+    """Start fetching the URL, but stall our pipe after the first 1MB.
+    Wait 10 seconds, then resume downloading (and discarding) everything.
+    """
+    # adapted from twisted.web.client.getPage . We can't just wrap or
+    # subclass because it provides no way to override the HTTPClientFactory
+    # that it creates.
+    scheme, host, port, path = tw_client._parse(url)
+    factory = StallableDiscardingHTTPClientFactory(url, *args, **kwargs)
+    factory.do_stall = stall
+    assert scheme == 'http'
+    reactor.connectTCP(host, port, factory)
+    return factory.deferred
+
+class ChildDidNotStartError(Exception):
+    pass
 
-    def __init__(self, basedir):
-        self.basedir = basedir = os.path.abspath(basedir)
-        if not basedir.startswith(os.path.abspath(".")):
+class SystemFramework(pollmixin.PollMixin):
+    numnodes = 7
+
+    def __init__(self, basedir, mode):
+        self.basedir = basedir = abspath_expanduser_unicode(unicode(basedir))
+        if not (basedir + os.path.sep).startswith(abspath_expanduser_unicode(u".") + os.path.sep):
             raise AssertionError("safety issue: basedir must be a subdir")
-        if os.path.exists(basedir):
-            shutil.rmtree(basedir)
-        os.mkdir(basedir)
+        self.testdir = testdir = os.path.join(basedir, "test")
+        if os.path.exists(testdir):
+            shutil.rmtree(testdir)
+        fileutil.make_dirs(testdir)
         self.sparent = service.MultiService()
         self.sparent.startService()
+        self.proc = None
+        self.tub = Tub()
+        self.tub.setOption("expose-remote-exception-types", False)
+        self.tub.setServiceParent(self.sparent)
+        self.mode = mode
+        self.failed = False
+        self.keepalive_file = None
 
     def run(self):
-        log.startLogging(open(os.path.join(self.basedir, "log"), "w"))
-        d = defer.Deferred()
-        eventually(d.callback, None)
-        d.addCallback(lambda res: self.start())
-        d.addErrback(log.err)
+        framelog = os.path.join(self.basedir, "driver.log")
+        log.startLogging(open(framelog, "a"), setStdout=False)
+        log.msg("CHECK_MEMORY(mode=%s) STARTING" % self.mode)
+        #logfile = open(os.path.join(self.testdir, "log"), "w")
+        #flo = log.FileLogObserver(logfile)
+        #log.startLoggingWithObserver(flo.emit, setStdout=False)
+        d = fireEventually()
+        d.addCallback(lambda res: self.setUp())
+        d.addCallback(lambda res: self.record_initial_memusage())
+        d.addCallback(lambda res: self.make_nodes())
+        d.addCallback(lambda res: self.wait_for_client_connected())
+        d.addCallback(lambda res: self.do_test())
+        d.addBoth(self.tearDown)
+        def _err(err):
+            self.failed = err
+            log.err(err)
+            print err
+        d.addErrback(_err)
+        def _done(res):
+            reactor.stop()
+            return res
+        d.addBoth(_done)
         reactor.run()
+        if self.failed:
+            # raiseException doesn't work for CopiedFailures
+            self.failed.raiseException()
 
-    def start(self):
-        print "STARTING"
-        d = self.make_introducer_and_vdrive()
+    def setUp(self):
+        #print "STARTING"
+        self.stats = {}
+        self.statsfile = open(os.path.join(self.basedir, "stats.out"), "a")
+        d = self.make_introducer()
         def _more(res):
-            self.make_nodes()
-            self.start_client()
+            return self.start_client()
         d.addCallback(_more)
+        def _record_control_furl(control_furl):
+            self.control_furl = control_furl
+            #print "OBTAINING '%s'" % (control_furl,)
+            return self.tub.getReference(self.control_furl)
+        d.addCallback(_record_control_furl)
+        def _record_control(control_rref):
+            self.control_rref = control_rref
+        d.addCallback(_record_control)
+        def _ready(res):
+            #print "CLIENT READY"
+            pass
+        d.addCallback(_ready)
+        return d
+
+    def record_initial_memusage(self):
+        print
+        print "Client started (no connections yet)"
+        d = self._print_usage()
+        d.addCallback(self.stash_stats, "init")
         return d
 
-    def tearDown(self):
-        os.remove(os.path.join(self.clientdir, "suicide_prevention_hotline"))
+    def wait_for_client_connected(self):
+        print
+        print "Client connecting to other nodes.."
+        return self.control_rref.callRemote("wait_for_client_connections",
+                                            self.numnodes+1)
+
+    def tearDown(self, passthrough):
         # the client node will shut down in a few seconds
+        #os.remove(os.path.join(self.clientdir, "suicide_prevention_hotline"))
         log.msg("shutting down SystemTest services")
-        d = self.sparent.stopService()
+        if self.keepalive_file and os.path.exists(self.keepalive_file):
+            age = time.time() - os.stat(self.keepalive_file)[stat.ST_MTIME]
+            log.msg("keepalive file at shutdown was %ds old" % age)
+        d = defer.succeed(None)
+        if self.proc:
+            d.addCallback(lambda res: self.kill_client())
+        d.addCallback(lambda res: self.sparent.stopService())
         d.addCallback(lambda res: flushEventualQueue())
-        def _done(res):
-            d1 = defer.Deferred()
-            reactor.callLater(self.DISCONNECT_DELAY, d1.callback, None)
-            return d1
-        d.addCallback(_done)
+        def _close_statsfile(res):
+            self.statsfile.close()
+        d.addCallback(_close_statsfile)
+        d.addCallback(lambda res: passthrough)
         return d
 
     def add_service(self, s):
         s.setServiceParent(self.sparent)
         return s
 
-    def make_introducer_and_vdrive(self):
-        introducer_and_vdrive_dir = os.path.join(self.basedir, "introducer_and_vdrive")
-        os.mkdir(introducer_and_vdrive_dir)
-        self.introducer_and_vdrive = self.add_service(introducer_and_vdrive.IntroducerAndVdrive(basedir=introducer_and_vdrive_dir))
-        d = self.introducer_and_vdrive.when_tub_ready()
+    def make_introducer(self):
+        iv_basedir = os.path.join(self.testdir, "introducer")
+        os.mkdir(iv_basedir)
+        iv = introducer.IntroducerNode(basedir=iv_basedir)
+        self.introducer = self.add_service(iv)
+        d = self.introducer.when_tub_ready()
+        def _introducer_ready(res):
+            q = self.introducer
+            self.introducer_furl = q.introducer_url
+        d.addCallback(_introducer_ready)
         return d
 
     def make_nodes(self):
-        q = self.introducer_and_vdrive
-        self.introducer_furl = q.urls["introducer"]
-        vdrive_furl = q.urls["vdrive"]
         self.nodes = []
         for i in range(self.numnodes):
-            nodedir = os.path.join(self.basedir, "node%d" % i)
+            nodedir = os.path.join(self.testdir, "node%d" % i)
             os.mkdir(nodedir)
-            f = open(os.path.join(nodedir, "introducer.furl"), "w")
-            f.write(self.introducer_furl)
-            f.close()
-            f = open(os.path.join(nodedir, "vdrive.furl"), "w")
-            f.write(vdrive_furl)
+            f = open(os.path.join(nodedir, "tahoe.cfg"), "w")
+            f.write("[client]\n"
+                    "introducer.furl = %s\n"
+                    "shares.happy = 1\n"
+                    "[storage]\n"
+                    % (self.introducer_furl,))
+            # the only tests for which we want the internal nodes to actually
+            # retain shares are the ones where somebody's going to download
+            # them.
+            if self.mode in ("download", "download-GET", "download-GET-slow"):
+                # retain shares
+                pass
+            else:
+                # for these tests, we tell the storage servers to pretend to
+                # accept shares, but really just throw them out, since we're
+                # only testing upload and not download.
+                f.write("debug_discard = true\n")
+            if self.mode in ("receive",):
+                # for this mode, the client-under-test gets all the shares,
+                # so our internal nodes can refuse requests
+                f.write("readonly = true\n")
             f.close()
             c = self.add_service(client.Client(basedir=nodedir))
             self.nodes.append(c)
         # the peers will start running, eventually they will connect to each
-        # other and the introducer_and_vdrive
+        # other and the introducer
 
     def touch_keepalive(self):
+        if os.path.exists(self.keepalive_file):
+            age = time.time() - os.stat(self.keepalive_file)[stat.ST_MTIME]
+            log.msg("touching keepalive file, was %ds old" % age)
         f = open(self.keepalive_file, "w")
-        f.write("If the node notices this file at startup, it will poll and\n")
-        f.write("terminate as soon as the file goes away. This prevents\n")
-        f.write("leaving processes around if the test harness has an\n")
-        f.write("internal failure and neglects to kill off the node\n")
-        f.write("itself. The contents of this file are ignored.\n")
+        f.write("""\
+If the node notices this file at startup, it will poll every 5 seconds and
+terminate if the file is more than 10 seconds old, or if it has been deleted.
+If the test harness has an internal failure and neglects to kill off the node
+itself, this helps to avoid leaving processes lying around. The contents of
+this file are ignored.
+        """)
         f.close()
 
     def start_client(self):
+        # this returns a Deferred that fires with the client's control.furl
         log.msg("MAKING CLIENT")
-        clientdir = self.clientdir = os.path.join(self.basedir, "client")
-        config = {'basedir': clientdir}
-        runner.create_client(config)
+        # self.testdir is an absolute Unicode path
+        clientdir = self.clientdir = os.path.join(self.testdir, u"client")
+        clientdir_str = clientdir.encode(get_filesystem_encoding())
+        quiet = StringIO()
+        create_node.create_node({'basedir': clientdir}, out=quiet)
         log.msg("DONE MAKING CLIENT")
-        f = open(os.path.join(clientdir, "introducer.furl"), "w")
-        f.write(self.introducer_furl + "\n")
+        # now replace tahoe.cfg
+        # set webport=0 and then ask the node what port it picked.
+        f = open(os.path.join(clientdir, "tahoe.cfg"), "w")
+        f.write("[node]\n"
+                "web.port = tcp:0:interface=127.0.0.1\n"
+                "[client]\n"
+                "introducer.furl = %s\n"
+                "shares.happy = 1\n"
+                "[storage]\n"
+                % (self.introducer_furl,))
+
+        if self.mode in ("upload-self", "receive"):
+            # accept and store shares, to trigger the memory consumption bugs
+            pass
+        else:
+            # don't accept any shares
+            f.write("readonly = true\n")
+            ## also, if we do receive any shares, throw them away
+            #f.write("debug_discard = true")
+        if self.mode == "upload-self":
+            pass
         f.close()
-        self.keepalive_file = os.path.join(clientdir, "suicide_prevention_hotline")
-        self.touch_keepalive()
+        self.keepalive_file = os.path.join(clientdir,
+                                           "suicide_prevention_hotline")
         # now start updating the mtime.
+        self.touch_keepalive()
+        ts = internet.TimerService(1.0, self.touch_keepalive)
+        ts.setServiceParent(self.sparent)
 
         pp = ClientWatcher()
-        cmd = ["twistd", "-y", "client.tac"]
+        self.proc_done = pp.d = defer.Deferred()
+        logfile = os.path.join(self.basedir, "client.log")
+        cmd = ["twistd", "-n", "-y", "tahoe-client.tac", "-l", logfile]
         env = os.environ.copy()
-        self.proc = reactor.spawnProcess(pp, cmd[0], cmd, env, path=clientdir)
+        self.proc = reactor.spawnProcess(pp, cmd[0], cmd, env, path=clientdir_str)
         log.msg("CLIENT STARTED")
 
+        # now we wait for the client to get started. we're looking for the
+        # control.furl file to appear.
+        furl_file = os.path.join(clientdir, "private", "control.furl")
+        url_file = os.path.join(clientdir, "node.url")
+        def _check():
+            if pp.ended and pp.ended.value.status != 0:
+                # the twistd process ends normally (with rc=0) if the child
+                # is successfully launched. It ends abnormally (with rc!=0)
+                # if the child cannot be launched.
+                raise ChildDidNotStartError("process ended while waiting for startup")
+            return os.path.exists(furl_file)
+        d = self.poll(_check, 0.1)
+        # once it exists, wait a moment before we read from it, just in case
+        # it hasn't finished writing the whole thing. Ideally control.furl
+        # would be created in some atomic fashion, or made non-readable until
+        # it's ready, but I can't think of an easy way to do that, and I
+        # think the chances that we'll observe a half-write are pretty low.
+        def _stall(res):
+            d2 = defer.Deferred()
+            reactor.callLater(0.1, d2.callback, None)
+            return d2
+        d.addCallback(_stall)
+        def _read(res):
+            # read the node's URL
+            self.webish_url = open(url_file, "r").read().strip()
+            if self.webish_url[-1] == "/":
+                # trim trailing slash, since the rest of the code wants it gone
+                self.webish_url = self.webish_url[:-1]
+            f = open(furl_file, "r")
+            furl = f.read()
+            return furl.strip()
+        d.addCallback(_read)
+        return d
+
+
     def kill_client(self):
+        # returns a Deferred that fires when the process exits. This may only
+        # be called once.
         try:
-            self.proc.signalProcess("KILL")
+            self.proc.signalProcess("INT")
         except error.ProcessExitedAlready:
             pass
+        return self.proc_done
+
+
+    def create_data(self, name, size):
+        filename = os.path.join(self.testdir, name + ".data")
+        f = open(filename, "wb")
+        block = "a" * 8192
+        while size > 0:
+            l = min(size, 8192)
+            f.write(block[:l])
+            size -= l
+        return filename
+
+    def stash_stats(self, stats, name):
+        self.statsfile.write("%s %s: %d\n" % (self.mode, name, stats['VmPeak']))
+        self.statsfile.flush()
+        self.stats[name] = stats['VmPeak']
+
+    def POST(self, urlpath, **fields):
+        url = self.webish_url + urlpath
+        sepbase = "boogabooga"
+        sep = "--" + sepbase
+        form = []
+        form.append(sep)
+        form.append('Content-Disposition: form-data; name="_charset"')
+        form.append('')
+        form.append('UTF-8')
+        form.append(sep)
+        for name, value in fields.iteritems():
+            if isinstance(value, tuple):
+                filename, value = value
+                form.append('Content-Disposition: form-data; name="%s"; '
+                            'filename="%s"' % (name, filename))
+            else:
+                form.append('Content-Disposition: form-data; name="%s"' % name)
+            form.append('')
+            form.append(value)
+            form.append(sep)
+        form[-1] += "--"
+        body = "\r\n".join(form) + "\r\n"
+        headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
+                   }
+        return tw_client.getPage(url, method="POST", postdata=body,
+                                 headers=headers, followRedirect=False)
+
+    def GET_discard(self, urlpath, stall):
+        url = self.webish_url + urlpath + "?filename=dummy-get.out"
+        return discardPage(url, stall)
+
+    def _print_usage(self, res=None):
+        d = self.control_rref.callRemote("get_memory_usage")
+        def _print(stats):
+            print "VmSize: %9d  VmPeak: %9d" % (stats["VmSize"],
+                                                stats["VmPeak"])
+            return stats
+        d.addCallback(_print)
+        return d
+
+    def _do_upload(self, res, size, files, uris):
+        name = '%d' % size
+        print
+        print "uploading %s" % name
+        if self.mode in ("upload", "upload-self"):
+            files[name] = self.create_data(name, size)
+            d = self.control_rref.callRemote("upload_from_file_to_uri",
+                                             files[name].encode("utf-8"),
+                                             convergence="check-memory")
+            def _done(uri):
+                os.remove(files[name])
+                del files[name]
+                return uri
+            d.addCallback(_done)
+        elif self.mode == "upload-POST":
+            data = "a" * size
+            url = "/uri"
+            d = self.POST(url, t="upload", file=("%d.data" % size, data))
+        elif self.mode in ("receive",
+                           "download", "download-GET", "download-GET-slow"):
+            # mode=receive: upload the data from a local peer, so that the
+            # client-under-test receives and stores the shares
+            #
+            # mode=download*: upload the data from a local peer, then have
+            # the client-under-test download it.
+            #
+            # we need to wait until the uploading node has connected to all
+            # peers, since the wait_for_client_connections() above doesn't
+            # pay attention to our self.nodes[] and their connections.
+            files[name] = self.create_data(name, size)
+            u = self.nodes[0].getServiceNamed("uploader")
+            d = self.nodes[0].debug_wait_for_client_connections(self.numnodes+1)
+            d.addCallback(lambda res:
+                          u.upload(upload.FileName(files[name],
+                                                   convergence="check-memory")))
+            d.addCallback(lambda results: results.uri)
+        else:
+            raise ValueError("unknown mode=%s" % self.mode)
+        def _complete(uri):
+            uris[name] = uri
+            print "uploaded %s" % name
+        d.addCallback(_complete)
+        return d
+
+    def _do_download(self, res, size, uris):
+        if self.mode not in ("download", "download-GET", "download-GET-slow"):
+            return
+        name = '%d' % size
+        print "downloading %s" % name
+        uri = uris[name]
+
+        if self.mode == "download":
+            d = self.control_rref.callRemote("download_from_uri_to_file",
+                                             uri, "dummy.out")
+        elif self.mode == "download-GET":
+            url = "/uri/%s" % uri
+            d = self.GET_discard(urllib.quote(url), stall=False)
+        elif self.mode == "download-GET-slow":
+            url = "/uri/%s" % uri
+            d = self.GET_discard(urllib.quote(url), stall=True)
+
+        def _complete(res):
+            print "downloaded %s" % name
+            return res
+        d.addCallback(_complete)
+        return d
+
+    def do_test(self):
+        #print "CLIENT STARTED"
+        #print "FURL", self.control_furl
+        #print "RREF", self.control_rref
+        #print
+        kB = 1000; MB = 1000*1000
+        files = {}
+        uris = {}
+
+        d = self._print_usage()
+        d.addCallback(self.stash_stats, "0B")
+
+        for i in range(10):
+            d.addCallback(self._do_upload, 10*kB+i, files, uris)
+            d.addCallback(self._do_download, 10*kB+i, uris)
+            d.addCallback(self._print_usage)
+        d.addCallback(self.stash_stats, "10kB")
+
+        for i in range(3):
+            d.addCallback(self._do_upload, 10*MB+i, files, uris)
+            d.addCallback(self._do_download, 10*MB+i, uris)
+            d.addCallback(self._print_usage)
+        d.addCallback(self.stash_stats, "10MB")
+
+        for i in range(1):
+            d.addCallback(self._do_upload, 50*MB+i, files, uris)
+            d.addCallback(self._do_download, 50*MB+i, uris)
+            d.addCallback(self._print_usage)
+        d.addCallback(self.stash_stats, "50MB")
+
+        #for i in range(1):
+        #    d.addCallback(self._do_upload, 100*MB+i, files, uris)
+        #    d.addCallback(self._do_download, 100*MB+i, uris)
+        #    d.addCallback(self._print_usage)
+        #d.addCallback(self.stash_stats, "100MB")
+
+        #d.addCallback(self.stall)
+        def _done(res):
+            print "FINISHING"
+        d.addCallback(_done)
+        return d
+
+    def stall(self, res):
+        d = defer.Deferred()
+        reactor.callLater(5, d.callback, None)
+        return d
 
 
 class ClientWatcher(protocol.ProcessProtocol):
+    ended = False
     def outReceived(self, data):
         print "OUT:", data
     def errReceived(self, data):
         print "ERR:", data
+    def processEnded(self, reason):
+        self.ended = reason
+        self.d.callback(None)
 
 
 if __name__ == '__main__':
-    sf = SystemFramework("_test_memory")
+    mode = "upload"
+    if len(sys.argv) > 1:
+        mode = sys.argv[1]
+    # put the logfile and stats.out in _test_memory/ . These stick around.
+    # put the nodes and other files in _test_memory/test/ . These are
+    # removed each time we run.
+    sf = SystemFramework("_test_memory", mode)
     sf.run()
 
-
-# add a config option that looks for a keepalive file, and if it disappears,
-# shut down the node.