from twisted.application import service, internet
from allmydata import client, introducer_and_vdrive
from allmydata.scripts import runner
-from foolscap.eventual import eventually, flushEventualQueue
+from allmydata.util import testutil
+import foolscap
+from foolscap import eventual
from twisted.python import log
-class SystemFramework:
+class SystemFramework(testutil.PollMixin):
numnodes = 5
def __init__(self, basedir):
os.mkdir(basedir)
self.sparent = service.MultiService()
self.sparent.startService()
+ self.proc = None
+ self.tub = foolscap.Tub()
+ self.tub.setServiceParent(self.sparent)
def run(self):
- log.startLogging(open(os.path.join(self.basedir, "log"), "w"))
- d = defer.Deferred()
- eventually(d.callback, None)
- d.addCallback(lambda res: self.start())
- d.addErrback(log.err)
+ log.startLogging(open(os.path.join(self.basedir, "log"), "w"),
+ setStdout=False)
+ #logfile = open(os.path.join(self.basedir, "log"), "w")
+ #flo = log.FileLogObserver(logfile)
+ #log.startLoggingWithObserver(flo.emit, setStdout=False)
+ d = eventual.fireEventually()
+ d.addCallback(lambda res: self.setUp())
+ d.addCallback(lambda res: self.do_test())
+ d.addBoth(self.tearDown)
+ def _err(err):
+ log.err(err)
+ print err
+ d.addErrback(_err)
+ d.addBoth(lambda res: reactor.stop())
reactor.run()
- def start(self):
+ def setUp(self):
print "STARTING"
d = self.make_introducer_and_vdrive()
def _more(res):
self.make_nodes()
- self.start_client()
+ return self.start_client()
d.addCallback(_more)
+ def _record_control_furl(control_furl):
+ self.control_furl = control_furl
+ print "OBTAINING '%s'" % (control_furl,)
+ return self.tub.getReference(self.control_furl)
+ d.addCallback(_record_control_furl)
+ def _record_control(control_rref):
+ self.control_rref = control_rref
+ return control_rref.callRemote("wait_for_client_connections",
+ self.numnodes+1)
+ d.addCallback(_record_control)
+ def _ready(res):
+ print "CLIENT READY"
+ d.addCallback(_ready)
return d
- def tearDown(self):
- os.remove(os.path.join(self.clientdir, "suicide_prevention_hotline"))
+ def tearDown(self, passthrough):
# the client node will shut down in a few seconds
+ #os.remove(os.path.join(self.clientdir, "suicide_prevention_hotline"))
log.msg("shutting down SystemTest services")
- d = self.sparent.stopService()
- d.addCallback(lambda res: flushEventualQueue())
- def _done(res):
- d1 = defer.Deferred()
- reactor.callLater(self.DISCONNECT_DELAY, d1.callback, None)
- return d1
- d.addCallback(_done)
+ d = defer.succeed(None)
+ if self.proc:
+ d.addCallback(lambda res: self.kill_client())
+ d.addCallback(lambda res: self.sparent.stopService())
+ d.addCallback(lambda res: eventual.flushEventualQueue())
+ d.addCallback(lambda res: passthrough)
return d
def add_service(self, s):
f.close()
def start_client(self):
+ # this returns a Deferred that fires with the client's control.furl
log.msg("MAKING CLIENT")
clientdir = self.clientdir = os.path.join(self.basedir, "client")
- config = {'basedir': clientdir, 'quiet': False}
+ config = {'basedir': clientdir, 'quiet': True}
runner.create_client(config)
log.msg("DONE MAKING CLIENT")
f = open(os.path.join(clientdir, "introducer.furl"), "w")
ts.setServiceParent(self.sparent)
pp = ClientWatcher()
+ self.proc_done = pp.d = defer.Deferred()
cmd = ["twistd", "-y", "client.tac"]
env = os.environ.copy()
self.proc = reactor.spawnProcess(pp, cmd[0], cmd, env, path=clientdir)
log.msg("CLIENT STARTED")
+ # now we wait for the client to get started. we're looking for the
+ # control.furl file to appear.
+ furl_file = os.path.join(clientdir, "control.furl")
+ def _check():
+ return os.path.exists(furl_file)
+ d = self.poll(_check, 0.1)
+ # once it exists, wait a moment before we read from it, just in case
+ # it hasn't finished writing the whole thing. Ideally control.furl
+ # would be created in some atomic fashion, or made non-readable until
+ # it's ready, but I can't think of an easy way to do that, and I
+ # think the chances that we'll observe a half-write are pretty low.
+ def _stall(res):
+ d2 = defer.Deferred()
+ reactor.callLater(0.1, d2.callback, None)
+ return d2
+ d.addCallback(_stall)
+ def _read(res):
+ f = open(furl_file, "r")
+ furl = f.read()
+ return furl.strip()
+ d.addCallback(_read)
+ return d
+
+
def kill_client(self):
+ # returns a Deferred that fires when the process exits. This may only
+ # be called once.
try:
self.proc.signalProcess("KILL")
except error.ProcessExitedAlready:
pass
+ return self.proc_done
+
+
+ def create_data(self, name, size):
+ filename = os.path.join(self.basedir, name + ".data")
+ f = open(filename, "wb")
+ block = "a" * 8192
+ while size > 0:
+ l = min(size, 8192)
+ f.write(block[:l])
+ size -= l
+ return filename
+
+ def do_test(self):
+ print "CLIENT STARTED"
+ print "FURL", self.control_furl
+ print "RREF", self.control_rref
+ print
+ kB = 1000; MB = 1000*1000
+ files = {}
+ uris = {}
+ control = self.control_rref
+
+ def _print_usage(res=None):
+ d = control.callRemote("get_memory_usage")
+ def _print(stats):
+ print "VmSize: %9d VmPeak: %9d" % (stats["VmSize"],
+ stats["VmPeak"])
+ d.addCallback(_print)
+ return d
+
+ def _do_upload(res, size):
+ name = '%d' % size
+ files[name] = self.create_data(name, size)
+ d = control.callRemote("upload_from_file_to_uri", files[name])
+ def _done(uri):
+ uris[name] = uri
+ print "uploaded %s" % name
+ d.addCallback(_done)
+ return d
+
+ d = _print_usage()
+
+ for i in range(10):
+ d.addCallback(_do_upload, size=10*kB+i)
+ d.addCallback(_print_usage)
+
+ for i in range(10):
+ d.addCallback(_do_upload, size=10*MB+i)
+ d.addCallback(_print_usage)
+
+ #d.addCallback(self.stall)
+ def _done(res):
+ print "FINISHING"
+ d.addCallback(_done)
+ return d
+
+ def stall(self, res):
+ d = defer.Deferred()
+ reactor.callLater(5, d.callback, None)
+ return d
class ClientWatcher(protocol.ProcessProtocol):
print "OUT:", data
def errReceived(self, data):
print "ERR:", data
+ def processEnded(self, reason):
+ self.d.callback(None)
if __name__ == '__main__':
sf = SystemFramework("_test_memory")
sf.run()
-
-# add a config option that looks for a keepalive file, and if it disappears,
-# shut down the node.
from foolscap.eventual import flushEventualQueue
from twisted.application import service
from allmydata.introducer import IntroducerClient, Introducer
-from allmydata.util import idlib
+from allmydata.util import idlib, testutil
class MyNode(Referenceable):
pass
def log(self, msg):
pass
-class TestIntroducer(unittest.TestCase):
+class TestIntroducer(unittest.TestCase, testutil.PollMixin):
def setUp(self):
self.parent = LoggingMultiService()
self.parent.startService()
return d
- def poll(self, check_f, pollinterval=0.01):
- # Return a Deferred, then call check_f periodically until it returns
- # True, at which point the Deferred will fire.. If check_f raises an
- # exception, the Deferred will errback.
- d = defer.maybeDeferred(self._poll, None, check_f, pollinterval)
- return d
-
- def _poll(self, res, check_f, pollinterval):
- if check_f():
- return True
- d = defer.Deferred()
- d.addCallback(self._poll, check_f, pollinterval)
- reactor.callLater(pollinterval, d.callback, None)
- return d
-
-
def test_create(self):
ic = IntroducerClient(None, "introducer", "myfurl")
def _ignore(nodeid, rref):