]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blobdiff - src/allmydata/test/check_memory.py
fix check-memory test, with new new (safe) control-port methods
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / check_memory.py
index c62cfd84d6801d1c5935bb35e7c6695e305eea49..1b727017ffa9420c9f89c41db2e9fce82f617c80 100644 (file)
@@ -1,15 +1,15 @@
-#! /usr/bin/env python
-
-import os, shutil, sys, urllib
+import os, shutil, sys, urllib, time, stat, urlparse
 from cStringIO import StringIO
 from twisted.internet import defer, reactor, protocol, error
 from twisted.application import service, internet
 from twisted.web import client as tw_client
-from allmydata import client, introducer_and_vdrive
+from allmydata import client, introducer
+from allmydata.immutable import upload
 from allmydata.scripts import create_node
-from allmydata.util import testutil, fileutil
-import foolscap
-from foolscap import eventual
+from allmydata.util import fileutil, pollmixin
+from allmydata.util.fileutil import abspath_expanduser_unicode
+from allmydata.util.encodingutil import get_filesystem_encoding
+from foolscap.api import Tub, fireEventually, flushEventualQueue
 from twisted.python import log
 
 class StallableHTTPGetterDiscarder(tw_client.HTTPPageGetter):
@@ -49,19 +49,26 @@ def discardPage(url, stall=False, *args, **kwargs):
     # adapted from twisted.web.client.getPage . We can't just wrap or
     # subclass because it provides no way to override the HTTPClientFactory
     # that it creates.
-    scheme, host, port, path = tw_client._parse(url)
+    scheme, netloc, path, params, query, fragment = urlparse.urlparse(url)
+    assert scheme == 'http'
+    host, port = netloc, 80
+    if ":" in host:
+        host, port = host.split(":")
+        port = int(port)
     factory = StallableDiscardingHTTPClientFactory(url, *args, **kwargs)
     factory.do_stall = stall
-    assert scheme == 'http'
     reactor.connectTCP(host, port, factory)
     return factory.deferred
 
-class SystemFramework(testutil.PollMixin):
-    numnodes = 5
+class ChildDidNotStartError(Exception):
+    pass
+
+class SystemFramework(pollmixin.PollMixin):
+    numnodes = 7
 
     def __init__(self, basedir, mode):
-        self.basedir = basedir = os.path.abspath(basedir)
-        if not basedir.startswith(os.path.abspath(".")):
+        self.basedir = basedir = abspath_expanduser_unicode(unicode(basedir))
+        if not (basedir + os.path.sep).startswith(abspath_expanduser_unicode(u".") + os.path.sep):
             raise AssertionError("safety issue: basedir must be a subdir")
         self.testdir = testdir = os.path.join(basedir, "test")
         if os.path.exists(testdir):
@@ -70,18 +77,21 @@ class SystemFramework(testutil.PollMixin):
         self.sparent = service.MultiService()
         self.sparent.startService()
         self.proc = None
-        self.tub = foolscap.Tub()
+        self.tub = Tub()
+        self.tub.setOption("expose-remote-exception-types", False)
         self.tub.setServiceParent(self.sparent)
         self.mode = mode
         self.failed = False
+        self.keepalive_file = None
 
     def run(self):
-        log.startLogging(open(os.path.join(self.testdir, "log"), "w"),
-                         setStdout=False)
+        framelog = os.path.join(self.basedir, "driver.log")
+        log.startLogging(open(framelog, "a"), setStdout=False)
+        log.msg("CHECK_MEMORY(mode=%s) STARTING" % self.mode)
         #logfile = open(os.path.join(self.testdir, "log"), "w")
         #flo = log.FileLogObserver(logfile)
         #log.startLoggingWithObserver(flo.emit, setStdout=False)
-        d = eventual.fireEventually()
+        d = fireEventually()
         d.addCallback(lambda res: self.setUp())
         d.addCallback(lambda res: self.record_initial_memusage())
         d.addCallback(lambda res: self.make_nodes())
@@ -99,13 +109,14 @@ class SystemFramework(testutil.PollMixin):
         d.addBoth(_done)
         reactor.run()
         if self.failed:
+            # raiseException doesn't work for CopiedFailures
             self.failed.raiseException()
 
     def setUp(self):
         #print "STARTING"
         self.stats = {}
         self.statsfile = open(os.path.join(self.basedir, "stats.out"), "a")
-        d = self.make_introducer_and_vdrive()
+        d = self.make_introducer()
         def _more(res):
             return self.start_client()
         d.addCallback(_more)
@@ -138,13 +149,16 @@ class SystemFramework(testutil.PollMixin):
 
     def tearDown(self, passthrough):
         # the client node will shut down in a few seconds
-        #os.remove(os.path.join(self.clientdir, "suicide_prevention_hotline"))
+        #os.remove(os.path.join(self.clientdir, client.Client.EXIT_TRIGGER_FILE))
         log.msg("shutting down SystemTest services")
+        if self.keepalive_file and os.path.exists(self.keepalive_file):
+            age = time.time() - os.stat(self.keepalive_file)[stat.ST_MTIME]
+            log.msg("keepalive file at shutdown was %ds old" % age)
         d = defer.succeed(None)
         if self.proc:
             d.addCallback(lambda res: self.kill_client())
         d.addCallback(lambda res: self.sparent.stopService())
-        d.addCallback(lambda res: eventual.flushEventualQueue())
+        d.addCallback(lambda res: flushEventualQueue())
         def _close_statsfile(res):
             self.statsfile.close()
         d.addCallback(_close_statsfile)
@@ -155,16 +169,15 @@ class SystemFramework(testutil.PollMixin):
         s.setServiceParent(self.sparent)
         return s
 
-    def make_introducer_and_vdrive(self):
-        iv_basedir = os.path.join(self.testdir, "introducer_and_vdrive")
+    def make_introducer(self):
+        iv_basedir = os.path.join(self.testdir, "introducer")
         os.mkdir(iv_basedir)
-        iv = introducer_and_vdrive.IntroducerAndVdrive(basedir=iv_basedir)
-        self.introducer_and_vdrive = self.add_service(iv)
-        d = self.introducer_and_vdrive.when_tub_ready()
+        iv = introducer.IntroducerNode(basedir=iv_basedir)
+        self.introducer = self.add_service(iv)
+        d = self.introducer.when_tub_ready()
         def _introducer_ready(res):
-            q = self.introducer_and_vdrive
-            self.introducer_furl = q.urls["introducer"]
-            self.vdrive_furl = q.urls["vdrive"]
+            q = self.introducer
+            self.introducer_furl = q.introducer_url
         d.addCallback(_introducer_ready)
         return d
 
@@ -173,12 +186,12 @@ class SystemFramework(testutil.PollMixin):
         for i in range(self.numnodes):
             nodedir = os.path.join(self.testdir, "node%d" % i)
             os.mkdir(nodedir)
-            f = open(os.path.join(nodedir, "introducer.furl"), "w")
-            f.write(self.introducer_furl)
-            f.close()
-            f = open(os.path.join(nodedir, "vdrive.furl"), "w")
-            f.write(self.vdrive_furl)
-            f.close()
+            f = open(os.path.join(nodedir, "tahoe.cfg"), "w")
+            f.write("[client]\n"
+                    "introducer.furl = %s\n"
+                    "shares.happy = 1\n"
+                    "[storage]\n"
+                    % (self.introducer_furl,))
             # the only tests for which we want the internal nodes to actually
             # retain shares are the ones where somebody's going to download
             # them.
@@ -189,21 +202,21 @@ class SystemFramework(testutil.PollMixin):
                 # for these tests, we tell the storage servers to pretend to
                 # accept shares, but really just throw them out, since we're
                 # only testing upload and not download.
-                f = open(os.path.join(nodedir, "debug_no_storage"), "w")
-                f.write("no_storage\n")
-                f.close()
+                f.write("debug_discard = true\n")
             if self.mode in ("receive",):
                 # for this mode, the client-under-test gets all the shares,
                 # so our internal nodes can refuse requests
-                f = open(os.path.join(nodedir, "sizelimit"), "w")
-                f.write("0\n")
-                f.close()
+                f.write("readonly = true\n")
+            f.close()
             c = self.add_service(client.Client(basedir=nodedir))
             self.nodes.append(c)
         # the peers will start running, eventually they will connect to each
-        # other and the introducer_and_vdrive
+        # other and the introducer
 
     def touch_keepalive(self):
+        if os.path.exists(self.keepalive_file):
+            age = time.time() - os.stat(self.keepalive_file)[stat.ST_MTIME]
+            log.msg("touching keepalive file, was %ds old" % age)
         f = open(self.keepalive_file, "w")
         f.write("""\
 If the node notices this file at startup, it will poll every 5 seconds and
@@ -217,64 +230,59 @@ this file are ignored.
     def start_client(self):
         # this returns a Deferred that fires with the client's control.furl
         log.msg("MAKING CLIENT")
-        clientdir = self.clientdir = os.path.join(self.testdir, "client")
+        # self.testdir is an absolute Unicode path
+        clientdir = self.clientdir = os.path.join(self.testdir, u"client")
+        clientdir_str = clientdir.encode(get_filesystem_encoding())
         quiet = StringIO()
-        create_node.create_client(clientdir, {}, out=quiet)
+        create_node.create_node({'basedir': clientdir}, out=quiet)
         log.msg("DONE MAKING CLIENT")
-        f = open(os.path.join(clientdir, "introducer.furl"), "w")
-        f.write(self.introducer_furl + "\n")
-        f.close()
-        f = open(os.path.join(clientdir, "vdrive.furl"), "w")
-        f.write(self.vdrive_furl + "\n")
-        f.close()
-        f = open(os.path.join(clientdir, "webport"), "w")
-        # TODO: ideally we would set webport=0 and then ask the node what
-        # port it picked. But at the moment it is not convenient to do this,
-        # so we just pick a relatively unique one.
-        webport = max(os.getpid(), 2000)
-        f.write("tcp:%d:interface=127.0.0.1\n" % webport)
-        f.close()
-        self.webish_url = "http://localhost:%d" % webport
+        # now replace tahoe.cfg
+        # set webport=0 and then ask the node what port it picked.
+        f = open(os.path.join(clientdir, "tahoe.cfg"), "w")
+        f.write("[node]\n"
+                "web.port = tcp:0:interface=127.0.0.1\n"
+                "[client]\n"
+                "introducer.furl = %s\n"
+                "shares.happy = 1\n"
+                "[storage]\n"
+                % (self.introducer_furl,))
+
         if self.mode in ("upload-self", "receive"):
             # accept and store shares, to trigger the memory consumption bugs
             pass
         else:
             # don't accept any shares
-            f = open(os.path.join(clientdir, "sizelimit"), "w")
-            f.write("0\n")
-            f.close()
+            f.write("readonly = true\n")
             ## also, if we do receive any shares, throw them away
-            #f = open(os.path.join(clientdir, "debug_no_storage"), "w")
-            #f.write("no_storage\n")
-            #f.close()
+            #f.write("debug_discard = true")
         if self.mode == "upload-self":
-            f = open(os.path.join(clientdir, "push_to_ourselves"), "w")
-            f.write("push_to_ourselves\n")
-            f.close()
+            pass
+        f.close()
         self.keepalive_file = os.path.join(clientdir,
-                                           "suicide_prevention_hotline")
+                                           client.Client.EXIT_TRIGGER_FILE)
         # now start updating the mtime.
         self.touch_keepalive()
-        ts = internet.TimerService(4.0, self.touch_keepalive)
+        ts = internet.TimerService(1.0, self.touch_keepalive)
         ts.setServiceParent(self.sparent)
 
         pp = ClientWatcher()
         self.proc_done = pp.d = defer.Deferred()
         logfile = os.path.join(self.basedir, "client.log")
-        cmd = ["twistd", "-n", "-y", "client.tac", "-l", logfile]
+        cmd = ["twistd", "-n", "-y", "tahoe-client.tac", "-l", logfile]
         env = os.environ.copy()
-        self.proc = reactor.spawnProcess(pp, cmd[0], cmd, env, path=clientdir)
+        self.proc = reactor.spawnProcess(pp, cmd[0], cmd, env, path=clientdir_str)
         log.msg("CLIENT STARTED")
 
         # now we wait for the client to get started. we're looking for the
         # control.furl file to appear.
-        furl_file = os.path.join(clientdir, "control.furl")
+        furl_file = os.path.join(clientdir, "private", "control.furl")
+        url_file = os.path.join(clientdir, "node.url")
         def _check():
             if pp.ended and pp.ended.value.status != 0:
                 # the twistd process ends normally (with rc=0) if the child
                 # is successfully launched. It ends abnormally (with rc!=0)
                 # if the child cannot be launched.
-                raise RuntimeError("process ended while waiting for startup")
+                raise ChildDidNotStartError("process ended while waiting for startup")
             return os.path.exists(furl_file)
         d = self.poll(_check, 0.1)
         # once it exists, wait a moment before we read from it, just in case
@@ -288,6 +296,11 @@ this file are ignored.
             return d2
         d.addCallback(_stall)
         def _read(res):
+            # read the node's URL
+            self.webish_url = open(url_file, "r").read().strip()
+            if self.webish_url[-1] == "/":
+                # trim trailing slash, since the rest of the code wants it gone
+                self.webish_url = self.webish_url[:-1]
             f = open(furl_file, "r")
             furl = f.read()
             return furl.strip()
@@ -365,17 +378,12 @@ this file are ignored.
         print
         print "uploading %s" % name
         if self.mode in ("upload", "upload-self"):
-            files[name] = self.create_data(name, size)
-            d = self.control_rref.callRemote("upload_from_file_to_uri",
-                                             files[name])
-            def _done(uri):
-                os.remove(files[name])
-                del files[name]
-                return uri
-            d.addCallback(_done)
+            d = self.control_rref.callRemote("upload_random_data_from_file",
+                                             size,
+                                             convergence="check-memory")
         elif self.mode == "upload-POST":
             data = "a" * size
-            url = "/vdrive/global"
+            url = "/uri"
             d = self.POST(url, t="upload", file=("%d.data" % size, data))
         elif self.mode in ("receive",
                            "download", "download-GET", "download-GET-slow"):
@@ -391,9 +399,12 @@ this file are ignored.
             files[name] = self.create_data(name, size)
             u = self.nodes[0].getServiceNamed("uploader")
             d = self.nodes[0].debug_wait_for_client_connections(self.numnodes+1)
-            d.addCallback(lambda res: u.upload_filename(files[name]))
+            d.addCallback(lambda res:
+                          u.upload(upload.FileName(files[name],
+                                                   convergence="check-memory")))
+            d.addCallback(lambda results: results.get_uri())
         else:
-            raise RuntimeError("unknown mode=%s" % self.mode)
+            raise ValueError("unknown mode=%s" % self.mode)
         def _complete(uri):
             uris[name] = uri
             print "uploaded %s" % name
@@ -408,8 +419,8 @@ this file are ignored.
         uri = uris[name]
 
         if self.mode == "download":
-            d = self.control_rref.callRemote("download_from_uri_to_file",
-                                             uri, "dummy.out")
+            d = self.control_rref.callRemote("download_to_tempfile_and_delete",
+                                             uri)
         elif self.mode == "download-GET":
             url = "/uri/%s" % uri
             d = self.GET_discard(urllib.quote(url), stall=False)
@@ -486,6 +497,13 @@ if __name__ == '__main__':
     mode = "upload"
     if len(sys.argv) > 1:
         mode = sys.argv[1]
+    if sys.maxint == 2147483647:
+        bits = "32"
+    elif sys.maxint == 9223372036854775807:
+        bits = "64"
+    else:
+        bits = "?"
+    print "%s-bit system (sys.maxint=%d)" % (bits, sys.maxint)
     # put the logfile and stats.out in _test_memory/ . These stick around.
     # put the nodes and other files in _test_memory/test/ . These are
     # removed each time we run.