3 import cStringIO, os, random, re
12 from base64 import b32encode
13 def ab(x): # debuggery
15 return "%s:%s" % (len(x), b32encode(x[-3:]),)
17 return "%s:%s" % (len(x), b32encode(x[-2:]),)
19 return "%s:%s" % (len(x), b32encode(x[-1:]),)
21 return "%s:%s" % (len(x), "--empty--",)
24 encer = zfec.Encoder(k, m)
25 nums_and_blocks = list(enumerate(encer.encode(ss)))
26 assert isinstance(nums_and_blocks, list), nums_and_blocks
27 assert len(nums_and_blocks) == m, (len(nums_and_blocks), m,)
28 nums_and_blocks = random.sample(nums_and_blocks, k)
29 blocks = [ x[1] for x in nums_and_blocks ]
30 nums = [ x[0] for x in nums_and_blocks ]
31 decer = zfec.Decoder(k, m)
32 decoded = decer.decode(blocks, nums)
33 assert len(decoded) == len(ss), (len(decoded), len(ss),)
34 assert tuple([str(s) for s in decoded]) == tuple([str(s) for s in ss]), (tuple([ab(str(s)) for s in decoded]), tuple([ab(str(s)) for s in ss]),)
37 return ''.join(map(chr, map(random.randrange, [0]*n, [256]*n)))
39 def _help_test_random():
40 m = random.randrange(1, 257)
41 k = random.randrange(1, m+1)
42 l = random.randrange(0, 2**10)
43 ss = [ randstr(l/k) for x in range(k) ]
46 def _help_test_random_with_l(l):
49 ss = [ randstr(l/k) for x in range(k) ]
52 class ZFec(unittest.TestCase):
53 def test_random(self):
57 print "%d randomized tests pass." % (i+1)
59 def test_bad_args_enc(self):
60 encer = zfec.Encoder(2, 4)
62 encer.encode(["a", "b", ], ["c", "I am not an integer blocknum",])
64 assert "Precondition violation: second argument is required to contain int" in str(e), e
66 raise "Should have gotten zfec.Error for wrong type of second argument."
69 encer.encode(["a", "b", ], 98) # not a sequence at all
71 assert "Second argument (optional) was not a sequence" in str(e), e
73 raise "Should have gotten TypeError for wrong type of second argument."
75 def test_bad_args_dec(self):
76 decer = zfec.Decoder(2, 4)
79 decer.decode(98, [0, 1]) # first argument is not a sequence
81 assert "First argument was not a sequence" in str(e), e
83 raise "Should have gotten TypeError for wrong type of second argument."
86 decer.decode(["a", "b", ], ["c", "d",])
88 assert "Precondition violation: second argument is required to contain int" in str(e), e
90 raise "Should have gotten zfec.Error for wrong type of second argument."
93 decer.decode(["a", "b", ], 98) # not a sequence at all
95 assert "Second argument was not a sequence" in str(e), e
97 raise "Should have gotten TypeError for wrong type of second argument."
99 class FileFec(unittest.TestCase):
100 def test_filefec_header(self):
101 for m in [3, 5, 7, 9, 11, 17, 19, 33, 35, 65, 66, 67, 129, 130, 131, 254, 255, 256,]:
102 for k in [2, 3, 5, 9, 17, 33, 65, 129, 255,]:
105 for pad in [0, 1, k-1,]:
108 for sh in [0, 1, m-1,]:
111 h = zfec.filefec._build_header(m, k, pad, sh)
112 hio = cStringIO.StringIO(h)
113 (rm, rk, rpad, rsh,) = zfec.filefec._parse_header(hio)
114 assert (rm, rk, rpad, rsh,) == (m, k, pad, sh,), h
116 def _help_test_filefec(self, teststr, k, m, numshs=None):
120 TESTFNAME = "testfile.txt"
126 tempdir = zfec.util.fileutil.NamedTemporaryDirectory(cleanup=True)
128 tempf = tempdir.file(TESTFNAME, 'w+b')
133 zfec.filefec.encode_to_files(tempf, fsize, tempdir.name, PREFIX, k, m, SUFFIX, verbose=VERBOSE)
135 # select some share files
136 RE=re.compile(zfec.filefec.RE_FORMAT % (PREFIX, SUFFIX,))
137 fns = os.listdir(tempdir.name)
138 assert len(fns) >= m, (fns, tempdir, tempdir.name,)
139 sharefs = [ open(os.path.join(tempdir.name, fn), "rb") for fn in fns if RE.match(fn) ]
140 for sharef in sharefs:
141 tempdir.register_file(sharef)
142 random.shuffle(sharefs)
145 # decode from the share files
146 outf = tempdir.file('recovered-testfile.txt', 'w+b')
147 zfec.filefec.decode_from_files(outf, sharefs, verbose=VERBOSE)
149 recovereddata = outf.read()
150 assert recovereddata == teststr
154 def test_filefec_all_shares(self):
155 return self._help_test_filefec("Yellow Whirled!", 3, 8)
157 def test_filefec_all_shares_1_b(self):
158 return self._help_test_filefec("Yellow Whirled!", 4, 16)
160 def test_filefec_all_shares_2(self):
161 return self._help_test_filefec("Yellow Whirled", 3, 8)
163 def test_filefec_all_shares_2_b(self):
164 return self._help_test_filefec("Yellow Whirled", 4, 16)
166 def test_filefec_all_shares_3(self):
167 return self._help_test_filefec("Yellow Whirle", 3, 8)
169 def test_filefec_all_shares_3_b(self):
170 return self._help_test_filefec("Yellow Whirle", 4, 16)
172 def test_filefec_all_shares_with_padding(self, noisy=VERBOSE):
173 return self._help_test_filefec("Yellow Whirled!A", 3, 8)
175 def test_filefec_min_shares_with_padding(self, noisy=VERBOSE):
176 return self._help_test_filefec("Yellow Whirled!A", 3, 8, numshs=3)
178 def test_filefec_min_shares_with_crlf(self, noisy=VERBOSE):
179 return self._help_test_filefec("llow Whirled!A\r\n", 3, 8, numshs=3)
181 def test_filefec_min_shares_with_lf(self, noisy=VERBOSE):
182 return self._help_test_filefec("Yellow Whirled!A\n", 3, 8, numshs=3)
184 def test_filefec_min_shares_with_lflf(self, noisy=VERBOSE):
185 return self._help_test_filefec("Yellow Whirled!A\n\n", 3, 8, numshs=3)
187 def test_filefec_min_shares_with_crcrlflf(self, noisy=VERBOSE):
188 return self._help_test_filefec("Yellow Whirled!A\r\r\n\n", 3, 8, numshs=3)
191 class Cmdline(unittest.TestCase):
192 def test_basic(self, noisy=VERBOSE):
193 tempdir = zfec.util.fileutil.NamedTemporaryDirectory(cleanup=True)
194 fo = tempdir.file("test.data", "w+b")
195 fo.write("WHEHWHJEKWAHDLJAWDHWALKDHA")
202 sys.argv = ["zfec", os.path.join(tempdir.name, "test.data"),]
204 retcode = zfec.cmdline_zfec.main()
205 assert retcode == 0, retcode
207 RE=re.compile(zfec.filefec.RE_FORMAT % ('test.data', ".fec",))
208 fns = os.listdir(tempdir.name)
209 assert len(fns) >= DEFAULT_M, (fns, tempdir, tempdir.name,)
210 sharefns = [ os.path.join(tempdir.name, fn) for fn in fns if RE.match(fn) ]
211 random.shuffle(sharefns)
212 del sharefns[DEFAULT_K:]
214 sys.argv = ["zunfec",]
215 sys.argv.extend(sharefns)
216 sys.argv.extend(['-o', os.path.join(tempdir.name, 'test.data-recovered'),])
218 retcode = zfec.cmdline_zunfec.main()
219 assert retcode == 0, retcode
221 assert filecmp.cmp(os.path.join(tempdir.name, 'test.data'), os.path.join(tempdir.name, 'test.data-recovered'))
226 # zfec -- fast forward error correction library with Python interface
228 # Copyright (C) 2007 Allmydata, Inc.
229 # Author: Zooko Wilcox-O'Hearn
231 # This file is part of zfec.
233 # This program is free software; you can redistribute it and/or modify it
234 # under the terms of the GNU General Public License as published by the Free
235 # Software Foundation; either version 2 of the License, or (at your option)
236 # any later version, with the added permission that, if you become obligated
237 # to release a derived work under this licence (as per section 2.b), you may
238 # delay the fulfillment of this obligation for up to 12 months. See the file
239 # COPYING for details.
241 # If you would like to inquire about a commercial relationship with Allmydata,
242 # Inc., please contact partnerships@allmydata.com and visit
243 # http://allmydata.com/.