3 import cStringIO, os, random, re
12 from pyutil import fileutil
14 from base64 import b32encode
def ab(x): # debuggery
    """Return a short debugging label for the byte string *x*.

    The label has the form "<len(x)>:<tail>", where <tail> is the base32
    encoding of the last (up to) three bytes of *x*, or the literal text
    "--empty--" when *x* has no bytes at all.  It is used in assertion
    messages so that large blobs can be compared at a glance.
    """
    if len(x) == 0:
        return "%s:%s" % (len(x), "--empty--",)
    # Slicing clamps to the available length, so x[-3:] already yields the
    # last two bytes of a 2-byte input and the last byte of a 1-byte input;
    # no separate per-length branches are needed.
    return "%s:%s" % (len(x), b32encode(x[-3:]),)
26 return ''.join(map(chr, map(random.randrange, [0]*n, [256]*n)))
29 encer = zfec.Encoder(k, m)
30 nums_and_blocks = list(enumerate(encer.encode(ss)))
31 assert isinstance(nums_and_blocks, list), nums_and_blocks
32 assert len(nums_and_blocks) == m, (len(nums_and_blocks), m,)
33 nums_and_blocks = random.sample(nums_and_blocks, k)
34 blocks = [ x[1] for x in nums_and_blocks ]
35 nums = [ x[0] for x in nums_and_blocks ]
36 decer = zfec.Decoder(k, m)
37 decoded = decer.decode(blocks, nums)
38 assert len(decoded) == len(ss), (len(decoded), len(ss),)
39 assert tuple([str(s) for s in decoded]) == tuple([str(s) for s in ss]), (tuple([ab(str(s)) for s in decoded]), tuple([ab(str(s)) for s in ss]),)
41 def _help_test_random():
42 m = random.randrange(1, 257)
43 k = random.randrange(1, m+1)
44 l = random.randrange(0, 2**9)
45 ss = [ randstr(l/k) for x in range(k) ]
48 def _help_test_random_with_l(l):
49 m = random.randrange(1, 257)
50 k = random.randrange(1, m+1)
51 ss = [ randstr(l/k) for x in range(k) ]
55 encer = zfec.easyfec.Encoder(k, m)
56 nums_and_blocks = list(enumerate(encer.encode(s)))
57 assert isinstance(nums_and_blocks, list), nums_and_blocks
58 assert len(nums_and_blocks) == m, (len(nums_and_blocks), m,)
59 nums_and_blocks = random.sample(nums_and_blocks, k)
60 blocks = [ x[1] for x in nums_and_blocks ]
61 nums = [ x[0] for x in nums_and_blocks ]
62 decer = zfec.easyfec.Decoder(k, m)
64 decodeds = decer.decode(blocks, nums, padlen=k*len(blocks[0]) - len(s))
65 assert len(decodeds) == len(s), (ab(decodeds), ab(s), k, m)
66 assert decodeds == s, (ab(decodeds), ab(s),)
68 def _help_test_random_easy():
69 m = random.randrange(1, 257)
70 k = random.randrange(1, m+1)
71 l = random.randrange(0, 2**9)
75 def _help_test_random_with_l_easy(l):
76 m = random.randrange(1, 257)
77 k = random.randrange(1, m+1)
81 class ZFecTest(unittest.TestCase):
def test_from_agl_c(self):
    """Run the self-test exposed as zfec._fec.test_from_agl and require a truthy result."""
    # assertTrue is the supported spelling; failUnless is a deprecated alias
    # that newer unittest releases no longer provide.
    self.assertTrue(zfec._fec.test_from_agl())
def test_from_agl_py(self):
    """Smoke-test a (3, 5) encode/decode round through the Python API.

    Three 8-byte primary blocks are encoded to obtain secondary blocks 3
    and 4; a decoder is then handed (b2, b3, b4) labelled as share numbers
    (1, 2, 3).  The tuple unpackings require exactly two encoded and three
    decoded blocks to come back.  No assertion on the decoded contents is
    visible in this method.
    """
    b0 = '\x01'*8
    b1 = '\x02'*8
    b2 = '\x03'*8

    encoder = zfec.Encoder(3, 5)
    # Ask only for the two secondary blocks.
    b3, b4 = encoder.encode([b0, b1, b2], (3, 4))

    decoder = zfec.Decoder(3, 5)
    r0, r1, r2 = decoder.decode((b2, b3, b4), (1, 2, 3))
101 def test_small(self):
103 _help_test_random_with_l(i)
105 print "%d randomized tests pass." % (i+1)
107 def test_random(self):
111 print "%d randomized tests pass." % (i+1)
113 def test_bad_args_construct_decoder(self):
116 except zfec.Error, e:
117 assert "argument is required to be greater than or equal to 1" in str(e), e
119 self.fail("Should have gotten an exception from out-of-range arguments.")
123 except zfec.Error, e:
124 assert "argument is required to be less than or equal to 256" in str(e), e
126 self.fail("Should have gotten an exception from out-of-range arguments.")
130 except zfec.Error, e:
131 assert "first argument is required to be less than or equal to the second argument" in str(e), e
133 self.fail("Should have gotten an exception from out-of-range arguments.")
135 def test_bad_args_construct_encoder(self):
138 except zfec.Error, e:
139 assert "argument is required to be greater than or equal to 1" in str(e), e
141 self.fail("Should have gotten an exception from out-of-range arguments.")
145 except zfec.Error, e:
146 assert "argument is required to be less than or equal to 256" in str(e), e
148 self.fail("Should have gotten an exception from out-of-range arguments.")
150 def test_bad_args_dec(self):
151 decer = zfec.Decoder(2, 4)
154 decer.decode(98, []) # first argument is not a sequence
156 assert "First argument was not a sequence" in str(e), e
158 raise "Should have gotten TypeError for wrong type of second argument."
161 decer.decode(["a", "b", ], ["c", "d",])
162 except zfec.Error, e:
163 assert "Precondition violation: second argument is required to contain int" in str(e), e
165 raise "Should have gotten zfec.Error for wrong type of second argument."
168 decer.decode(["a", "b", ], 98) # not a sequence at all
170 assert "Second argument was not a sequence" in str(e), e
172 raise "Should have gotten TypeError for wrong type of second argument."
174 class EasyFecTest(unittest.TestCase):
175 def test_small(self):
177 _help_test_random_with_l_easy(i)
179 print "%d randomized tests pass." % (i+1)
181 def test_random(self):
183 _help_test_random_easy()
185 print "%d randomized tests pass." % (i+1)
187 def test_bad_args_dec(self):
188 decer = zfec.easyfec.Decoder(2, 4)
191 decer.decode(98, [0, 1], 0) # first argument is not a sequence
193 assert "First argument was not a sequence" in str(e), e
195 raise "Should have gotten TypeError for wrong type of second argument."
198 decer.decode("ab", ["c", "d",], 0)
199 except zfec.Error, e:
200 assert "Precondition violation: second argument is required to contain int" in str(e), e
202 raise "Should have gotten zfec.Error for wrong type of second argument."
205 decer.decode("ab", 98, 0) # not a sequence at all
207 assert "Second argument was not a sequence" in str(e), e
209 raise "Should have gotten TypeError for wrong type of second argument."
211 class FileFec(unittest.TestCase):
212 def test_filefec_header(self):
213 for m in [1, 2, 3, 5, 7, 9, 11, 17, 19, 33, 35, 65, 66, 67, 129, 130, 131, 254, 255, 256,]:
214 for k in [1, 2, 3, 5, 9, 17, 33, 65, 129, 255, 256,]:
217 for pad in [0, 1, k-1,]:
220 for sh in [0, 1, m-1,]:
223 h = zfec.filefec._build_header(m, k, pad, sh)
224 hio = cStringIO.StringIO(h)
225 (rm, rk, rpad, rsh,) = zfec.filefec._parse_header(hio)
226 assert (rm, rk, rpad, rsh,) == (m, k, pad, sh,), h
228 def _help_test_filefec(self, teststr, k, m, numshs=None):
232 TESTFNAME = "testfile.txt"
238 tempdir = fileutil.NamedTemporaryDirectory(cleanup=True)
240 tempf = tempdir.file(TESTFNAME, 'w+b')
246 zfec.filefec.encode_to_files(tempf, fsize, tempdir.name, PREFIX, k, m, SUFFIX, verbose=VERBOSE)
248 # select some share files
249 RE=re.compile(zfec.filefec.RE_FORMAT % (PREFIX, SUFFIX,))
250 fns = os.listdir(tempdir.name)
251 assert len(fns) >= m, (fns, tempdir, tempdir.name,)
252 sharefs = [ open(os.path.join(tempdir.name, fn), "rb") for fn in fns if RE.match(fn) ]
253 for sharef in sharefs:
254 tempdir.register_file(sharef)
255 random.shuffle(sharefs)
258 # decode from the share files
259 outf = tempdir.file('recovered-testfile.txt', 'w+b')
260 zfec.filefec.decode_from_files(outf, sharefs, verbose=VERBOSE)
263 recovereddata = outf.read()
264 assert recovereddata == teststr, (ab(recovereddata), ab(teststr),)
def test_filefec_all_shares(self):
    """Run the filefec round-trip helper on a 15-byte string with k=3, m=8 and the default numshs."""
    payload = "Yellow Whirled!"
    return self._help_test_filefec(payload, k=3, m=8)
def test_filefec_all_shares_1_b(self):
    """Run the filefec round-trip helper on a 15-byte string with k=4, m=16 and the default numshs."""
    payload = "Yellow Whirled!"
    return self._help_test_filefec(payload, k=4, m=16)
def test_filefec_all_shares_2(self):
    """Run the filefec round-trip helper on a 14-byte string with k=3, m=8 and the default numshs."""
    payload = "Yellow Whirled"
    return self._help_test_filefec(payload, k=3, m=8)
def test_filefec_all_shares_2_b(self):
    """Run the filefec round-trip helper on a 14-byte string with k=4, m=16 and the default numshs."""
    payload = "Yellow Whirled"
    return self._help_test_filefec(payload, k=4, m=16)
def test_filefec_all_shares_3(self):
    """Run the filefec round-trip helper on a 13-byte string with k=3, m=8 and the default numshs."""
    payload = "Yellow Whirle"
    return self._help_test_filefec(payload, k=3, m=8)
def test_filefec_all_shares_3_b(self):
    """Run the filefec round-trip helper on a 13-byte string with k=4, m=16 and the default numshs."""
    payload = "Yellow Whirle"
    return self._help_test_filefec(payload, k=4, m=16)
def test_filefec_all_shares_with_padding(self, noisy=VERBOSE):
    """Run the filefec round-trip helper on a 16-byte string with k=3, m=8 and the default numshs.

    The input length is not a multiple of k.  ``noisy`` is accepted but not
    used by this method.
    """
    payload = "Yellow Whirled!A"
    return self._help_test_filefec(payload, k=3, m=8)
def test_filefec_min_shares_with_padding(self, noisy=VERBOSE):
    """Run the filefec round-trip helper on a 16-byte string with k=3, m=8 and numshs=3.

    The input length is not a multiple of k.  ``noisy`` is accepted but not
    used by this method.
    """
    payload = "Yellow Whirled!A"
    return self._help_test_filefec(payload, k=3, m=8, numshs=3)
def test_filefec_min_shares_with_crlf(self, noisy=VERBOSE):
    """Run the filefec round-trip helper on a string ending in CR LF, with k=3, m=8 and numshs=3.

    ``noisy`` is accepted but not used by this method.
    """
    payload = "llow Whirled!A\r\n"
    return self._help_test_filefec(payload, k=3, m=8, numshs=3)
def test_filefec_min_shares_with_lf(self, noisy=VERBOSE):
    """Run the filefec round-trip helper on a string ending in a single LF, with k=3, m=8 and numshs=3.

    ``noisy`` is accepted but not used by this method.
    """
    payload = "Yellow Whirled!A\n"
    return self._help_test_filefec(payload, k=3, m=8, numshs=3)
def test_filefec_min_shares_with_lflf(self, noisy=VERBOSE):
    """Run the filefec round-trip helper on a string ending in two LFs, with k=3, m=8 and numshs=3.

    ``noisy`` is accepted but not used by this method.
    """
    payload = "Yellow Whirled!A\n\n"
    return self._help_test_filefec(payload, k=3, m=8, numshs=3)
def test_filefec_min_shares_with_crcrlflf(self, noisy=VERBOSE):
    """Run the filefec round-trip helper on a string ending in CR CR LF LF, with k=3, m=8 and numshs=3.

    ``noisy`` is accepted but not used by this method.
    """
    payload = "Yellow Whirled!A\r\r\n\n"
    return self._help_test_filefec(payload, k=3, m=8, numshs=3)
305 class Cmdline(unittest.TestCase):
306 def test_basic(self, noisy=VERBOSE):
307 tempdir = fileutil.NamedTemporaryDirectory(cleanup=True)
308 fo = tempdir.file("test.data", "w+b")
309 fo.write("WHEHWHJEKWAHDLJAWDHWALKDHA")
316 sys.argv = ["zfec", os.path.join(tempdir.name, "test.data"),]
318 retcode = zfec.cmdline_zfec.main()
319 assert retcode == 0, retcode
321 RE=re.compile(zfec.filefec.RE_FORMAT % ('test.data', ".fec",))
322 fns = os.listdir(tempdir.name)
323 assert len(fns) >= DEFAULT_M, (fns, DEFAULT_M, tempdir, tempdir.name,)
324 sharefns = [ os.path.join(tempdir.name, fn) for fn in fns if RE.match(fn) ]
325 random.shuffle(sharefns)
326 del sharefns[DEFAULT_K:]
328 sys.argv = ["zunfec",]
329 sys.argv.extend(sharefns)
330 sys.argv.extend(['-o', os.path.join(tempdir.name, 'test.data-recovered'),])
332 retcode = zfec.cmdline_zunfec.main()
333 assert retcode == 0, retcode
335 assert filecmp.cmp(os.path.join(tempdir.name, 'test.data'), os.path.join(tempdir.name, 'test.data-recovered'))
339 if __name__ == "__main__":