-#! /usr/bin/env python
-
-import os, shutil
-
+import os, shutil, sys, urllib, time, stat, urlparse
+from cStringIO import StringIO
from twisted.internet import defer, reactor, protocol, error
-from twisted.application import service
-from allmydata import client, introducer_and_vdrive
-from allmydata.scripts import runner
-from foolscap.eventual import eventually, flushEventualQueue
+from twisted.application import service, internet
+from twisted.web import client as tw_client
+from allmydata import client, introducer
+from allmydata.immutable import upload
+from allmydata.scripts import create_node
+from allmydata.util import fileutil, pollmixin
+from allmydata.util.fileutil import abspath_expanduser_unicode
+from allmydata.util.encodingutil import get_filesystem_encoding
+from foolscap.api import Tub, fireEventually, flushEventualQueue
from twisted.python import log
-class SystemFramework:
- numnodes = 5
+class StallableHTTPGetterDiscarder(tw_client.HTTPPageGetter):
+ full_speed_ahead = False
+ _bytes_so_far = 0
+ stalled = None
+ def handleResponsePart(self, data):
+ self._bytes_so_far += len(data)
+ if not self.factory.do_stall:
+ return
+ if self.full_speed_ahead:
+ return
+ if self._bytes_so_far > 1e6+100:
+ if not self.stalled:
+ print "STALLING"
+ self.transport.pauseProducing()
+ self.stalled = reactor.callLater(10.0, self._resume_speed)
+ def _resume_speed(self):
+ print "RESUME SPEED"
+ self.stalled = None
+ self.full_speed_ahead = True
+ self.transport.resumeProducing()
+ def handleResponseEnd(self):
+ if self.stalled:
+ print "CANCEL"
+ self.stalled.cancel()
+ self.stalled = None
+ return tw_client.HTTPPageGetter.handleResponseEnd(self)
+
+class StallableDiscardingHTTPClientFactory(tw_client.HTTPClientFactory):
+ protocol = StallableHTTPGetterDiscarder
+
+def discardPage(url, stall=False, *args, **kwargs):
+ """Start fetching the URL, but stall our pipe after the first 1MB.
+ Wait 10 seconds, then resume downloading (and discarding) everything.
+ """
+ # adapted from twisted.web.client.getPage . We can't just wrap or
+ # subclass because it provides no way to override the HTTPClientFactory
+ # that it creates.
+ scheme, netloc, path, params, query, fragment = urlparse.urlparse(url)
+ assert scheme == 'http'
+ host, port = netloc, 80
+ if ":" in host:
+ host, port = host.split(":")
+ port = int(port)
+ factory = StallableDiscardingHTTPClientFactory(url, *args, **kwargs)
+ factory.do_stall = stall
+ reactor.connectTCP(host, port, factory)
+ return factory.deferred
- def __init__(self, basedir):
- self.basedir = basedir = os.path.abspath(basedir)
- if not basedir.startswith(os.path.abspath(".")):
+class ChildDidNotStartError(Exception):
+ pass
+
+class SystemFramework(pollmixin.PollMixin):
+ numnodes = 7
+
+ def __init__(self, basedir, mode):
+ self.basedir = basedir = abspath_expanduser_unicode(unicode(basedir))
+ if not (basedir + os.path.sep).startswith(abspath_expanduser_unicode(u".") + os.path.sep):
raise AssertionError("safety issue: basedir must be a subdir")
- if os.path.exists(basedir):
- shutil.rmtree(basedir)
- os.mkdir(basedir)
+ self.testdir = testdir = os.path.join(basedir, "test")
+ if os.path.exists(testdir):
+ shutil.rmtree(testdir)
+ fileutil.make_dirs(testdir)
self.sparent = service.MultiService()
self.sparent.startService()
+ self.proc = None
+ self.tub = Tub()
+ self.tub.setOption("expose-remote-exception-types", False)
+ self.tub.setServiceParent(self.sparent)
+ self.mode = mode
+ self.failed = False
+ self.keepalive_file = None
def run(self):
- log.startLogging(open(os.path.join(self.basedir, "log"), "w"))
- d = defer.Deferred()
- eventually(d.callback, None)
- d.addCallback(lambda res: self.start())
- d.addErrback(log.err)
+ framelog = os.path.join(self.basedir, "driver.log")
+ log.startLogging(open(framelog, "a"), setStdout=False)
+ log.msg("CHECK_MEMORY(mode=%s) STARTING" % self.mode)
+ #logfile = open(os.path.join(self.testdir, "log"), "w")
+ #flo = log.FileLogObserver(logfile)
+ #log.startLoggingWithObserver(flo.emit, setStdout=False)
+ d = fireEventually()
+ d.addCallback(lambda res: self.setUp())
+ d.addCallback(lambda res: self.record_initial_memusage())
+ d.addCallback(lambda res: self.make_nodes())
+ d.addCallback(lambda res: self.wait_for_client_connected())
+ d.addCallback(lambda res: self.do_test())
+ d.addBoth(self.tearDown)
+ def _err(err):
+ self.failed = err
+ log.err(err)
+ print err
+ d.addErrback(_err)
+ def _done(res):
+ reactor.stop()
+ return res
+ d.addBoth(_done)
reactor.run()
+ if self.failed:
+ # raiseException doesn't work for CopiedFailures
+ self.failed.raiseException()
- def start(self):
- print "STARTING"
- d = self.make_introducer_and_vdrive()
+ def setUp(self):
+ #print "STARTING"
+ self.stats = {}
+ self.statsfile = open(os.path.join(self.basedir, "stats.out"), "a")
+ d = self.make_introducer()
def _more(res):
- self.make_nodes()
- self.start_client()
+ return self.start_client()
d.addCallback(_more)
+ def _record_control_furl(control_furl):
+ self.control_furl = control_furl
+ #print "OBTAINING '%s'" % (control_furl,)
+ return self.tub.getReference(self.control_furl)
+ d.addCallback(_record_control_furl)
+ def _record_control(control_rref):
+ self.control_rref = control_rref
+ d.addCallback(_record_control)
+ def _ready(res):
+ #print "CLIENT READY"
+ pass
+ d.addCallback(_ready)
+ return d
+
+ def record_initial_memusage(self):
+ print
+ print "Client started (no connections yet)"
+ d = self._print_usage()
+ d.addCallback(self.stash_stats, "init")
return d
- def tearDown(self):
- os.remove(os.path.join(self.clientdir, "suicide_prevention_hotline"))
+ def wait_for_client_connected(self):
+ print
+ print "Client connecting to other nodes.."
+ return self.control_rref.callRemote("wait_for_client_connections",
+ self.numnodes+1)
+
+ def tearDown(self, passthrough):
# the client node will shut down in a few seconds
+ #os.remove(os.path.join(self.clientdir, client.Client.EXIT_TRIGGER_FILE))
log.msg("shutting down SystemTest services")
- d = self.sparent.stopService()
+ if self.keepalive_file and os.path.exists(self.keepalive_file):
+ age = time.time() - os.stat(self.keepalive_file)[stat.ST_MTIME]
+ log.msg("keepalive file at shutdown was %ds old" % age)
+ d = defer.succeed(None)
+ if self.proc:
+ d.addCallback(lambda res: self.kill_client())
+ d.addCallback(lambda res: self.sparent.stopService())
d.addCallback(lambda res: flushEventualQueue())
- def _done(res):
- d1 = defer.Deferred()
- reactor.callLater(self.DISCONNECT_DELAY, d1.callback, None)
- return d1
- d.addCallback(_done)
+ def _close_statsfile(res):
+ self.statsfile.close()
+ d.addCallback(_close_statsfile)
+ d.addCallback(lambda res: passthrough)
return d
def add_service(self, s):
s.setServiceParent(self.sparent)
return s
- def make_introducer_and_vdrive(self):
- introducer_and_vdrive_dir = os.path.join(self.basedir, "introducer_and_vdrive")
- os.mkdir(introducer_and_vdrive_dir)
- self.introducer_and_vdrive = self.add_service(introducer_and_vdrive.IntroducerAndVdrive(basedir=introducer_and_vdrive_dir))
- d = self.introducer_and_vdrive.when_tub_ready()
+ def make_introducer(self):
+ iv_basedir = os.path.join(self.testdir, "introducer")
+ os.mkdir(iv_basedir)
+ iv = introducer.IntroducerNode(basedir=iv_basedir)
+ self.introducer = self.add_service(iv)
+ d = self.introducer.when_tub_ready()
+ def _introducer_ready(res):
+ q = self.introducer
+ self.introducer_furl = q.introducer_url
+ d.addCallback(_introducer_ready)
return d
def make_nodes(self):
- q = self.introducer_and_vdrive
- self.introducer_furl = q.urls["introducer"]
- vdrive_furl = q.urls["vdrive"]
self.nodes = []
for i in range(self.numnodes):
- nodedir = os.path.join(self.basedir, "node%d" % i)
+ nodedir = os.path.join(self.testdir, "node%d" % i)
os.mkdir(nodedir)
- f = open(os.path.join(nodedir, "introducer.furl"), "w")
- f.write(self.introducer_furl)
- f.close()
- f = open(os.path.join(nodedir, "vdrive.furl"), "w")
- f.write(vdrive_furl)
+ f = open(os.path.join(nodedir, "tahoe.cfg"), "w")
+ f.write("[client]\n"
+ "introducer.furl = %s\n"
+ "shares.happy = 1\n"
+ "[storage]\n"
+ % (self.introducer_furl,))
+ # the only tests for which we want the internal nodes to actually
+ # retain shares are the ones where somebody's going to download
+ # them.
+ if self.mode in ("download", "download-GET", "download-GET-slow"):
+ # retain shares
+ pass
+ else:
+ # for these tests, we tell the storage servers to pretend to
+ # accept shares, but really just throw them out, since we're
+ # only testing upload and not download.
+ f.write("debug_discard = true\n")
+ if self.mode in ("receive",):
+ # for this mode, the client-under-test gets all the shares,
+ # so our internal nodes can refuse requests
+ f.write("readonly = true\n")
f.close()
c = self.add_service(client.Client(basedir=nodedir))
self.nodes.append(c)
# the peers will start running, eventually they will connect to each
- # other and the introducer_and_vdrive
+ # other and the introducer
def touch_keepalive(self):
+ if os.path.exists(self.keepalive_file):
+ age = time.time() - os.stat(self.keepalive_file)[stat.ST_MTIME]
+ log.msg("touching keepalive file, was %ds old" % age)
f = open(self.keepalive_file, "w")
- f.write("If the node notices this file at startup, it will poll and\n")
- f.write("terminate as soon as the file goes away. This prevents\n")
- f.write("leaving processes around if the test harness has an\n")
- f.write("internal failure and neglects to kill off the node\n")
- f.write("itself. The contents of this file are ignored.\n")
+ f.write("""\
+If the node notices this file at startup, it will poll every 5 seconds and
+terminate if the file is more than 10 seconds old, or if it has been deleted.
+If the test harness has an internal failure and neglects to kill off the node
+itself, this helps to avoid leaving processes lying around. The contents of
+this file are ignored.
+ """)
f.close()
def start_client(self):
+ # this returns a Deferred that fires with the client's control.furl
log.msg("MAKING CLIENT")
- clientdir = self.clientdir = os.path.join(self.basedir, "client")
- config = {'basedir': clientdir}
- runner.create_client(config)
+ # self.testdir is an absolute Unicode path
+ clientdir = self.clientdir = os.path.join(self.testdir, u"client")
+ clientdir_str = clientdir.encode(get_filesystem_encoding())
+ quiet = StringIO()
+ create_node.create_node({'basedir': clientdir}, out=quiet)
log.msg("DONE MAKING CLIENT")
- f = open(os.path.join(clientdir, "introducer.furl"), "w")
- f.write(self.introducer_furl + "\n")
+ # now replace tahoe.cfg
+ # set webport=0 and then ask the node what port it picked.
+ f = open(os.path.join(clientdir, "tahoe.cfg"), "w")
+ f.write("[node]\n"
+ "web.port = tcp:0:interface=127.0.0.1\n"
+ "[client]\n"
+ "introducer.furl = %s\n"
+ "shares.happy = 1\n"
+ "[storage]\n"
+ % (self.introducer_furl,))
+
+ if self.mode in ("upload-self", "receive"):
+ # accept and store shares, to trigger the memory consumption bugs
+ pass
+ else:
+ # don't accept any shares
+ f.write("readonly = true\n")
+ ## also, if we do receive any shares, throw them away
+ #f.write("debug_discard = true")
+ if self.mode == "upload-self":
+ pass
f.close()
- self.keepalive_file = os.path.join(clientdir, "suicide_prevention_hotline")
- self.touch_keepalive()
+ self.keepalive_file = os.path.join(clientdir,
+ client.Client.EXIT_TRIGGER_FILE)
# now start updating the mtime.
+ self.touch_keepalive()
+ ts = internet.TimerService(1.0, self.touch_keepalive)
+ ts.setServiceParent(self.sparent)
pp = ClientWatcher()
- cmd = ["twistd", "-y", "client.tac"]
+ self.proc_done = pp.d = defer.Deferred()
+ logfile = os.path.join(self.basedir, "client.log")
+ cmd = ["twistd", "-n", "-y", "tahoe-client.tac", "-l", logfile]
env = os.environ.copy()
- self.proc = reactor.spawnProcess(pp, cmd[0], cmd, env, path=clientdir)
+ self.proc = reactor.spawnProcess(pp, cmd[0], cmd, env, path=clientdir_str)
log.msg("CLIENT STARTED")
+ # now we wait for the client to get started. we're looking for the
+ # control.furl file to appear.
+ furl_file = os.path.join(clientdir, "private", "control.furl")
+ url_file = os.path.join(clientdir, "node.url")
+ def _check():
+ if pp.ended and pp.ended.value.status != 0:
+ # the twistd process ends normally (with rc=0) if the child
+ # is successfully launched. It ends abnormally (with rc!=0)
+ # if the child cannot be launched.
+ raise ChildDidNotStartError("process ended while waiting for startup")
+ return os.path.exists(furl_file)
+ d = self.poll(_check, 0.1)
+ # once it exists, wait a moment before we read from it, just in case
+ # it hasn't finished writing the whole thing. Ideally control.furl
+ # would be created in some atomic fashion, or made non-readable until
+ # it's ready, but I can't think of an easy way to do that, and I
+ # think the chances that we'll observe a half-write are pretty low.
+ def _stall(res):
+ d2 = defer.Deferred()
+ reactor.callLater(0.1, d2.callback, None)
+ return d2
+ d.addCallback(_stall)
+ def _read(res):
+ # read the node's URL
+ self.webish_url = open(url_file, "r").read().strip()
+ if self.webish_url[-1] == "/":
+ # trim trailing slash, since the rest of the code wants it gone
+ self.webish_url = self.webish_url[:-1]
+ f = open(furl_file, "r")
+ furl = f.read()
+ return furl.strip()
+ d.addCallback(_read)
+ return d
+
+
def kill_client(self):
+ # returns a Deferred that fires when the process exits. This may only
+ # be called once.
try:
- self.proc.signalProcess("KILL")
+ self.proc.signalProcess("INT")
except error.ProcessExitedAlready:
pass
+ return self.proc_done
+
+
+ def create_data(self, name, size):
+ filename = os.path.join(self.testdir, name + ".data")
+ f = open(filename, "wb")
+ block = "a" * 8192
+ while size > 0:
+ l = min(size, 8192)
+ f.write(block[:l])
+ size -= l
+ return filename
+
+ def stash_stats(self, stats, name):
+ self.statsfile.write("%s %s: %d\n" % (self.mode, name, stats['VmPeak']))
+ self.statsfile.flush()
+ self.stats[name] = stats['VmPeak']
+
+ def POST(self, urlpath, **fields):
+ url = self.webish_url + urlpath
+ sepbase = "boogabooga"
+ sep = "--" + sepbase
+ form = []
+ form.append(sep)
+ form.append('Content-Disposition: form-data; name="_charset"')
+ form.append('')
+ form.append('UTF-8')
+ form.append(sep)
+ for name, value in fields.iteritems():
+ if isinstance(value, tuple):
+ filename, value = value
+ form.append('Content-Disposition: form-data; name="%s"; '
+ 'filename="%s"' % (name, filename))
+ else:
+ form.append('Content-Disposition: form-data; name="%s"' % name)
+ form.append('')
+ form.append(value)
+ form.append(sep)
+ form[-1] += "--"
+ body = "\r\n".join(form) + "\r\n"
+ headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
+ }
+ return tw_client.getPage(url, method="POST", postdata=body,
+ headers=headers, followRedirect=False)
+
+ def GET_discard(self, urlpath, stall):
+ url = self.webish_url + urlpath + "?filename=dummy-get.out"
+ return discardPage(url, stall)
+
+ def _print_usage(self, res=None):
+ d = self.control_rref.callRemote("get_memory_usage")
+ def _print(stats):
+ print "VmSize: %9d VmPeak: %9d" % (stats["VmSize"],
+ stats["VmPeak"])
+ return stats
+ d.addCallback(_print)
+ return d
+
+ def _do_upload(self, res, size, files, uris):
+ name = '%d' % size
+ print
+ print "uploading %s" % name
+ if self.mode in ("upload", "upload-self"):
+ files[name] = self.create_data(name, size)
+ d = self.control_rref.callRemote("upload_from_file_to_uri",
+ files[name].encode("utf-8"),
+ convergence="check-memory")
+ def _done(uri):
+ os.remove(files[name])
+ del files[name]
+ return uri
+ d.addCallback(_done)
+ elif self.mode == "upload-POST":
+ data = "a" * size
+ url = "/uri"
+ d = self.POST(url, t="upload", file=("%d.data" % size, data))
+ elif self.mode in ("receive",
+ "download", "download-GET", "download-GET-slow"):
+ # mode=receive: upload the data from a local peer, so that the
+ # client-under-test receives and stores the shares
+ #
+ # mode=download*: upload the data from a local peer, then have
+ # the client-under-test download it.
+ #
+ # we need to wait until the uploading node has connected to all
+ # peers, since the wait_for_client_connections() above doesn't
+ # pay attention to our self.nodes[] and their connections.
+ files[name] = self.create_data(name, size)
+ u = self.nodes[0].getServiceNamed("uploader")
+ d = self.nodes[0].debug_wait_for_client_connections(self.numnodes+1)
+ d.addCallback(lambda res:
+ u.upload(upload.FileName(files[name],
+ convergence="check-memory")))
+ d.addCallback(lambda results: results.get_uri())
+ else:
+ raise ValueError("unknown mode=%s" % self.mode)
+ def _complete(uri):
+ uris[name] = uri
+ print "uploaded %s" % name
+ d.addCallback(_complete)
+ return d
+
+ def _do_download(self, res, size, uris):
+ if self.mode not in ("download", "download-GET", "download-GET-slow"):
+ return
+ name = '%d' % size
+ print "downloading %s" % name
+ uri = uris[name]
+
+ if self.mode == "download":
+ d = self.control_rref.callRemote("download_from_uri_to_file",
+ uri, "dummy.out")
+ elif self.mode == "download-GET":
+ url = "/uri/%s" % uri
+ d = self.GET_discard(urllib.quote(url), stall=False)
+ elif self.mode == "download-GET-slow":
+ url = "/uri/%s" % uri
+ d = self.GET_discard(urllib.quote(url), stall=True)
+
+ def _complete(res):
+ print "downloaded %s" % name
+ return res
+ d.addCallback(_complete)
+ return d
+
+ def do_test(self):
+ #print "CLIENT STARTED"
+ #print "FURL", self.control_furl
+ #print "RREF", self.control_rref
+ #print
+ kB = 1000; MB = 1000*1000
+ files = {}
+ uris = {}
+
+ d = self._print_usage()
+ d.addCallback(self.stash_stats, "0B")
+
+ for i in range(10):
+ d.addCallback(self._do_upload, 10*kB+i, files, uris)
+ d.addCallback(self._do_download, 10*kB+i, uris)
+ d.addCallback(self._print_usage)
+ d.addCallback(self.stash_stats, "10kB")
+
+ for i in range(3):
+ d.addCallback(self._do_upload, 10*MB+i, files, uris)
+ d.addCallback(self._do_download, 10*MB+i, uris)
+ d.addCallback(self._print_usage)
+ d.addCallback(self.stash_stats, "10MB")
+
+ for i in range(1):
+ d.addCallback(self._do_upload, 50*MB+i, files, uris)
+ d.addCallback(self._do_download, 50*MB+i, uris)
+ d.addCallback(self._print_usage)
+ d.addCallback(self.stash_stats, "50MB")
+
+ #for i in range(1):
+ # d.addCallback(self._do_upload, 100*MB+i, files, uris)
+ # d.addCallback(self._do_download, 100*MB+i, uris)
+ # d.addCallback(self._print_usage)
+ #d.addCallback(self.stash_stats, "100MB")
+
+ #d.addCallback(self.stall)
+ def _done(res):
+ print "FINISHING"
+ d.addCallback(_done)
+ return d
+
+ def stall(self, res):
+ d = defer.Deferred()
+ reactor.callLater(5, d.callback, None)
+ return d
class ClientWatcher(protocol.ProcessProtocol):
+ ended = False
def outReceived(self, data):
print "OUT:", data
def errReceived(self, data):
print "ERR:", data
+ def processEnded(self, reason):
+ self.ended = reason
+ self.d.callback(None)
if __name__ == '__main__':
- sf = SystemFramework("_test_memory")
+ mode = "upload"
+ if len(sys.argv) > 1:
+ mode = sys.argv[1]
+ if sys.maxint == 2147483647:
+ bits = "32"
+ elif sys.maxint == 9223372036854775807:
+ bits = "64"
+ else:
+ bits = "?"
+ print "%s-bit system (sys.maxint=%d)" % (bits, sys.maxint)
+ # put the logfile and stats.out in _test_memory/ . These stick around.
+ # put the nodes and other files in _test_memory/test/ . These are
+ # removed each time we run.
+ sf = SystemFramework("_test_memory", mode)
sf.run()
-
-# add a config option that looks for a keepalive file, and if it disappears,
-# shut down the node.