]> git.rkrishnan.org Git - tahoe-lafs/zfec.git/commitdiff
zfec: add unit tests for easyfec
authorzooko <zooko@zooko.com>
Thu, 8 Nov 2007 22:26:00 +0000 (03:56 +0530)
committerzooko <zooko@zooko.com>
Thu, 8 Nov 2007 22:26:00 +0000 (03:56 +0530)
darcs-hash:a55ddf8d4f0fd974bd80f5744377a5217bf94b1f

zfec/zfec/test/test_zfec.py

index 150b7d995600b5c5b4d1aa8bd25a881f25f49523..56ca2760f6869058e112ce1fd2c8afe294f783dd 100755 (executable)
@@ -9,6 +9,8 @@ VERBOSE=False
 
 import zfec
 
+from pyutil import fileutil
+
 from base64 import b32encode
 def ab(x): # debuggery
     if len(x) >= 3:
@@ -20,6 +22,9 @@ def ab(x): # debuggery
     elif len(x) == 0:
         return "%s:%s" % (len(x), "--empty--",)
 
+def randstr(n):
+    return ''.join(map(chr, map(random.randrange, [0]*n, [256]*n)))
+
 def _h(k, m, ss):
     encer = zfec.Encoder(k, m)
     nums_and_blocks = list(enumerate(encer.encode(ss)))
@@ -33,9 +38,6 @@ def _h(k, m, ss):
     assert len(decoded) == len(ss), (len(decoded), len(ss),)
     assert tuple([str(s) for s in decoded]) == tuple([str(s) for s in ss]), (tuple([ab(str(s)) for s in decoded]), tuple([ab(str(s)) for s in ss]),)
 
-def randstr(n):
-    return ''.join(map(chr, map(random.randrange, [0]*n, [256]*n)))
-
 def _help_test_random():
     m = random.randrange(1, 257)
     k = random.randrange(1, m+1)
@@ -44,36 +46,89 @@ def _help_test_random():
     _h(k, m, ss)
 
 def _help_test_random_with_l(l):
-    m = 83
-    k = 19
+    m = random.randrange(1, 257)
+    k = random.randrange(1, m+1)
     ss = [ randstr(l/k) for x in range(k) ]
     _h(k, m, ss)
 
-class ZFec(unittest.TestCase):
+def _h_easy(k, m, s):
+    encer = zfec.easyfec.Encoder(k, m)
+    nums_and_blocks = list(enumerate(encer.encode(s)))
+    assert isinstance(nums_and_blocks, list), nums_and_blocks
+    assert len(nums_and_blocks) == m, (len(nums_and_blocks), m,)
+    nums_and_blocks = random.sample(nums_and_blocks, k)
+    blocks = [ x[1] for x in nums_and_blocks ]
+    nums = [ x[0] for x in nums_and_blocks ]
+    decer = zfec.easyfec.Decoder(k, m)
+    decodeds = decer.decode(blocks, nums)
+    assert len(decodeds) == len(s), (len(decodeds), len(s),)
+    assert decodeds == s, (decodeds, s,)
+
+def _help_test_random_easy():
+    m = random.randrange(1, 257)
+    k = random.randrange(1, m+1)
+    l = random.randrange(0, 2**10)
+    s = randstr(l)
+    _h_easy(k, m, s)
+
+def _help_test_random_with_l_easy(l):
+    m = random.randrange(1, 257)
+    k = random.randrange(1, m+1)
+    s = randstr(l)
+    _h_easy(k, m, s)
+    
+class ZFecTest(unittest.TestCase):
+    def test_small(self):
+        for i in range(16):
+            _help_test_random_with_l(i)
+        if VERBOSE:
+            print "%d randomized tests pass." % (i+1)
+
     def test_random(self):
         for i in range(3):
             _help_test_random()
         if VERBOSE:
             print "%d randomized tests pass." % (i+1)
 
-    def test_bad_args_enc(self):
-        encer = zfec.Encoder(2, 4)
+    def test_bad_args_dec(self):
+        decer = zfec.Decoder(2, 4)
+
+        try:
+            decer.decode(98) # first argument is not a sequence
+        except TypeError, e:
+            assert "First argument was not a sequence" in str(e), e
+        else:
+            raise "Should have gotten TypeError for wrong type of second argument."
+
         try:
-            encer.encode(["a", "b", ], ["c", "I am not an integer blocknum",])
+            decer.decode(["a", "b", ], ["c", "d",])
         except zfec.Error, e:
             assert "Precondition violation: second argument is required to contain int" in str(e), e
         else:
             raise "Should have gotten zfec.Error for wrong type of second argument."
 
         try:
-            encer.encode(["a", "b", ], 98) # not a sequence at all
+            decer.decode(["a", "b", ], 98) # not a sequence at all
         except TypeError, e:
-            assert "Second argument (optional) was not a sequence" in str(e), e
+            assert "Second argument was not a sequence" in str(e), e
         else:
             raise "Should have gotten TypeError for wrong type of second argument."
 
+class EasyFecTest(unittest.TestCase):
+    def test_small(self):
+        for i in range(2**10):
+            _help_test_random_with_l_easy(i)
+        if VERBOSE:
+            print "%d randomized tests pass." % (i+1)
+
+    def test_random(self):
+        for i in range(2**8):
+            _help_test_random_easy()
+        if VERBOSE:
+            print "%d randomized tests pass." % (i+1)
+
     def test_bad_args_dec(self):
-        decer = zfec.Decoder(2, 4)
+        decer = zfec.easyfec.Decoder(2, 4)
 
         try:
             decer.decode(98, [0, 1]) # first argument is not a sequence
@@ -83,14 +138,14 @@ class ZFec(unittest.TestCase):
             raise "Should have gotten TypeError for wrong type of second argument."
 
         try:
-            decer.decode(["a", "b", ], ["c", "d",])
+            decer.decode("ab", ["c", "d",])
         except zfec.Error, e:
             assert "Precondition violation: second argument is required to contain int" in str(e), e
         else:
             raise "Should have gotten zfec.Error for wrong type of second argument."
 
         try:
-            decer.decode(["a", "b", ], 98) # not a sequence at all
+            decer.decode("ab", 98) # not a sequence at all
         except TypeError, e:
             assert "Second argument was not a sequence" in str(e), e
         else:
@@ -123,7 +178,7 @@ class FileFec(unittest.TestCase):
 
         fsize = len(teststr)
 
-        tempdir = zfec.util.fileutil.NamedTemporaryDirectory(cleanup=True)
+        tempdir = fileutil.NamedTemporaryDirectory(cleanup=True)
         try:
             tempf = tempdir.file(TESTFNAME, 'w+b')
             tempf.write(teststr)
@@ -190,15 +245,15 @@ class FileFec(unittest.TestCase):
  
 class Cmdline(unittest.TestCase):
     def test_basic(self, noisy=VERBOSE):
-        tempdir = zfec.util.fileutil.NamedTemporaryDirectory(cleanup=True)
+        tempdir = fileutil.NamedTemporaryDirectory(cleanup=True)
         fo = tempdir.file("test.data", "w+b")
         fo.write("WHEHWHJEKWAHDLJAWDHWALKDHA")
 
         import sys
         realargv = sys.argv
         try:
-            DEFAULT_M=16
-            DEFAULT_K=4
+            DEFAULT_M=8
+            DEFAULT_K=3
             sys.argv = ["zfec", os.path.join(tempdir.name, "test.data"),]
         
             retcode = zfec.cmdline_zfec.main()
@@ -206,7 +261,7 @@ class Cmdline(unittest.TestCase):
 
             RE=re.compile(zfec.filefec.RE_FORMAT % ('test.data', ".fec",))
             fns = os.listdir(tempdir.name)
-            assert len(fns) >= DEFAULT_M, (fns, tempdir, tempdir.name,)
+            assert len(fns) >= DEFAULT_M, (fns, DEFAULT_M, tempdir, tempdir.name,)
             sharefns = [ os.path.join(tempdir.name, fn) for fn in fns if RE.match(fn) ]
             random.shuffle(sharefns)
             del sharefns[DEFAULT_K:]
@@ -222,6 +277,8 @@ class Cmdline(unittest.TestCase):
         finally:
             sys.argv = realargv
 
+if __name__ == "__main__":
+    unittest.main()
 
 # zfec -- fast forward error correction library with Python interface
 #