]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blobdiff - src/allmydata/test/check_memory.py
check_memory.py: announce 32-bit vs 64-bit
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / check_memory.py
index 71e0246a5eb155325ba9590bfc06d972064e9411..8ee18c73f68f1013376a45f93413921baf22900e 100644 (file)
-#! /usr/bin/env python
-
-import os, shutil, sys
+import os, shutil, sys, urllib, time, stat, urlparse
 from cStringIO import StringIO
 from twisted.internet import defer, reactor, protocol, error
 from twisted.application import service, internet
-from twisted.web.client import getPage
-from allmydata import client, introducer_and_vdrive
+from twisted.web import client as tw_client
+from allmydata import client, introducer
+from allmydata.immutable import upload
 from allmydata.scripts import create_node
-from allmydata.util import testutil
-import foolscap
-from foolscap import eventual
+from allmydata.util import fileutil, pollmixin
+from allmydata.util.fileutil import abspath_expanduser_unicode
+from allmydata.util.encodingutil import get_filesystem_encoding
+from foolscap.api import Tub, fireEventually, flushEventualQueue
 from twisted.python import log
 
-class SystemFramework(testutil.PollMixin):
-    numnodes = 5
+class StallableHTTPGetterDiscarder(tw_client.HTTPPageGetter):
+    full_speed_ahead = False
+    _bytes_so_far = 0
+    stalled = None
+    def handleResponsePart(self, data):
+        self._bytes_so_far += len(data)
+        if not self.factory.do_stall:
+            return
+        if self.full_speed_ahead:
+            return
+        if self._bytes_so_far > 1e6+100:
+            if not self.stalled:
+                print "STALLING"
+                self.transport.pauseProducing()
+                self.stalled = reactor.callLater(10.0, self._resume_speed)
+    def _resume_speed(self):
+        print "RESUME SPEED"
+        self.stalled = None
+        self.full_speed_ahead = True
+        self.transport.resumeProducing()
+    def handleResponseEnd(self):
+        if self.stalled:
+            print "CANCEL"
+            self.stalled.cancel()
+            self.stalled = None
+        return tw_client.HTTPPageGetter.handleResponseEnd(self)
+
+class StallableDiscardingHTTPClientFactory(tw_client.HTTPClientFactory):
+    protocol = StallableHTTPGetterDiscarder
+
+def discardPage(url, stall=False, *args, **kwargs):
+    """Start fetching the URL, but stall our pipe after the first 1MB.
+    Wait 10 seconds, then resume downloading (and discarding) everything.
+    """
+    # adapted from twisted.web.client.getPage . We can't just wrap or
+    # subclass because it provides no way to override the HTTPClientFactory
+    # that it creates.
+    scheme, netloc, path, params, query, fragment = urlparse.urlparse(url)
+    assert scheme == 'http'
+    host, port = netloc, 80
+    if ":" in host:
+        host, port = host.split(":")
+        port = int(port)
+    factory = StallableDiscardingHTTPClientFactory(url, *args, **kwargs)
+    factory.do_stall = stall
+    reactor.connectTCP(host, port, factory)
+    return factory.deferred
+
+class ChildDidNotStartError(Exception):
+    pass
+
+class SystemFramework(pollmixin.PollMixin):
+    numnodes = 7
 
     def __init__(self, basedir, mode):
-        self.basedir = basedir = os.path.abspath(basedir)
-        if not basedir.startswith(os.path.abspath(".")):
+        self.basedir = basedir = abspath_expanduser_unicode(unicode(basedir))
+        if not (basedir + os.path.sep).startswith(abspath_expanduser_unicode(u".") + os.path.sep):
             raise AssertionError("safety issue: basedir must be a subdir")
-        if os.path.exists(basedir):
-            shutil.rmtree(basedir)
-        os.mkdir(basedir)
+        self.testdir = testdir = os.path.join(basedir, "test")
+        if os.path.exists(testdir):
+            shutil.rmtree(testdir)
+        fileutil.make_dirs(testdir)
         self.sparent = service.MultiService()
         self.sparent.startService()
         self.proc = None
-        self.tub = foolscap.Tub()
+        self.tub = Tub()
+        self.tub.setOption("expose-remote-exception-types", False)
         self.tub.setServiceParent(self.sparent)
-        self.discard_shares = True
         self.mode = mode
+        self.failed = False
+        self.keepalive_file = None
 
     def run(self):
-        log.startLogging(open(os.path.join(self.basedir, "log"), "w"),
-                         setStdout=False)
-        #logfile = open(os.path.join(self.basedir, "log"), "w")
+        framelog = os.path.join(self.basedir, "driver.log")
+        log.startLogging(open(framelog, "a"), setStdout=False)
+        log.msg("CHECK_MEMORY(mode=%s) STARTING" % self.mode)
+        #logfile = open(os.path.join(self.testdir, "log"), "w")
         #flo = log.FileLogObserver(logfile)
         #log.startLoggingWithObserver(flo.emit, setStdout=False)
-        d = eventual.fireEventually()
+        d = fireEventually()
         d.addCallback(lambda res: self.setUp())
+        d.addCallback(lambda res: self.record_initial_memusage())
+        d.addCallback(lambda res: self.make_nodes())
+        d.addCallback(lambda res: self.wait_for_client_connected())
         d.addCallback(lambda res: self.do_test())
         d.addBoth(self.tearDown)
         def _err(err):
+            self.failed = err
             log.err(err)
             print err
         d.addErrback(_err)
-        d.addBoth(lambda res: reactor.stop())
+        def _done(res):
+            reactor.stop()
+            return res
+        d.addBoth(_done)
         reactor.run()
+        if self.failed:
+            # raiseException doesn't work for CopiedFailures
+            self.failed.raiseException()
 
     def setUp(self):
         #print "STARTING"
         self.stats = {}
-        self.statsfile = open(os.path.join(self.basedir, "stats.out"), "w")
-        d = self.make_introducer_and_vdrive()
+        self.statsfile = open(os.path.join(self.basedir, "stats.out"), "a")
+        d = self.make_introducer()
         def _more(res):
-            self.make_nodes()
             return self.start_client()
         d.addCallback(_more)
         def _record_control_furl(control_furl):
@@ -63,8 +127,6 @@ class SystemFramework(testutil.PollMixin):
         d.addCallback(_record_control_furl)
         def _record_control(control_rref):
             self.control_rref = control_rref
-            return control_rref.callRemote("wait_for_client_connections",
-                                           self.numnodes+1)
         d.addCallback(_record_control)
         def _ready(res):
             #print "CLIENT READY"
@@ -72,15 +134,31 @@ class SystemFramework(testutil.PollMixin):
         d.addCallback(_ready)
         return d
 
+    def record_initial_memusage(self):
+        print
+        print "Client started (no connections yet)"
+        d = self._print_usage()
+        d.addCallback(self.stash_stats, "init")
+        return d
+
+    def wait_for_client_connected(self):
+        print
+        print "Client connecting to other nodes.."
+        return self.control_rref.callRemote("wait_for_client_connections",
+                                            self.numnodes+1)
+
     def tearDown(self, passthrough):
         # the client node will shut down in a few seconds
-        #os.remove(os.path.join(self.clientdir, "suicide_prevention_hotline"))
+        #os.remove(os.path.join(self.clientdir, client.Client.EXIT_TRIGGER_FILE))
         log.msg("shutting down SystemTest services")
+        if self.keepalive_file and os.path.exists(self.keepalive_file):
+            age = time.time() - os.stat(self.keepalive_file)[stat.ST_MTIME]
+            log.msg("keepalive file at shutdown was %ds old" % age)
         d = defer.succeed(None)
         if self.proc:
             d.addCallback(lambda res: self.kill_client())
         d.addCallback(lambda res: self.sparent.stopService())
-        d.addCallback(lambda res: eventual.flushEventualQueue())
+        d.addCallback(lambda res: flushEventualQueue())
         def _close_statsfile(res):
             self.statsfile.close()
         d.addCallback(_close_statsfile)
@@ -91,41 +169,54 @@ class SystemFramework(testutil.PollMixin):
         s.setServiceParent(self.sparent)
         return s
 
-    def make_introducer_and_vdrive(self):
-        iv_basedir = os.path.join(self.basedir, "introducer_and_vdrive")
+    def make_introducer(self):
+        iv_basedir = os.path.join(self.testdir, "introducer")
         os.mkdir(iv_basedir)
-        iv = introducer_and_vdrive.IntroducerAndVdrive(basedir=iv_basedir)
-        self.introducer_and_vdrive = self.add_service(iv)
-        d = self.introducer_and_vdrive.when_tub_ready()
+        iv = introducer.IntroducerNode(basedir=iv_basedir)
+        self.introducer = self.add_service(iv)
+        d = self.introducer.when_tub_ready()
+        def _introducer_ready(res):
+            q = self.introducer
+            self.introducer_furl = q.introducer_url
+        d.addCallback(_introducer_ready)
         return d
 
     def make_nodes(self):
-        q = self.introducer_and_vdrive
-        self.introducer_furl = q.urls["introducer"]
-        self.vdrive_furl = q.urls["vdrive"]
         self.nodes = []
         for i in range(self.numnodes):
-            nodedir = os.path.join(self.basedir, "node%d" % i)
+            nodedir = os.path.join(self.testdir, "node%d" % i)
             os.mkdir(nodedir)
-            f = open(os.path.join(nodedir, "introducer.furl"), "w")
-            f.write(self.introducer_furl)
-            f.close()
-            f = open(os.path.join(nodedir, "vdrive.furl"), "w")
-            f.write(self.vdrive_furl)
+            f = open(os.path.join(nodedir, "tahoe.cfg"), "w")
+            f.write("[client]\n"
+                    "introducer.furl = %s\n"
+                    "shares.happy = 1\n"
+                    "[storage]\n"
+                    % (self.introducer_furl,))
+            # the only tests for which we want the internal nodes to actually
+            # retain shares are the ones where somebody's going to download
+            # them.
+            if self.mode in ("download", "download-GET", "download-GET-slow"):
+                # retain shares
+                pass
+            else:
+                # for these tests, we tell the storage servers to pretend to
+                # accept shares, but really just throw them out, since we're
+                # only testing upload and not download.
+                f.write("debug_discard = true\n")
+            if self.mode in ("receive",):
+                # for this mode, the client-under-test gets all the shares,
+                # so our internal nodes can refuse requests
+                f.write("readonly = true\n")
             f.close()
-            if self.discard_shares:
-                # for this test, we tell the storage servers to throw out all
-                # their stored data, since we're only testing upload and not
-                # download.
-                f = open(os.path.join(nodedir, "debug_no_storage"), "w")
-                f.write("no_storage\n")
-                f.close()
             c = self.add_service(client.Client(basedir=nodedir))
             self.nodes.append(c)
         # the peers will start running, eventually they will connect to each
-        # other and the introducer_and_vdrive
+        # other and the introducer
 
     def touch_keepalive(self):
+        if os.path.exists(self.keepalive_file):
+            age = time.time() - os.stat(self.keepalive_file)[stat.ST_MTIME]
+            log.msg("touching keepalive file, was %ds old" % age)
         f = open(self.keepalive_file, "w")
         f.write("""\
 If the node notices this file at startup, it will poll every 5 seconds and
@@ -139,46 +230,59 @@ this file are ignored.
     def start_client(self):
         # this returns a Deferred that fires with the client's control.furl
         log.msg("MAKING CLIENT")
-        clientdir = self.clientdir = os.path.join(self.basedir, "client")
+        # self.testdir is an absolute Unicode path
+        clientdir = self.clientdir = os.path.join(self.testdir, u"client")
+        clientdir_str = clientdir.encode(get_filesystem_encoding())
         quiet = StringIO()
-        create_node.create_client(clientdir, {}, out=quiet)
+        create_node.create_node({'basedir': clientdir}, out=quiet)
         log.msg("DONE MAKING CLIENT")
-        f = open(os.path.join(clientdir, "introducer.furl"), "w")
-        f.write(self.introducer_furl + "\n")
-        f.close()
-        f = open(os.path.join(clientdir, "vdrive.furl"), "w")
-        f.write(self.vdrive_furl + "\n")
-        f.close()
-        f = open(os.path.join(clientdir, "webport"), "w")
-        # TODO: ideally we would set webport=0 and then ask the node what
-        # port it picked. But at the moment it is not convenient to do this,
-        # so we just pick a relatively unique one.
-        webport = max(os.getpid(), 2000)
-        f.write("tcp:%d:interface=127.0.0.1\n" % webport)
+        # now replace tahoe.cfg
+        # set webport=0 and then ask the node what port it picked.
+        f = open(os.path.join(clientdir, "tahoe.cfg"), "w")
+        f.write("[node]\n"
+                "web.port = tcp:0:interface=127.0.0.1\n"
+                "[client]\n"
+                "introducer.furl = %s\n"
+                "shares.happy = 1\n"
+                "[storage]\n"
+                % (self.introducer_furl,))
+
+        if self.mode in ("upload-self", "receive"):
+            # accept and store shares, to trigger the memory consumption bugs
+            pass
+        else:
+            # don't accept any shares
+            f.write("readonly = true\n")
+            ## also, if we do receive any shares, throw them away
+            #f.write("debug_discard = true")
+        if self.mode == "upload-self":
+            pass
         f.close()
-        self.webish_url = "http://localhost:%d" % webport
-        if self.discard_shares:
-            f = open(os.path.join(clientdir, "debug_no_storage"), "w")
-            f.write("no_storage\n")
-            f.close()
         self.keepalive_file = os.path.join(clientdir,
-                                           "suicide_prevention_hotline")
+                                           client.Client.EXIT_TRIGGER_FILE)
         # now start updating the mtime.
         self.touch_keepalive()
-        ts = internet.TimerService(4.0, self.touch_keepalive)
+        ts = internet.TimerService(1.0, self.touch_keepalive)
         ts.setServiceParent(self.sparent)
 
         pp = ClientWatcher()
         self.proc_done = pp.d = defer.Deferred()
-        cmd = ["twistd", "-y", "client.tac"]
+        logfile = os.path.join(self.basedir, "client.log")
+        cmd = ["twistd", "-n", "-y", "tahoe-client.tac", "-l", logfile]
         env = os.environ.copy()
-        self.proc = reactor.spawnProcess(pp, cmd[0], cmd, env, path=clientdir)
+        self.proc = reactor.spawnProcess(pp, cmd[0], cmd, env, path=clientdir_str)
         log.msg("CLIENT STARTED")
 
         # now we wait for the client to get started. we're looking for the
         # control.furl file to appear.
-        furl_file = os.path.join(clientdir, "control.furl")
+        furl_file = os.path.join(clientdir, "private", "control.furl")
+        url_file = os.path.join(clientdir, "node.url")
         def _check():
+            if pp.ended and pp.ended.value.status != 0:
+                # the twistd process ends normally (with rc=0) if the child
+                # is successfully launched. It ends abnormally (with rc!=0)
+                # if the child cannot be launched.
+                raise ChildDidNotStartError("process ended while waiting for startup")
             return os.path.exists(furl_file)
         d = self.poll(_check, 0.1)
         # once it exists, wait a moment before we read from it, just in case
@@ -192,6 +296,11 @@ this file are ignored.
             return d2
         d.addCallback(_stall)
         def _read(res):
+            # read the node's URL
+            self.webish_url = open(url_file, "r").read().strip()
+            if self.webish_url[-1] == "/":
+                # trim trailing slash, since the rest of the code wants it gone
+                self.webish_url = self.webish_url[:-1]
             f = open(furl_file, "r")
             furl = f.read()
             return furl.strip()
@@ -203,14 +312,14 @@ this file are ignored.
         # returns a Deferred that fires when the process exits. This may only
         # be called once.
         try:
-            self.proc.signalProcess("KILL")
+            self.proc.signalProcess("INT")
         except error.ProcessExitedAlready:
             pass
         return self.proc_done
 
 
     def create_data(self, name, size):
-        filename = os.path.join(self.basedir, name + ".data")
+        filename = os.path.join(self.testdir, name + ".data")
         f = open(filename, "wb")
         block = "a" * 8192
         while size > 0:
@@ -221,6 +330,7 @@ this file are ignored.
 
     def stash_stats(self, stats, name):
         self.statsfile.write("%s %s: %d\n" % (self.mode, name, stats['VmPeak']))
+        self.statsfile.flush()
         self.stats[name] = stats['VmPeak']
 
     def POST(self, urlpath, **fields):
@@ -247,8 +357,88 @@ this file are ignored.
         body = "\r\n".join(form) + "\r\n"
         headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
                    }
-        return getPage(url, method="POST", postdata=body,
-                       headers=headers, followRedirect=False)
+        return tw_client.getPage(url, method="POST", postdata=body,
+                                 headers=headers, followRedirect=False)
+
+    def GET_discard(self, urlpath, stall):
+        url = self.webish_url + urlpath + "?filename=dummy-get.out"
+        return discardPage(url, stall)
+
+    def _print_usage(self, res=None):
+        d = self.control_rref.callRemote("get_memory_usage")
+        def _print(stats):
+            print "VmSize: %9d  VmPeak: %9d" % (stats["VmSize"],
+                                                stats["VmPeak"])
+            return stats
+        d.addCallback(_print)
+        return d
+
+    def _do_upload(self, res, size, files, uris):
+        name = '%d' % size
+        print
+        print "uploading %s" % name
+        if self.mode in ("upload", "upload-self"):
+            files[name] = self.create_data(name, size)
+            d = self.control_rref.callRemote("upload_from_file_to_uri",
+                                             files[name].encode("utf-8"),
+                                             convergence="check-memory")
+            def _done(uri):
+                os.remove(files[name])
+                del files[name]
+                return uri
+            d.addCallback(_done)
+        elif self.mode == "upload-POST":
+            data = "a" * size
+            url = "/uri"
+            d = self.POST(url, t="upload", file=("%d.data" % size, data))
+        elif self.mode in ("receive",
+                           "download", "download-GET", "download-GET-slow"):
+            # mode=receive: upload the data from a local peer, so that the
+            # client-under-test receives and stores the shares
+            #
+            # mode=download*: upload the data from a local peer, then have
+            # the client-under-test download it.
+            #
+            # we need to wait until the uploading node has connected to all
+            # peers, since the wait_for_client_connections() above doesn't
+            # pay attention to our self.nodes[] and their connections.
+            files[name] = self.create_data(name, size)
+            u = self.nodes[0].getServiceNamed("uploader")
+            d = self.nodes[0].debug_wait_for_client_connections(self.numnodes+1)
+            d.addCallback(lambda res:
+                          u.upload(upload.FileName(files[name],
+                                                   convergence="check-memory")))
+            d.addCallback(lambda results: results.get_uri())
+        else:
+            raise ValueError("unknown mode=%s" % self.mode)
+        def _complete(uri):
+            uris[name] = uri
+            print "uploaded %s" % name
+        d.addCallback(_complete)
+        return d
+
+    def _do_download(self, res, size, uris):
+        if self.mode not in ("download", "download-GET", "download-GET-slow"):
+            return
+        name = '%d' % size
+        print "downloading %s" % name
+        uri = uris[name]
+
+        if self.mode == "download":
+            d = self.control_rref.callRemote("download_from_uri_to_file",
+                                             uri, "dummy.out")
+        elif self.mode == "download-GET":
+            url = "/uri/%s" % uri
+            d = self.GET_discard(urllib.quote(url), stall=False)
+        elif self.mode == "download-GET-slow":
+            url = "/uri/%s" % uri
+            d = self.GET_discard(urllib.quote(url), stall=True)
+
+        def _complete(res):
+            print "downloaded %s" % name
+            return res
+        d.addCallback(_complete)
+        return d
 
     def do_test(self):
         #print "CLIENT STARTED"
@@ -258,63 +448,33 @@ this file are ignored.
         kB = 1000; MB = 1000*1000
         files = {}
         uris = {}
-        control = self.control_rref
-
-        def _print_usage(res=None):
-            d = control.callRemote("get_memory_usage")
-            def _print(stats):
-                print "VmSize: %9d  VmPeak: %9d" % (stats["VmSize"],
-                                                    stats["VmPeak"])
-                return stats
-            d.addCallback(_print)
-            return d
-
-        def _do_upload(res, size):
-            name = '%d' % size
-            print
-            print "uploading %s" % name
-            if self.mode == "upload":
-                files[name] = self.create_data(name, size)
-                d = control.callRemote("upload_from_file_to_uri", files[name])
-                def _done(uri):
-                    os.remove(files[name])
-                    del files[name]
-                    return uri
-                d.addCallback(_done)
-            elif self.mode == "upload-POST":
-                data = "a" * size
-                url = "/vdrive/global"
-                d = self.POST(url, t="upload", file=("%d.data" % size, data))
-            else:
-                raise RuntimeError("unknown mode=%s" % self.mode)
-            def _complete(uri):
-                uris[name] = uri
-                print "uploaded %s" % name
-            d.addCallback(_complete)
-            return d
-
-        d = _print_usage()
+
+        d = self._print_usage()
         d.addCallback(self.stash_stats, "0B")
 
         for i in range(10):
-            d.addCallback(_do_upload, size=10*kB+i)
-            d.addCallback(_print_usage)
+            d.addCallback(self._do_upload, 10*kB+i, files, uris)
+            d.addCallback(self._do_download, 10*kB+i, uris)
+            d.addCallback(self._print_usage)
         d.addCallback(self.stash_stats, "10kB")
 
         for i in range(3):
-            d.addCallback(_do_upload, size=10*MB+i)
-            d.addCallback(_print_usage)
+            d.addCallback(self._do_upload, 10*MB+i, files, uris)
+            d.addCallback(self._do_download, 10*MB+i, uris)
+            d.addCallback(self._print_usage)
         d.addCallback(self.stash_stats, "10MB")
 
-        for i in range(3):
-            d.addCallback(_do_upload, size=50*MB+i)
-            d.addCallback(_print_usage)
+        for i in range(1):
+            d.addCallback(self._do_upload, 50*MB+i, files, uris)
+            d.addCallback(self._do_download, 50*MB+i, uris)
+            d.addCallback(self._print_usage)
         d.addCallback(self.stash_stats, "50MB")
 
-        for i in range(1):
-            d.addCallback(_do_upload, size=100*MB+i)
-            d.addCallback(_print_usage)
-        d.addCallback(self.stash_stats, "100MB")
+        #for i in range(1):
+        #    d.addCallback(self._do_upload, 100*MB+i, files, uris)
+        #    d.addCallback(self._do_download, 100*MB+i, uris)
+        #    d.addCallback(self._print_usage)
+        #d.addCallback(self.stash_stats, "100MB")
 
         #d.addCallback(self.stall)
         def _done(res):
@@ -329,11 +489,13 @@ this file are ignored.
 
 
 class ClientWatcher(protocol.ProcessProtocol):
+    ended = False
     def outReceived(self, data):
         print "OUT:", data
     def errReceived(self, data):
         print "ERR:", data
     def processEnded(self, reason):
+        self.ended = reason
         self.d.callback(None)
 
 
@@ -341,6 +503,16 @@ if __name__ == '__main__':
     mode = "upload"
     if len(sys.argv) > 1:
         mode = sys.argv[1]
+    if sys.maxint == 2147483647:
+        bits = "32"
+    elif sys.maxint == 9223372036854775807:
+        bits = "64"
+    else:
+        bits = "?"
+    print "%s-bit system (sys.maxint=%d)" % (bits, sys.maxint)
+    # put the logfile and stats.out in _test_memory/ . These stick around.
+    # put the nodes and other files in _test_memory/test/ . These are
+    # removed each time we run.
     sf = SystemFramework("_test_memory", mode)
     sf.run()