From: zooko Date: Thu, 8 Nov 2007 22:26:00 +0000 (+0530) Subject: zfec: add unit tests for easyfec X-Git-Url: https://git.rkrishnan.org/pf/vdrive?a=commitdiff_plain;h=2580202fb58bf71030336b7cbbbd9e2d1dcaaf6b;p=tahoe-lafs%2Fzfec.git zfec: add unit tests for easyfec darcs-hash:a55ddf8d4f0fd974bd80f5744377a5217bf94b1f --- diff --git a/zfec/zfec/test/test_zfec.py b/zfec/zfec/test/test_zfec.py index 150b7d9..56ca276 100755 --- a/zfec/zfec/test/test_zfec.py +++ b/zfec/zfec/test/test_zfec.py @@ -9,6 +9,8 @@ VERBOSE=False import zfec +from pyutil import fileutil + from base64 import b32encode def ab(x): # debuggery if len(x) >= 3: @@ -20,6 +22,9 @@ def ab(x): # debuggery elif len(x) == 0: return "%s:%s" % (len(x), "--empty--",) +def randstr(n): + return ''.join(map(chr, map(random.randrange, [0]*n, [256]*n))) + def _h(k, m, ss): encer = zfec.Encoder(k, m) nums_and_blocks = list(enumerate(encer.encode(ss))) @@ -33,9 +38,6 @@ def _h(k, m, ss): assert len(decoded) == len(ss), (len(decoded), len(ss),) assert tuple([str(s) for s in decoded]) == tuple([str(s) for s in ss]), (tuple([ab(str(s)) for s in decoded]), tuple([ab(str(s)) for s in ss]),) -def randstr(n): - return ''.join(map(chr, map(random.randrange, [0]*n, [256]*n))) - def _help_test_random(): m = random.randrange(1, 257) k = random.randrange(1, m+1) @@ -44,36 +46,89 @@ def _help_test_random(): _h(k, m, ss) def _help_test_random_with_l(l): - m = 83 - k = 19 + m = random.randrange(1, 257) + k = random.randrange(1, m+1) ss = [ randstr(l/k) for x in range(k) ] _h(k, m, ss) -class ZFec(unittest.TestCase): +def _h_easy(k, m, s): + encer = zfec.easyfec.Encoder(k, m) + nums_and_blocks = list(enumerate(encer.encode(s))) + assert isinstance(nums_and_blocks, list), nums_and_blocks + assert len(nums_and_blocks) == m, (len(nums_and_blocks), m,) + nums_and_blocks = random.sample(nums_and_blocks, k) + blocks = [ x[1] for x in nums_and_blocks ] + nums = [ x[0] for x in nums_and_blocks ] + decer = zfec.easyfec.Decoder(k, m) + decodeds = decer.decode(blocks, nums) + assert len(decodeds) == len(s), (len(decodeds), len(s),) + assert decodeds == s, (decodeds, s,) + +def _help_test_random_easy(): + m = random.randrange(1, 257) + k = random.randrange(1, m+1) + l = random.randrange(0, 2**10) + s = randstr(l) + _h_easy(k, m, s) + +def _help_test_random_with_l_easy(l): + m = random.randrange(1, 257) + k = random.randrange(1, m+1) + s = randstr(l) + _h_easy(k, m, s) + +class ZFecTest(unittest.TestCase): + def test_small(self): + for i in range(16): + _help_test_random_with_l(i) + if VERBOSE: + print "%d randomized tests pass." % (i+1) + def test_random(self): for i in range(3): _help_test_random() if VERBOSE: print "%d randomized tests pass." % (i+1) - def test_bad_args_enc(self): - encer = zfec.Encoder(2, 4) + def test_bad_args_dec(self): + decer = zfec.Decoder(2, 4) + + try: + decer.decode(98) # first argument is not a sequence + except TypeError, e: + assert "First argument was not a sequence" in str(e), e + else: + raise "Should have gotten TypeError for wrong type of second argument." + try: - encer.encode(["a", "b", ], ["c", "I am not an integer blocknum",]) + decer.decode(["a", "b", ], ["c", "d",]) except zfec.Error, e: assert "Precondition violation: second argument is required to contain int" in str(e), e else: raise "Should have gotten zfec.Error for wrong type of second argument." try: - encer.encode(["a", "b", ], 98) # not a sequence at all + decer.decode(["a", "b", ], 98) # not a sequence at all except TypeError, e: - assert "Second argument (optional) was not a sequence" in str(e), e + assert "Second argument was not a sequence" in str(e), e else: raise "Should have gotten TypeError for wrong type of second argument." +class EasyFecTest(unittest.TestCase): + def test_small(self): + for i in range(2**10): + _help_test_random_with_l_easy(i) + if VERBOSE: + print "%d randomized tests pass." % (i+1) + + def test_random(self): + for i in range(2**8): + _help_test_random_easy() + if VERBOSE: + print "%d randomized tests pass." % (i+1) + def test_bad_args_dec(self): - decer = zfec.Decoder(2, 4) + decer = zfec.easyfec.Decoder(2, 4) try: decer.decode(98, [0, 1]) # first argument is not a sequence @@ -83,14 +138,14 @@ class ZFec(unittest.TestCase): raise "Should have gotten TypeError for wrong type of second argument." try: - decer.decode(["a", "b", ], ["c", "d",]) + decer.decode("ab", ["c", "d",]) except zfec.Error, e: assert "Precondition violation: second argument is required to contain int" in str(e), e else: raise "Should have gotten zfec.Error for wrong type of second argument." try: - decer.decode(["a", "b", ], 98) # not a sequence at all + decer.decode("ab", 98) # not a sequence at all except TypeError, e: assert "Second argument was not a sequence" in str(e), e else: @@ -123,7 +178,7 @@ class FileFec(unittest.TestCase): fsize = len(teststr) - tempdir = zfec.util.fileutil.NamedTemporaryDirectory(cleanup=True) + tempdir = fileutil.NamedTemporaryDirectory(cleanup=True) try: tempf = tempdir.file(TESTFNAME, 'w+b') tempf.write(teststr) @@ -190,15 +245,15 @@ class FileFec(unittest.TestCase): class Cmdline(unittest.TestCase): def test_basic(self, noisy=VERBOSE): - tempdir = zfec.util.fileutil.NamedTemporaryDirectory(cleanup=True) + tempdir = fileutil.NamedTemporaryDirectory(cleanup=True) fo = tempdir.file("test.data", "w+b") fo.write("WHEHWHJEKWAHDLJAWDHWALKDHA") import sys realargv = sys.argv try: - DEFAULT_M=16 - DEFAULT_K=4 + DEFAULT_M=8 + DEFAULT_K=3 sys.argv = ["zfec", os.path.join(tempdir.name, "test.data"),] retcode = zfec.cmdline_zfec.main() @@ -206,7 +261,7 @@ class Cmdline(unittest.TestCase): RE=re.compile(zfec.filefec.RE_FORMAT % ('test.data', ".fec",)) fns = os.listdir(tempdir.name) - assert len(fns) >= DEFAULT_M, (fns, tempdir, tempdir.name,) + assert len(fns) >= DEFAULT_M, (fns, DEFAULT_M, tempdir, tempdir.name,) sharefns = [ os.path.join(tempdir.name, fn) for fn in fns if RE.match(fn) ] random.shuffle(sharefns) del sharefns[DEFAULT_K:] @@ -222,6 +277,8 @@ class Cmdline(unittest.TestCase): finally: sys.argv = realargv +if __name__ == "__main__": + unittest.main() # zfec -- fast forward error correction library with Python interface #