from base64 import b32encode
-import os
+import os, sys
from cStringIO import StringIO
from twisted.trial import unittest
from twisted.internet import defer, reactor
d.addCallback(self.log, "did _check_publish_private")
d.addCallback(self._test_web)
d.addCallback(self._test_web_start)
- d.addCallback(self._test_runner)
+ d.addCallback(self._test_control)
return d
test_vdrive.timeout = 1100
):
self.failUnless("%s: " % key in output, key)
+ def _test_control(self, res):
+ # exercise the remote-control-the-client foolscap interfaces in
+ # allmydata.control (mostly used for performance tests)
+ c0 = self.clients[0]
+ control_furl_file = os.path.join(c0.basedir, "control.furl")
+ control_furl = open(control_furl_file, "r").read().strip()
+ # it doesn't really matter which Tub we use to connect to the client,
+ # so let's just use our Introducer's
+ d = self.introducer_and_vdrive.tub.getReference(control_furl)
+ d.addCallback(self._test_control2, control_furl_file)
+ return d
+ def _test_control2(self, rref, filename):
+ d = rref.callRemote("upload_from_file_to_uri", filename)
+ downfile = os.path.join(self.basedir, "control.downfile")
+ d.addCallback(lambda uri:
+ rref.callRemote("download_from_uri_to_file",
+ uri, downfile))
+ def _check(res):
+ self.failUnlessEqual(res, downfile)
+ data = open(downfile, "r").read()
+ expected_data = open(filename, "r").read()
+ self.failUnlessEqual(data, expected_data)
+ d.addCallback(_check)
+ d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200))
+ if sys.platform == "linux2":
+ d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
+ return d
+