-#! /usr/bin/env python
-
-import os, shutil, sys, urllib
+import os, shutil, sys, urllib, time, stat, urlparse
from cStringIO import StringIO
from twisted.internet import defer, reactor, protocol, error
from twisted.application import service, internet
from twisted.web import client as tw_client
-from allmydata import client, introducer_and_vdrive
+from allmydata import client, introducer
+from allmydata.immutable import upload
from allmydata.scripts import create_node
-from allmydata.util import testutil, fileutil
-import foolscap
-from foolscap import eventual
+from allmydata.util import fileutil, pollmixin
+from allmydata.util.fileutil import abspath_expanduser_unicode
+from allmydata.util.encodingutil import get_filesystem_encoding
+from foolscap.api import Tub, fireEventually, flushEventualQueue
from twisted.python import log
class StallableHTTPGetterDiscarder(tw_client.HTTPPageGetter):
# adapted from twisted.web.client.getPage . We can't just wrap or
# subclass because it provides no way to override the HTTPClientFactory
# that it creates.
- scheme, host, port, path = tw_client._parse(url)
+ scheme, netloc, path, params, query, fragment = urlparse.urlparse(url)
+ assert scheme == 'http'
+ host, port = netloc, 80
+ if ":" in host:
+ host, port = host.split(":")
+ port = int(port)
factory = StallableDiscardingHTTPClientFactory(url, *args, **kwargs)
factory.do_stall = stall
- assert scheme == 'http'
reactor.connectTCP(host, port, factory)
return factory.deferred
-class SystemFramework(testutil.PollMixin):
- numnodes = 5
+class ChildDidNotStartError(Exception):
+ pass
+
+class SystemFramework(pollmixin.PollMixin):
+ numnodes = 7
def __init__(self, basedir, mode):
- self.basedir = basedir = os.path.abspath(basedir)
- if not basedir.startswith(os.path.abspath(".")):
+ self.basedir = basedir = abspath_expanduser_unicode(unicode(basedir))
+ if not (basedir + os.path.sep).startswith(abspath_expanduser_unicode(u".") + os.path.sep):
raise AssertionError("safety issue: basedir must be a subdir")
self.testdir = testdir = os.path.join(basedir, "test")
if os.path.exists(testdir):
self.sparent = service.MultiService()
self.sparent.startService()
self.proc = None
- self.tub = foolscap.Tub()
+ self.tub = Tub()
+ self.tub.setOption("expose-remote-exception-types", False)
self.tub.setServiceParent(self.sparent)
self.mode = mode
self.failed = False
+ self.keepalive_file = None
def run(self):
- log.startLogging(open(os.path.join(self.testdir, "log"), "w"),
- setStdout=False)
+ framelog = os.path.join(self.basedir, "driver.log")
+ log.startLogging(open(framelog, "a"), setStdout=False)
+ log.msg("CHECK_MEMORY(mode=%s) STARTING" % self.mode)
#logfile = open(os.path.join(self.testdir, "log"), "w")
#flo = log.FileLogObserver(logfile)
#log.startLoggingWithObserver(flo.emit, setStdout=False)
- d = eventual.fireEventually()
+ d = fireEventually()
d.addCallback(lambda res: self.setUp())
d.addCallback(lambda res: self.record_initial_memusage())
d.addCallback(lambda res: self.make_nodes())
d.addBoth(_done)
reactor.run()
if self.failed:
+ # raiseException doesn't work for CopiedFailures
self.failed.raiseException()
def setUp(self):
#print "STARTING"
self.stats = {}
self.statsfile = open(os.path.join(self.basedir, "stats.out"), "a")
- d = self.make_introducer_and_vdrive()
+ d = self.make_introducer()
def _more(res):
return self.start_client()
d.addCallback(_more)
def tearDown(self, passthrough):
# the client node will shut down in a few seconds
- #os.remove(os.path.join(self.clientdir, "suicide_prevention_hotline"))
+ #os.remove(os.path.join(self.clientdir, client.Client.EXIT_TRIGGER_FILE))
log.msg("shutting down SystemTest services")
+ if self.keepalive_file and os.path.exists(self.keepalive_file):
+ age = time.time() - os.stat(self.keepalive_file)[stat.ST_MTIME]
+ log.msg("keepalive file at shutdown was %ds old" % age)
d = defer.succeed(None)
if self.proc:
d.addCallback(lambda res: self.kill_client())
d.addCallback(lambda res: self.sparent.stopService())
- d.addCallback(lambda res: eventual.flushEventualQueue())
+ d.addCallback(lambda res: flushEventualQueue())
def _close_statsfile(res):
self.statsfile.close()
d.addCallback(_close_statsfile)
s.setServiceParent(self.sparent)
return s
- def make_introducer_and_vdrive(self):
- iv_basedir = os.path.join(self.testdir, "introducer_and_vdrive")
+ def make_introducer(self):
+ iv_basedir = os.path.join(self.testdir, "introducer")
os.mkdir(iv_basedir)
- iv = introducer_and_vdrive.IntroducerAndVdrive(basedir=iv_basedir)
- self.introducer_and_vdrive = self.add_service(iv)
- d = self.introducer_and_vdrive.when_tub_ready()
+ iv = introducer.IntroducerNode(basedir=iv_basedir)
+ self.introducer = self.add_service(iv)
+ d = self.introducer.when_tub_ready()
def _introducer_ready(res):
- q = self.introducer_and_vdrive
- self.introducer_furl = q.urls["introducer"]
- self.vdrive_furl = q.urls["vdrive"]
+ q = self.introducer
+ self.introducer_furl = q.introducer_url
d.addCallback(_introducer_ready)
return d
for i in range(self.numnodes):
nodedir = os.path.join(self.testdir, "node%d" % i)
os.mkdir(nodedir)
- f = open(os.path.join(nodedir, "introducer.furl"), "w")
- f.write(self.introducer_furl)
- f.close()
- f = open(os.path.join(nodedir, "vdrive.furl"), "w")
- f.write(self.vdrive_furl)
- f.close()
+ f = open(os.path.join(nodedir, "tahoe.cfg"), "w")
+ f.write("[client]\n"
+ "introducer.furl = %s\n"
+ "shares.happy = 1\n"
+ "[storage]\n"
+ % (self.introducer_furl,))
# the only tests for which we want the internal nodes to actually
# retain shares are the ones where somebody's going to download
# them.
# for these tests, we tell the storage servers to pretend to
# accept shares, but really just throw them out, since we're
# only testing upload and not download.
- f = open(os.path.join(nodedir, "debug_no_storage"), "w")
- f.write("no_storage\n")
- f.close()
+ f.write("debug_discard = true\n")
if self.mode in ("receive",):
# for this mode, the client-under-test gets all the shares,
# so our internal nodes can refuse requests
- f = open(os.path.join(nodedir, "sizelimit"), "w")
- f.write("0\n")
- f.close()
+ f.write("readonly = true\n")
+ f.close()
c = self.add_service(client.Client(basedir=nodedir))
self.nodes.append(c)
# the peers will start running, eventually they will connect to each
- # other and the introducer_and_vdrive
+ # other and the introducer
def touch_keepalive(self):
+ if os.path.exists(self.keepalive_file):
+ age = time.time() - os.stat(self.keepalive_file)[stat.ST_MTIME]
+ log.msg("touching keepalive file, was %ds old" % age)
f = open(self.keepalive_file, "w")
f.write("""\
If the node notices this file at startup, it will poll every 5 seconds and
def start_client(self):
# this returns a Deferred that fires with the client's control.furl
log.msg("MAKING CLIENT")
- clientdir = self.clientdir = os.path.join(self.testdir, "client")
+ # self.testdir is an absolute Unicode path
+ clientdir = self.clientdir = os.path.join(self.testdir, u"client")
+ clientdir_str = clientdir.encode(get_filesystem_encoding())
quiet = StringIO()
- create_node.create_client(clientdir, {}, out=quiet)
+ create_node.create_node({'basedir': clientdir}, out=quiet)
log.msg("DONE MAKING CLIENT")
- f = open(os.path.join(clientdir, "introducer.furl"), "w")
- f.write(self.introducer_furl + "\n")
- f.close()
- f = open(os.path.join(clientdir, "vdrive.furl"), "w")
- f.write(self.vdrive_furl + "\n")
- f.close()
- f = open(os.path.join(clientdir, "webport"), "w")
- # TODO: ideally we would set webport=0 and then ask the node what
- # port it picked. But at the moment it is not convenient to do this,
- # so we just pick a relatively unique one.
- webport = max(os.getpid(), 2000)
- f.write("tcp:%d:interface=127.0.0.1\n" % webport)
- f.close()
- self.webish_url = "http://localhost:%d" % webport
+ # now replace tahoe.cfg
+ # set webport=0 and then ask the node what port it picked.
+ f = open(os.path.join(clientdir, "tahoe.cfg"), "w")
+ f.write("[node]\n"
+ "web.port = tcp:0:interface=127.0.0.1\n"
+ "[client]\n"
+ "introducer.furl = %s\n"
+ "shares.happy = 1\n"
+ "[storage]\n"
+ % (self.introducer_furl,))
+
if self.mode in ("upload-self", "receive"):
# accept and store shares, to trigger the memory consumption bugs
pass
else:
# don't accept any shares
- f = open(os.path.join(clientdir, "sizelimit"), "w")
- f.write("0\n")
- f.close()
+ f.write("readonly = true\n")
## also, if we do receive any shares, throw them away
- #f = open(os.path.join(clientdir, "debug_no_storage"), "w")
- #f.write("no_storage\n")
- #f.close()
+ #f.write("debug_discard = true")
if self.mode == "upload-self":
- f = open(os.path.join(clientdir, "push_to_ourselves"), "w")
- f.write("push_to_ourselves\n")
- f.close()
+ pass
+ f.close()
self.keepalive_file = os.path.join(clientdir,
- "suicide_prevention_hotline")
+ client.Client.EXIT_TRIGGER_FILE)
# now start updating the mtime.
self.touch_keepalive()
- ts = internet.TimerService(4.0, self.touch_keepalive)
+ ts = internet.TimerService(1.0, self.touch_keepalive)
ts.setServiceParent(self.sparent)
pp = ClientWatcher()
self.proc_done = pp.d = defer.Deferred()
logfile = os.path.join(self.basedir, "client.log")
- cmd = ["twistd", "-n", "-y", "client.tac", "-l", logfile]
+ cmd = ["twistd", "-n", "-y", "tahoe-client.tac", "-l", logfile]
env = os.environ.copy()
- self.proc = reactor.spawnProcess(pp, cmd[0], cmd, env, path=clientdir)
+ self.proc = reactor.spawnProcess(pp, cmd[0], cmd, env, path=clientdir_str)
log.msg("CLIENT STARTED")
# now we wait for the client to get started. we're looking for the
# control.furl file to appear.
- furl_file = os.path.join(clientdir, "control.furl")
+ furl_file = os.path.join(clientdir, "private", "control.furl")
+ url_file = os.path.join(clientdir, "node.url")
def _check():
if pp.ended and pp.ended.value.status != 0:
# the twistd process ends normally (with rc=0) if the child
# is successfully launched. It ends abnormally (with rc!=0)
# if the child cannot be launched.
- raise RuntimeError("process ended while waiting for startup")
+ raise ChildDidNotStartError("process ended while waiting for startup")
return os.path.exists(furl_file)
d = self.poll(_check, 0.1)
# once it exists, wait a moment before we read from it, just in case
return d2
d.addCallback(_stall)
def _read(res):
+ # read the node's URL
+ self.webish_url = open(url_file, "r").read().strip()
+ if self.webish_url[-1] == "/":
+ # trim trailing slash, since the rest of the code wants it gone
+ self.webish_url = self.webish_url[:-1]
f = open(furl_file, "r")
furl = f.read()
return furl.strip()
print
print "uploading %s" % name
if self.mode in ("upload", "upload-self"):
- files[name] = self.create_data(name, size)
- d = self.control_rref.callRemote("upload_from_file_to_uri",
- files[name])
- def _done(uri):
- os.remove(files[name])
- del files[name]
- return uri
- d.addCallback(_done)
+ d = self.control_rref.callRemote("upload_random_data_from_file",
+ size,
+ convergence="check-memory")
elif self.mode == "upload-POST":
data = "a" * size
- url = "/vdrive/global"
+ url = "/uri"
d = self.POST(url, t="upload", file=("%d.data" % size, data))
elif self.mode in ("receive",
"download", "download-GET", "download-GET-slow"):
files[name] = self.create_data(name, size)
u = self.nodes[0].getServiceNamed("uploader")
d = self.nodes[0].debug_wait_for_client_connections(self.numnodes+1)
- d.addCallback(lambda res: u.upload_filename(files[name]))
+ d.addCallback(lambda res:
+ u.upload(upload.FileName(files[name],
+ convergence="check-memory")))
+ d.addCallback(lambda results: results.get_uri())
else:
- raise RuntimeError("unknown mode=%s" % self.mode)
+ raise ValueError("unknown mode=%s" % self.mode)
def _complete(uri):
uris[name] = uri
print "uploaded %s" % name
uri = uris[name]
if self.mode == "download":
- d = self.control_rref.callRemote("download_from_uri_to_file",
- uri, "dummy.out")
+ d = self.control_rref.callRemote("download_to_tempfile_and_delete",
+ uri)
elif self.mode == "download-GET":
url = "/uri/%s" % uri
d = self.GET_discard(urllib.quote(url), stall=False)
mode = "upload"
if len(sys.argv) > 1:
mode = sys.argv[1]
+ if sys.maxint == 2147483647:
+ bits = "32"
+ elif sys.maxint == 9223372036854775807:
+ bits = "64"
+ else:
+ bits = "?"
+ print "%s-bit system (sys.maxint=%d)" % (bits, sys.maxint)
# put the logfile and stats.out in _test_memory/ . These stick around.
# put the nodes and other files in _test_memory/test/ . These are
# removed each time we run.