]> git.rkrishnan.org Git - tahoe-lafs/zfec.git/blobdiff - zfec/zfec/test/test_zfec.py
stick a .gitignore file
[tahoe-lafs/zfec.git] / zfec / zfec / test / test_zfec.py
index f4870eb84e0888ac8508b82c6abcca362cf5d06c..46bbb1cb6bbdeaf8c12a2982a5c326ceafcf02f4 100755 (executable)
@@ -1,23 +1,15 @@
 #!/usr/bin/env python
 
-# import bindann
-# import bindann.monkeypatch.all
+import cStringIO, os, random, re
 
-import cStringIO, os, random, re, sys
-
-import zfec
-
-try:
-    from twisted.trial import unittest
-except ImportError:
-    # trial is unavailable, oh well
-    import unittest
+import unittest
 
 global VERBOSE
 VERBOSE=False
-if '-v' in sys.argv:
-    sys.argv.pop(sys.argv.index('-v'))
-    VERBOSE=True
+
+import zfec
+
+from pyutil import fileutil
 
 from base64 import b32encode
 def ab(x): # debuggery
@@ -30,6 +22,9 @@ def ab(x): # debuggery
     elif len(x) == 0:
         return "%s:%s" % (len(x), "--empty--",)
 
+def randstr(n):
+    return ''.join(map(chr, map(random.randrange, [0]*n, [256]*n)))
+
 def _h(k, m, ss):
     encer = zfec.Encoder(k, m)
     nums_and_blocks = list(enumerate(encer.encode(ss)))
@@ -43,73 +38,200 @@ def _h(k, m, ss):
     assert len(decoded) == len(ss), (len(decoded), len(ss),)
     assert tuple([str(s) for s in decoded]) == tuple([str(s) for s in ss]), (tuple([ab(str(s)) for s in decoded]), tuple([ab(str(s)) for s in ss]),)
 
-def randstr(n):
-    return ''.join(map(chr, map(random.randrange, [0]*n, [256]*n)))
-
 def _help_test_random():
     m = random.randrange(1, 257)
     k = random.randrange(1, m+1)
-    l = random.randrange(0, 2**10)
+    l = random.randrange(0, 2**9)
     ss = [ randstr(l/k) for x in range(k) ]
     _h(k, m, ss)
 
 def _help_test_random_with_l(l):
-    m = 83
-    k = 19
+    m = random.randrange(1, 257)
+    k = random.randrange(1, m+1)
     ss = [ randstr(l/k) for x in range(k) ]
     _h(k, m, ss)
 
-class ZFec(unittest.TestCase):
+def _h_easy(k, m, s):
+    encer = zfec.easyfec.Encoder(k, m)
+    nums_and_blocks = list(enumerate(encer.encode(s)))
+    assert isinstance(nums_and_blocks, list), nums_and_blocks
+    assert len(nums_and_blocks) == m, (len(nums_and_blocks), m,)
+    nums_and_blocks = random.sample(nums_and_blocks, k)
+    blocks = [ x[1] for x in nums_and_blocks ]
+    nums = [ x[0] for x in nums_and_blocks ]
+    decer = zfec.easyfec.Decoder(k, m)
+
+    decodeds = decer.decode(blocks, nums, padlen=k*len(blocks[0]) - len(s))
+    assert len(decodeds) == len(s), (ab(decodeds), ab(s), k, m)
+    assert decodeds == s, (ab(decodeds), ab(s),)
+
+def _help_test_random_easy():
+    m = random.randrange(1, 257)
+    k = random.randrange(1, m+1)
+    l = random.randrange(0, 2**9)
+    s = randstr(l)
+    _h_easy(k, m, s)
+
+def _help_test_random_with_l_easy(l):
+    m = random.randrange(1, 257)
+    k = random.randrange(1, m+1)
+    s = randstr(l)
+    _h_easy(k, m, s)
+
+class ZFecTest(unittest.TestCase):
+    def test_instantiate_encoder_no_args(self):
+        try:
+            e = zfec.Encoder()
+        except TypeError:
+            # Okay, so that's because we're required to pass constructor args.
+            pass
+        else:
+            # Oops, it should have raised an exception.
+            self.fail("Should have raised exception from incorrect arguments to constructor.")
+
+    def test_instantiate_decoder_no_args(self):
+        try:
+            e = zfec.Decoder()
+        except TypeError:
+            # Okay, so that's because we're required to pass constructor args.
+            pass
+        else:
+            # Oops, it should have raised an exception.
+            self.fail("Should have raised exception from incorrect arguments to constructor.")
+
+    def test_from_agl_c(self):
+        self.failUnless(zfec._fec.test_from_agl())
+            
+    def test_from_agl_py(self):
+        e = zfec.Encoder(3, 5)
+        b0 = '\x01'*8 ; b1 = '\x02'*8 ; b2 = '\x03'*8
+        # print "_from_py before encoding:"
+        # print "b0: %s, b1: %s, b2: %s" % tuple(base64.b16encode(x) for x in [b0, b1, b2])
+
+        b3, b4 = e.encode([b0, b1, b2], (3, 4))
+        # print "after encoding:"
+        # print "b3: %s, b4: %s" % tuple(base64.b16encode(x) for x in [b3, b4])
+
+        d = zfec.Decoder(3, 5)
+        r0, r1, r2 = d.decode((b2, b3, b4), (1, 2, 3))
+
+        # print "after decoding:"
+        # print "b0: %s, b1: %s" % tuple(base64.b16encode(x) for x in [b0, b1])
+
+    def test_small(self):
+        for i in range(16):
+            _help_test_random_with_l(i)
+        if VERBOSE:
+            print "%d randomized tests pass." % (i+1)
+
     def test_random(self):
         for i in range(3):
             _help_test_random()
         if VERBOSE:
             print "%d randomized tests pass." % (i+1)
 
-    def test_bad_args_enc(self):
-        encer = zfec.Encoder(2, 4)
+    def test_bad_args_construct_decoder(self):
         try:
-            encer.encode(["a", "b", ], ["c", "I am not an integer blocknum",])
+            zfec.Decoder(-1, -1)
         except zfec.Error, e:
-            assert "Precondition violation: second argument is required to contain int" in str(e), e
+            assert "argument is required to be greater than or equal to 1" in str(e), e
         else:
-            raise "Should have gotten zfec.Error for wrong type of second argument."
+            self.fail("Should have gotten an exception from out-of-range arguments.")
 
         try:
-            encer.encode(["a", "b", ], 98) # not a sequence at all
-        except TypeError, e:
-            assert "Second argument (optional) was not a sequence" in str(e), e
+            zfec.Decoder(1, 257)
+        except zfec.Error, e:
+            assert "argument is required to be less than or equal to 256" in str(e), e
+        else:
+            self.fail("Should have gotten an exception from out-of-range arguments.")
+
+        try:
+            zfec.Decoder(3, 2)
+        except zfec.Error, e:
+            assert "first argument is required to be less than or equal to the second argument" in str(e), e
         else:
-            raise "Should have gotten TypeError for wrong type of second argument."
+            self.fail("Should have gotten an exception from out-of-range arguments.")
+
+    def test_bad_args_construct_encoder(self):
+        try:
+            zfec.Encoder(-1, -1)
+        except zfec.Error, e:
+            assert "argument is required to be greater than or equal to 1" in str(e), e
+        else:
+            self.fail("Should have gotten an exception from out-of-range arguments.")
+
+        try:
+            zfec.Encoder(1, 257)
+        except zfec.Error, e:
+            assert "argument is required to be less than or equal to 256" in str(e), e
+        else:
+            self.fail("Should have gotten an exception from out-of-range arguments.")
 
     def test_bad_args_dec(self):
         decer = zfec.Decoder(2, 4)
 
         try:
-            decer.decode(98, [0, 1]) # first argument is not a sequence
+            decer.decode(98, []) # first argument is not a sequence
         except TypeError, e:
             assert "First argument was not a sequence" in str(e), e
         else:
-            raise "Should have gotten TypeError for wrong type of second argument."
+            self.fail("Should have gotten TypeError for wrong type of second argument.")
 
         try:
             decer.decode(["a", "b", ], ["c", "d",])
         except zfec.Error, e:
             assert "Precondition violation: second argument is required to contain int" in str(e), e
         else:
-            raise "Should have gotten zfec.Error for wrong type of second argument."
+            self.fail("Should have gotten zfec.Error for wrong type of second argument.")
 
         try:
             decer.decode(["a", "b", ], 98) # not a sequence at all
         except TypeError, e:
             assert "Second argument was not a sequence" in str(e), e
         else:
-            raise "Should have gotten TypeError for wrong type of second argument."
+            self.fail("Should have gotten TypeError for wrong type of second argument.")
+
+class EasyFecTest(unittest.TestCase):
+    def test_small(self):
+        for i in range(16):
+            _help_test_random_with_l_easy(i)
+        if VERBOSE:
+            print "%d randomized tests pass." % (i+1)
+
+    def test_random(self):
+        for i in range(3):
+            _help_test_random_easy()
+        if VERBOSE:
+            print "%d randomized tests pass." % (i+1)
+
+    def test_bad_args_dec(self):
+        decer = zfec.easyfec.Decoder(2, 4)
+
+        try:
+            decer.decode(98, [0, 1], 0) # first argument is not a sequence
+        except TypeError, e:
+            assert "First argument was not a sequence" in str(e), e
+        else:
+            self.fail("Should have gotten TypeError for wrong type of second argument.")
+
+        try:
+            decer.decode("ab", ["c", "d",], 0)
+        except zfec.Error, e:
+            assert "Precondition violation: second argument is required to contain int" in str(e), e
+        else:
+            self.fail("Should have gotten zfec.Error for wrong type of second argument.")
+
+        try:
+            decer.decode("ab", 98, 0) # not a sequence at all
+        except TypeError, e:
+            assert "Second argument was not a sequence" in str(e), e
+        else:
+            self.fail("Should have gotten TypeError for wrong type of second argument.")
 
 class FileFec(unittest.TestCase):
     def test_filefec_header(self):
-        for m in [3, 5, 7, 9, 11, 17, 19, 33, 35, 65, 66, 67, 129, 130, 131, 254, 255, 256,]:
-            for k in [2, 3, 5, 9, 17, 33, 65, 129, 255,]:
+        for m in [1, 2, 3, 5, 7, 9, 11, 17, 19, 33, 35, 65, 66, 67, 129, 130, 131, 254, 255, 256,]:
+            for k in [1, 2, 3, 5, 9, 17, 33, 65, 129, 255, 256,]:
                 if k >= m:
                     continue
                 for pad in [0, 1, k-1,]:
@@ -131,89 +253,108 @@ class FileFec(unittest.TestCase):
         PREFIX = "test"
         SUFFIX = ".fec"
 
-        tempdir = zfec.util.fileutil.NamedTemporaryDirectory(cleanup=False)
+        fsize = len(teststr)
+
+        tempdir = fileutil.NamedTemporaryDirectory(cleanup=True)
         try:
-            tempfn = os.path.join(tempdir.name, TESTFNAME)
-            tempf = open(tempfn, 'wb')
+            tempf = tempdir.file(TESTFNAME, 'w+b')
             tempf.write(teststr)
-            tempf.close()
-            fsize = os.path.getsize(tempfn)
-            assert fsize == len(teststr)
+            tempf.flush()
+            tempf.seek(0)
 
             # encode the file
-            zfec.filefec.encode_to_files(open(tempfn, 'rb'), fsize, tempdir.name, PREFIX, k, m, SUFFIX, verbose=VERBOSE)
+            zfec.filefec.encode_to_files(tempf, fsize, tempdir.name, PREFIX, k, m, SUFFIX, verbose=VERBOSE)
 
             # select some share files
             RE=re.compile(zfec.filefec.RE_FORMAT % (PREFIX, SUFFIX,))
             fns = os.listdir(tempdir.name)
+            assert len(fns) >= m, (fns, tempdir, tempdir.name,)
             sharefs = [ open(os.path.join(tempdir.name, fn), "rb") for fn in fns if RE.match(fn) ]
+            for sharef in sharefs:
+                tempdir.register_file(sharef)
             random.shuffle(sharefs)
             del sharefs[numshs:]
 
             # decode from the share files
-            outf = open(os.path.join(tempdir.name, 'recovered-testfile.txt'), 'wb')
+            outf = tempdir.file('recovered-testfile.txt', 'w+b')
             zfec.filefec.decode_from_files(outf, sharefs, verbose=VERBOSE)
-            outf.close()
-
-            tempfn = open(os.path.join(tempdir.name, 'recovered-testfile.txt'), 'rb')
-            recovereddata = tempfn.read()
-            assert recovereddata == teststr
+            outf.flush()
+            outf.seek(0)
+            recovereddata = outf.read()
+            assert recovereddata == teststr, (ab(recovereddata), ab(teststr),)
         finally:
             tempdir.shutdown()
 
     def test_filefec_all_shares(self):
         return self._help_test_filefec("Yellow Whirled!", 3, 8)
 
+    def test_filefec_all_shares_1_b(self):
+        return self._help_test_filefec("Yellow Whirled!", 4, 16)
+
+    def test_filefec_all_shares_2(self):
+        return self._help_test_filefec("Yellow Whirled", 3, 8)
+
+    def test_filefec_all_shares_2_b(self):
+        return self._help_test_filefec("Yellow Whirled", 4, 16)
+
+    def test_filefec_all_shares_3(self):
+        return self._help_test_filefec("Yellow Whirle", 3, 8)
+
+    def test_filefec_all_shares_3_b(self):
+        return self._help_test_filefec("Yellow Whirle", 4, 16)
+
     def test_filefec_all_shares_with_padding(self, noisy=VERBOSE):
         return self._help_test_filefec("Yellow Whirled!A", 3, 8)
 
     def test_filefec_min_shares_with_padding(self, noisy=VERBOSE):
         return self._help_test_filefec("Yellow Whirled!A", 3, 8, numshs=3)
 
-if __name__ == "__main__":
-    if hasattr(unittest, 'main'):
-        unittest.main()
-    else:
-        sys.path.append(os.getcwd())
-        mods = []
-        fullname = os.path.realpath(os.path.abspath(__file__))
-        for pathel in sys.path:
-            fullnameofpathel = os.path.realpath(os.path.abspath(pathel))
-            if fullname.startswith(fullnameofpathel):
-                relname = fullname[len(fullnameofpathel):]
-                mod = (os.path.splitext(relname)[0]).replace(os.sep, '.').strip('.')
-                mods.append(mod)
-
-        mods.sort(cmp=lambda x, y: cmp(len(x), len(y)))
-        mods.reverse()
-        for mod in mods:
-            cmdstr = "trial %s %s" % (' '.join(sys.argv[1:]), mod)
-            print cmdstr
-            if os.system(cmdstr) == 0:
-                break
-
-# zfec -- fast forward error correction library with Python interface
-#
-# Copyright (C) 2007 Allmydata, Inc.
-# Author: Zooko Wilcox-O'Hearn
-# mailto:zooko@zooko.com
-#
-# This file is part of zfec.
-#
-# This program is free software; you can redistribute it and/or modify it under
-# the terms of the GNU General Public License as published by the Free Software
-# Foundation; either version 2 of the License, or (at your option) any later
-# version.  This program also comes with the added permission that, in the case
-# that you are obligated to release a derived work under this licence (as per
-# section 2.b of the GPL), you may delay the fulfillment of this obligation for
-# up to 12 months.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+    def test_filefec_min_shares_with_crlf(self, noisy=VERBOSE):
+        return self._help_test_filefec("llow Whirled!A\r\n", 3, 8, numshs=3)
+
+    def test_filefec_min_shares_with_lf(self, noisy=VERBOSE):
+        return self._help_test_filefec("Yellow Whirled!A\n", 3, 8, numshs=3)
 
+    def test_filefec_min_shares_with_lflf(self, noisy=VERBOSE):
+        return self._help_test_filefec("Yellow Whirled!A\n\n", 3, 8, numshs=3)
+
+    def test_filefec_min_shares_with_crcrlflf(self, noisy=VERBOSE):
+        return self._help_test_filefec("Yellow Whirled!A\r\r\n\n", 3, 8, numshs=3)
+
+
+class Cmdline(unittest.TestCase):
+    def test_basic(self, noisy=VERBOSE):
+        tempdir = fileutil.NamedTemporaryDirectory(cleanup=True)
+        fo = tempdir.file("test.data", "w+b")
+        fo.write("WHEHWHJEKWAHDLJAWDHWALKDHA")
+
+        import sys
+        realargv = sys.argv
+        try:
+            DEFAULT_M=8
+            DEFAULT_K=3
+            sys.argv = ["zfec", os.path.join(tempdir.name, "test.data"),]
+
+            retcode = zfec.cmdline_zfec.main()
+            assert retcode == 0, retcode
+
+            RE=re.compile(zfec.filefec.RE_FORMAT % ('test.data', ".fec",))
+            fns = os.listdir(tempdir.name)
+            assert len(fns) >= DEFAULT_M, (fns, DEFAULT_M, tempdir, tempdir.name,)
+            sharefns = [ os.path.join(tempdir.name, fn) for fn in fns if RE.match(fn) ]
+            random.shuffle(sharefns)
+            del sharefns[DEFAULT_K:]
+
+            sys.argv = ["zunfec",]
+            sys.argv.extend(sharefns)
+            sys.argv.extend(['-o', os.path.join(tempdir.name, 'test.data-recovered'),])
+
+            retcode = zfec.cmdline_zunfec.main()
+            assert retcode == 0, retcode
+            import filecmp
+            assert filecmp.cmp(os.path.join(tempdir.name, 'test.data'), os.path.join(tempdir.name, 'test.data-recovered'))
+        finally:
+            sys.argv = realargv
+
+if __name__ == "__main__":
+    unittest.main()