import zfec
+from pyutil import fileutil
+
from base64 import b32encode
def ab(x): # debuggery
if len(x) >= 3:
elif len(x) == 0:
return "%s:%s" % (len(x), "--empty--",)
+def randstr(n):
+ return ''.join(map(chr, map(random.randrange, [0]*n, [256]*n)))
+
def _h(k, m, ss):
encer = zfec.Encoder(k, m)
nums_and_blocks = list(enumerate(encer.encode(ss)))
assert len(decoded) == len(ss), (len(decoded), len(ss),)
assert tuple([str(s) for s in decoded]) == tuple([str(s) for s in ss]), (tuple([ab(str(s)) for s in decoded]), tuple([ab(str(s)) for s in ss]),)
-def randstr(n):
- return ''.join(map(chr, map(random.randrange, [0]*n, [256]*n)))
-
def _help_test_random():
m = random.randrange(1, 257)
k = random.randrange(1, m+1)
_h(k, m, ss)
def _help_test_random_with_l(l):
- m = 83
- k = 19
+ m = random.randrange(1, 257)
+ k = random.randrange(1, m+1)
ss = [ randstr(l/k) for x in range(k) ]
_h(k, m, ss)
-class ZFec(unittest.TestCase):
+def _h_easy(k, m, s):
+ encer = zfec.easyfec.Encoder(k, m)
+ nums_and_blocks = list(enumerate(encer.encode(s)))
+ assert isinstance(nums_and_blocks, list), nums_and_blocks
+ assert len(nums_and_blocks) == m, (len(nums_and_blocks), m,)
+ nums_and_blocks = random.sample(nums_and_blocks, k)
+ blocks = [ x[1] for x in nums_and_blocks ]
+ nums = [ x[0] for x in nums_and_blocks ]
+ decer = zfec.easyfec.Decoder(k, m)
+ decodeds = decer.decode(blocks, nums)
+ assert len(decodeds) == len(s), (len(decodeds), len(s),)
+ assert decodeds == s, (decodeds, s,)
+
+def _help_test_random_easy():
+ m = random.randrange(1, 257)
+ k = random.randrange(1, m+1)
+ l = random.randrange(0, 2**10)
+ s = randstr(l)
+ _h_easy(k, m, s)
+
+def _help_test_random_with_l_easy(l):
+ m = random.randrange(1, 257)
+ k = random.randrange(1, m+1)
+ s = randstr(l)
+ _h_easy(k, m, s)
+
+class ZFecTest(unittest.TestCase):
+ def test_small(self):
+ for i in range(16):
+ _help_test_random_with_l(i)
+ if VERBOSE:
+ print "%d randomized tests pass." % (i+1)
+
def test_random(self):
for i in range(3):
_help_test_random()
if VERBOSE:
print "%d randomized tests pass." % (i+1)
- def test_bad_args_enc(self):
- encer = zfec.Encoder(2, 4)
+ def test_bad_args_dec(self):
+ decer = zfec.Decoder(2, 4)
+
+ try:
+ decer.decode(98) # first argument is not a sequence
+ except TypeError, e:
+ assert "First argument was not a sequence" in str(e), e
+ else:
+ raise "Should have gotten TypeError for wrong type of second argument."
+
try:
- encer.encode(["a", "b", ], ["c", "I am not an integer blocknum",])
+ decer.decode(["a", "b", ], ["c", "d",])
except zfec.Error, e:
assert "Precondition violation: second argument is required to contain int" in str(e), e
else:
raise "Should have gotten zfec.Error for wrong type of second argument."
try:
- encer.encode(["a", "b", ], 98) # not a sequence at all
+ decer.decode(["a", "b", ], 98) # not a sequence at all
except TypeError, e:
- assert "Second argument (optional) was not a sequence" in str(e), e
+ assert "Second argument was not a sequence" in str(e), e
else:
raise "Should have gotten TypeError for wrong type of second argument."
+class EasyFecTest(unittest.TestCase):
+ def test_small(self):
+ for i in range(2**10):
+ _help_test_random_with_l_easy(i)
+ if VERBOSE:
+ print "%d randomized tests pass." % (i+1)
+
+ def test_random(self):
+ for i in range(2**8):
+ _help_test_random_easy()
+ if VERBOSE:
+ print "%d randomized tests pass." % (i+1)
+
def test_bad_args_dec(self):
- decer = zfec.Decoder(2, 4)
+ decer = zfec.easyfec.Decoder(2, 4)
try:
decer.decode(98, [0, 1]) # first argument is not a sequence
raise "Should have gotten TypeError for wrong type of second argument."
try:
- decer.decode(["a", "b", ], ["c", "d",])
+ decer.decode("ab", ["c", "d",])
except zfec.Error, e:
assert "Precondition violation: second argument is required to contain int" in str(e), e
else:
raise "Should have gotten zfec.Error for wrong type of second argument."
try:
- decer.decode(["a", "b", ], 98) # not a sequence at all
+ decer.decode("ab", 98) # not a sequence at all
except TypeError, e:
assert "Second argument was not a sequence" in str(e), e
else:
fsize = len(teststr)
- tempdir = zfec.util.fileutil.NamedTemporaryDirectory(cleanup=True)
+ tempdir = fileutil.NamedTemporaryDirectory(cleanup=True)
try:
tempf = tempdir.file(TESTFNAME, 'w+b')
tempf.write(teststr)
class Cmdline(unittest.TestCase):
def test_basic(self, noisy=VERBOSE):
- tempdir = zfec.util.fileutil.NamedTemporaryDirectory(cleanup=True)
+ tempdir = fileutil.NamedTemporaryDirectory(cleanup=True)
fo = tempdir.file("test.data", "w+b")
fo.write("WHEHWHJEKWAHDLJAWDHWALKDHA")
import sys
realargv = sys.argv
try:
- DEFAULT_M=16
- DEFAULT_K=4
+ DEFAULT_M=8
+ DEFAULT_K=3
sys.argv = ["zfec", os.path.join(tempdir.name, "test.data"),]
retcode = zfec.cmdline_zfec.main()
RE=re.compile(zfec.filefec.RE_FORMAT % ('test.data', ".fec",))
fns = os.listdir(tempdir.name)
- assert len(fns) >= DEFAULT_M, (fns, tempdir, tempdir.name,)
+ assert len(fns) >= DEFAULT_M, (fns, DEFAULT_M, tempdir, tempdir.name,)
sharefns = [ os.path.join(tempdir.name, fn) for fn in fns if RE.match(fn) ]
random.shuffle(sharefns)
del sharefns[DEFAULT_K:]
finally:
sys.argv = realargv
+if __name__ == "__main__":
+ unittest.main()
# zfec -- fast forward error correction library with Python interface
#