# using the grid fs).
import sys, os, shutil, unittest, subprocess
-import tempfile, re, time, signal, random, httplib
-import traceback
+import tempfile, re, time, random, httplib
+
+from twisted.python import usage
+
+if sys.platform.startswith('darwin'):
+ UNMOUNT_CMD = ['umount']
+else:
+ # linux, and until we hear otherwise, all other platforms with fuse, by assumption
+ UNMOUNT_CMD = ['fusermount', '-u']
# Import fuse implementations:
-FuseDir = os.path.join('.', 'contrib', 'fuse')
-if not os.path.isdir(FuseDir):
- raise SystemExit('''
-Could not find directory "%s". Please run this script from the tahoe
-source base directory.
-''' % (FuseDir,))
+#FuseDir = os.path.join('.', 'contrib', 'fuse')
+#if not os.path.isdir(FuseDir):
+# raise SystemExit('''
+#Could not find directory "%s". Please run this script from the tahoe
+#source base directory.
+#''' % (FuseDir,))
+FuseDir = '.'
+
+### Load each implementation
sys.path.append(os.path.join(FuseDir, 'impl_a'))
import tahoe_fuse as impl_a
-
sys.path.append(os.path.join(FuseDir, 'impl_b'))
import pyfuse.tahoe as impl_b
+### config info about each impl, including which make sense to run
+implementations = {
+ 'impl_a': dict(module=impl_a,
+ mount_args=['--basedir', '%(nodedir)s', '%(mountpath)s', ],
+ mount_wait=True,
+ tests=['read', ]),
+ 'impl_b': dict(module=impl_b,
+ mount_args=['--basedir', '%(nodedir)s', '%(mountpath)s', ],
+ mount_wait=False,
+ tests=['read', ]),
+ }
+
+#if sys.platform == 'darwin':
+ #del implementations['impl_a']
+ #del implementations['impl_b']
+
+class FuseTestsOptions(usage.Options):
+ optParameters = [
+ ["test-type", None, "both",
+ "Type of test to run; unit, system or both"
+ ],
+ ["implementations", None, "all",
+ "Comma separated list of implementations to test, or 'all'"
+ ],
+ ["tests", None, "all",
+ "Comma separated list of test sets to run, or 'all'"
+ ],
+ ["path-to-tahoe", None, "../../bin/tahoe",
+ "Which 'tahoe' script to use to create test nodes"],
+ ["tmp-dir", None, "/tmp",
+ "Where the test should create temporary files"],
+ # Note; this is '/tmp' because on leopard, tempfile.mkdtemp creates
+ # directories in a location which leads paths to exceed what macfuse
+ # can handle without leaking un-umount-able fuse processes.
+ ]
+ optFlags = [
+ ["debug-wait", None,
+ "Causes the test system to pause at various points, to facilitate debugging"],
+ ]
+
+ def postOptions(self):
+ if self['tests'] == 'all':
+ self.tests = ['read', 'write']
+ # [ ] todo: deduce this from looking for test_ in dir(self)
+ else:
+ self.tests = map(str.strip, self['tests'].split(','))
+ if self['implementations'] == 'all':
+ self.implementations = implementations.keys()
+ else:
+ self.implementations = map(str.strip, self['implementations'].split(','))
### Main flow control:
-def main(args = sys.argv):
+def main(args):
+ config = FuseTestsOptions()
+ config.parseOptions(args[1:])
+
target = 'all'
if len(args) > 1:
target = args.pop(1)
- if target not in ('all', 'unit', 'system'):
- raise SystemExit(Usage)
-
- if target in ('all', 'unit'):
- run_unit_tests()
+ test_type = config['test-type']
+ if test_type not in ('both', 'unit', 'system'):
+ raise usage.error('test-type %r not supported' % (test_type,))
- if target in ('all', 'system'):
- run_system_test()
+ if test_type in ('both', 'unit'):
+ run_unit_tests([args[0]])
+ if test_type in ('both', 'system'):
+ run_system_test(config)
-def run_unit_tests():
+
+def run_unit_tests(argv):
print 'Running Unit Tests.'
try:
- unittest.main()
+ unittest.main(argv=argv)
except SystemExit, se:
pass
print 'Unit Tests complete.\n'
-
-def run_system_test():
- SystemTest().run()
+
+def run_system_test(config):
+ SystemTest(config).run()
### System Testing:
class SystemTest (object):
- def __init__(self):
+ def __init__(self, config):
+ self.config = config
+
# These members represent configuration:
self.fullcleanup = False # FIXME: Make this a commandline option.
-
+
# These members represent test state:
self.cliexec = None
self.testroot = None
# These "*_layer" methods call eachother in a linear fashion, using
# exception unwinding to do cleanup properly. Each "layer" invokes
# a deeper layer, and each layer does its own cleanup upon exit.
-
+
def run(self, fullcleanup = False):
'''
If full_cleanup, delete all temporary state.
print sfail
print '\n*** System Tests were not successfully completed.'
+ def maybe_wait(self, msg='waiting'):
+ if self.config['debug-wait']:
+ print msg
+ raw_input()
+
def init_cli_layer(self):
'''This layer finds the appropriate tahoe executable.'''
- self.cliexec = os.path.join('.', 'bin', 'tahoe')
+ #self.cliexec = os.path.join('.', 'bin', 'tahoe')
+ self.cliexec = self.config['path-to-tahoe']
version = self.run_tahoe('--version')
print 'Using %r with version:\n%s' % (self.cliexec, version.rstrip())
def create_testroot_layer(self):
print 'Creating test base directory.'
- self.testroot = tempfile.mkdtemp(prefix='tahoe_fuse_test_')
+ #self.testroot = tempfile.mkdtemp(prefix='tahoe_fuse_test_')
+ #self.testroot = tempfile.mkdtemp(prefix='tahoe_fuse_test_', dir='/tmp/')
+ tmpdir = self.config['tmp-dir']
+ if tmpdir:
+ self.testroot = tempfile.mkdtemp(prefix='tahoe_fuse_test_', dir=tmpdir)
+ else:
+ self.testroot = tempfile.mkdtemp(prefix='tahoe_fuse_test_')
try:
return self.launch_introducer_layer()
finally:
else:
print 'Leaving test root directory: %r' % (self.testroot, )
-
+
def launch_introducer_layer(self):
print 'Launching introducer.'
introbase = os.path.join(self.testroot, 'introducer')
self.check_tahoe_output(startoutput, ExpectedStartOutput, introbase)
return self.launch_clients_layer(introbase)
-
+
finally:
print 'Stopping introducer node.'
self.stop_node(introbase)
-
+
TotalClientsNeeded = 3
def launch_clients_layer(self, introbase, clientnum = 0):
if clientnum >= self.TotalClientsNeeded:
- return self.create_test_dirnode_layer()
+ self.maybe_wait('waiting (launched clients)')
+ ret = self.create_test_dirnode_layer()
+ self.maybe_wait('waiting (ran tests)')
+ return ret
tmpl = 'Launching client %d of %d.'
print tmpl % (clientnum,
f = open(webportpath, 'w')
f.write('tcp:%d:interface=127.0.0.1\n' % self.port)
f.close()
+ print "http://127.0.0.1:%d/" % (self.port,)
else:
os.remove(webportpath)
-
introfurl = os.path.join(introbase, 'introducer.furl')
finally:
print 'Stopping client node %d.' % (clientnum,)
self.stop_node(base)
-
+
def create_test_dirnode_layer(self):
print 'Creating test dirnode.'
f.close()
return self.mount_fuse_layer(cap)
-
- def mount_fuse_layer(self, fusebasecap):
+
+ def mount_fuse_layer(self, root_uri):
mpbase = os.path.join(self.testroot, 'mountpoint')
os.mkdir(mpbase)
-
results = []
- # Mount and test each implementation:
- for implnum, implmanklass in enumerate([Impl_A_ProcessManager, Impl_B_ProcessManager]):
- implman = implmanklass(self.clientbase, mpbase)
- print '\n*** Testing impl #%d: %r' % (implnum, implman.Name)
-
- implman.setup()
-
+ if self.config['debug-wait']:
+ ImplProcessManager.debug_wait = True
+
+ #for name, kwargs in implementations.items():
+ for name in self.config.implementations:
+ kwargs = implementations[name]
+ #print 'instantiating %s: %r' % (name, kwargs)
+ implprocmgr = ImplProcessManager(name, **kwargs)
+ print '\n*** Testing impl: %r' % (implprocmgr.name)
+ implprocmgr.configure(self.clientbase, mpbase)
+ implprocmgr.mount()
try:
- failures, total = self.run_test_layer(fusebasecap, implman)
- result = (implman.Name, failures, total)
+ failures, total = self.run_test_layer(root_uri, implprocmgr)
+ result = (implprocmgr.name, failures, total)
tmpl = '\n*** Test Results implementation %s: %d failed out of %d.'
print tmpl % result
results.append(result)
-
finally:
- implman.cleanup()
-
+ implprocmgr.umount()
return results
- def run_test_layer(self, fbcap, iman):
- testnames = [n for n in sorted(dir(self)) if n.startswith('test_')]
-
+ def run_test_layer(self, root_uri, iman):
failures = 0
- for testnum, testname in enumerate(testnames):
- print '\n*** Running test #%d: %s' % (testnum, testname)
- try:
- testcap = self.create_dirnode()
- dirname = '%s_%s' % (iman.Name, testname)
- self.attach_node(fbcap, testcap, dirname)
-
- method = getattr(self, testname)
- method(testcap, testdir = os.path.join(iman.mountpath, dirname))
- print 'Test succeeded.'
- except TestFailure, f:
- print f
- failures += 1
- except:
- print 'Error in test code... Cleaning up.'
- raise
-
- return (failures, len(testnames))
-
+ testnum = 0
+ numtests = 0
+ tests = list(set(self.config.tests).intersection(set(iman.tests)))
+ self.maybe_wait('waiting (about to run tests)')
+ for test in tests:
+ print 'running %r tests' % (test,)
+ testnames = [n for n in sorted(dir(self)) if n.startswith('test_'+test+'_')]
+ numtests += len(testnames)
+ print 'found methods:', testnames
+ for testname in testnames:
+ testnum += 1
+ print '\n*** Running test #%d: %s' % (testnum, testname)
+ try:
+ testcap = self.create_dirnode()
+ dirname = '%s_%s' % (iman.name, testname)
+ self.attach_node(root_uri, testcap, dirname)
+ method = getattr(self, testname)
+ method(testcap, testdir = os.path.join(iman.mountpath, dirname))
+ print 'Test succeeded.'
+ except TestFailure, f:
+ print f
+ failures += 1
+ except:
+ print 'Error in test code... Cleaning up.'
+ raise
+ return (failures, numtests)
# Tests:
- def test_directory_existence(self, testcap, testdir):
+ def test_read_directory_existence(self, testcap, testdir):
if not wrap_os_error(os.path.isdir, testdir):
raise TestFailure('Attached test directory not found: %r', testdir)
-
- def test_empty_directory_listing(self, testcap, testdir):
+
+ def test_read_empty_directory_listing(self, testcap, testdir):
listing = wrap_os_error(os.listdir, testdir)
if listing:
raise TestFailure('Expected empty directory, found: %r', listing)
-
- def test_directory_listing(self, testcap, testdir):
+
+ def test_read_directory_listing(self, testcap, testdir):
names = []
filesizes = {}
names.append(fname)
body = 'Hello World #%d!' % (i,)
filesizes[fname] = len(body)
-
+
cap = self.webapi_call('PUT', '/uri', body)
self.attach_node(testcap, cap, fname)
self.attach_node(testcap, cap, dname)
names.sort()
-
+
listing = wrap_os_error(os.listdir, testdir)
listing.sort()
if st.st_size != size:
tmpl = 'Expected %r size of %r but fuse returned %r'
raise TestFailure(tmpl, file, size, st.st_size)
-
- def test_file_contents(self, testcap, testdir):
+
+ def test_read_file_contents(self, testcap, testdir):
name = 'hw.txt'
body = 'Hello World!'
-
+
cap = self.webapi_call('PUT', '/uri', body)
self.attach_node(testcap, cap, name)
if found != body:
tmpl = 'Expected file contents %r but found %r'
raise TestFailure(tmpl, body, found)
-
-
+
+
# Utilities:
def run_tahoe(self, *args):
realargs = ('tahoe',) + args
status,
output)
return output
-
+
def check_tahoe_output(self, output, expected, expdir):
+ ignorable_lines = map(re.compile, [
+ '.*site-packages/zope\.interface.*\.egg/zope/__init__.py:3: UserWarning: Module twisted was already imported from .*egg is being added to sys.path',
+ ' import pkg_resources',
+ ])
+ def ignore_line(line):
+ for ignorable_line in ignorable_lines:
+ if ignorable_line.match(line):
+ return True
+ else:
+ return False
+ output = '\n'.join( [ line
+ for line in output.split('\n')+['']
+ #if line not in ignorable_lines ] )
+ if not ignore_line(line) ] )
m = re.match(expected, output, re.M)
if m is None:
tmpl = 'The output of tahoe did not match the expectation:\n'
def webapi_call(self, method, path, body=None, **options):
if options:
path = path + '?' + ('&'.join(['%s=%s' % kv for kv in options.items()]))
-
+
conn = httplib.HTTPConnection('127.0.0.1', self.port)
conn.request(method, path, body = body)
resp = conn.getresponse()
resp.status, body)
return resp.read()
-
+
def create_dirnode(self):
return self.webapi_call('PUT', '/uri', t='mkdir').strip()
body = childcap,
t = 'uri',
replace = 'false')
- assert body.strip() == childcap, `status, dircap, childcap, childname`
+ assert body.strip() == childcap, `body, dircap, childcap, childname`
def polling_operation(self, operation, polldesc, timeout = 10.0, pollinterval = 0.2):
totaltime = timeout # Fudging for edge-case SetupFailure description...
-
+
totalattempts = int(timeout / pollinterval)
starttime = time.time()
elif totaltime > timeout:
break
-
+
else:
opdelay = time.time() - opstart
realinterval = max(0., pollinterval - opdelay)
-
+
#tmpl = '(Poll attempt %d failed after %.2f seconds, sleeping %.2f seconds.)'
#print tmpl % (attempt+1, opdelay, realinterval)
time.sleep(realinterval)
-
+
tmpl = 'Timeout while polling for: %s\n'
tmpl += 'Waited %.2f seconds (%d polls).'
raise SetupFailure(tmpl, polldesc, totaltime, attempt+1)
def __init__(self, tmpl, *args):
msg = self.Prefix + (tmpl % args)
Exception.__init__(self, msg)
-
+
class SetupFailure (Failure):
Prefix = 'Setup Failure - The test framework encountered an error:\n'
class TestFailure (Failure):
Prefix = 'TestFailure: '
-
+
### Unit Tests:
class Impl_A_UnitTests (unittest.TestCase):
for input, output in iopairs:
result = impl_a.canonicalize_cap(input)
self.failUnlessEqual(output, result, 'input == %r' % (input,))
-
+
### Misc:
-class ImplProcessManager (object):
- '''Subclasses must have Name and Mod class attributes.'''
- def __init__(self, clientbase, mpbase):
- self.clientbase = clientbase
- self.mountpath = os.path.join(mpbase, self.Name)
- self.script = self.Mod.__file__
+class ImplProcessManager(object):
+ debug_wait = False
+
+ def __init__(self, name, module, mount_args, mount_wait, tests):
+ self.name = name
+ self.module = module
+ self.script = module.__file__
+ self.mount_args = mount_args
+ self.mount_wait = mount_wait
+ self.tests = tests
+
+ def maybe_wait(self, msg='waiting'):
+ if self.debug_wait:
+ print msg
+ raw_input()
+
+ def configure(self, client_nodedir, mountpoint):
+ self.client_nodedir = client_nodedir
+ self.mountpath = os.path.join(mountpoint, self.name)
os.mkdir(self.mountpath)
-
-class Impl_A_ProcessManager (ImplProcessManager):
- Name = 'impl_a'
- Mod = impl_a
-
- def setup(self):
- print 'Mounting implementation: %s' % (self.Name,)
- exitcode, output = gather_output(['python',
- self.script,
- self.mountpath,
- '--basedir', self.clientbase])
-
- if exitcode != 0 or output:
- tmpl = '%r failed to launch:\n'
- tmpl += 'Exit Status: %r\n'
- tmpl += 'Output:\n%s\n'
- raise SetupFailure(tmpl, implpath, exitcode, output)
-
- def cleanup(self):
- print 'Unmounting implementation: %s' % (self.Name,)
- args = ['fusermount', '-u', self.mountpath]
+ def mount(self):
+ print 'Mounting implementation: %s (%s)' % (self.name, self.script)
+
+ rootdirfile = os.path.join(self.client_nodedir, 'private', 'root_dir.cap')
+ root_uri = file(rootdirfile, 'r').read().strip()
+ fields = {'mountpath': self.mountpath,
+ 'nodedir': self.client_nodedir,
+ 'root-uri': root_uri,
+ }
+ args = ['python', self.script] + [ arg%fields for arg in self.mount_args ]
+ print ' '.join(args)
+ self.maybe_wait('waiting (about to launch fuse)')
+
+ if self.mount_wait:
+ exitcode, output = gather_output(args)
+ if exitcode != 0 or output:
+ tmpl = '%r failed to launch:\n'
+ tmpl += 'Exit Status: %r\n'
+ tmpl += 'Output:\n%s\n'
+ raise SetupFailure(tmpl, self.script, exitcode, output)
+ else:
+ self.proc = subprocess.Popen(args)
+
+ def umount(self):
+ print 'Unmounting implementation: %s' % (self.name,)
+ args = UNMOUNT_CMD + [self.mountpath]
+ print args
+ self.maybe_wait('waiting (unmount)')
+ #print os.system('ls -l '+self.mountpath)
ec, out = gather_output(args)
if ec != 0 or out:
- tmpl = 'fusermount failed to unmount:\n'
+ tmpl = '%r failed to unmount:\n' % (' '.join(UNMOUNT_CMD),)
tmpl += 'Arguments: %r\n'
tmpl += 'Exit Status: %r\n'
tmpl += 'Output:\n%s\n'
raise SetupFailure(tmpl, args, ec, out)
-
-class Impl_B_ProcessManager (ImplProcessManager):
- Name = 'impl_b'
- Mod = impl_b
-
- def setup(self):
- print 'Mounting implementation: %s' % (self.Name,)
- self.proc = subprocess.Popen(['python',
- self.script,
- self.mountpath,
- '--basedir', self.clientbase])
-
- def cleanup(self):
- print 'Unmounting implementation: %s' % (self.Name,)
- args = ['fusermount', '-u', self.mountpath]
- ec, out = gather_output(args)
- if ec != 0 or out:
- tmpl = 'fusermount failed to unmount:\n'
- tmpl += 'Arguments: %r\n'
- tmpl += 'Exit Status: %r\n'
- tmpl += 'Output:\n%s\n'
- raise SetupFailure(tmpl, args, ec, out)
-
def gather_output(*args, **kwargs):
'''
This expects the child does not require input and that it closes
output = p.stdout.read()
exitcode = p.wait()
return (exitcode, output)
-
+
def wrap_os_error(meth, *args):
try:
ExpectedStartOutput = r'STARTING (?P<path>.*?)\n(introducer|client) node probably started'
-Usage = '''
-Usage: %s [target]
-
-Run tests for the given target.
-
-target is one of: unit, system, or all
-''' % (sys.argv[0],)
-
-
-
if __name__ == '__main__':
- main()
+ main(sys.argv)