]> git.rkrishnan.org Git - tahoe-lafs/zfec.git/blob - zfec/zfec/test/test_zfec.py
stick a .gitignore file
[tahoe-lafs/zfec.git] / zfec / zfec / test / test_zfec.py
1 #!/usr/bin/env python
2
3 import cStringIO, os, random, re
4
5 import unittest
6
7 global VERBOSE
8 VERBOSE=False
9
10 import zfec
11
12 from pyutil import fileutil
13
14 from base64 import b32encode
15 def ab(x): # debuggery
16     if len(x) >= 3:
17         return "%s:%s" % (len(x), b32encode(x[-3:]),)
18     elif len(x) == 2:
19         return "%s:%s" % (len(x), b32encode(x[-2:]),)
20     elif len(x) == 1:
21         return "%s:%s" % (len(x), b32encode(x[-1:]),)
22     elif len(x) == 0:
23         return "%s:%s" % (len(x), "--empty--",)
24
25 def randstr(n):
26     return ''.join(map(chr, map(random.randrange, [0]*n, [256]*n)))
27
28 def _h(k, m, ss):
29     encer = zfec.Encoder(k, m)
30     nums_and_blocks = list(enumerate(encer.encode(ss)))
31     assert isinstance(nums_and_blocks, list), nums_and_blocks
32     assert len(nums_and_blocks) == m, (len(nums_and_blocks), m,)
33     nums_and_blocks = random.sample(nums_and_blocks, k)
34     blocks = [ x[1] for x in nums_and_blocks ]
35     nums = [ x[0] for x in nums_and_blocks ]
36     decer = zfec.Decoder(k, m)
37     decoded = decer.decode(blocks, nums)
38     assert len(decoded) == len(ss), (len(decoded), len(ss),)
39     assert tuple([str(s) for s in decoded]) == tuple([str(s) for s in ss]), (tuple([ab(str(s)) for s in decoded]), tuple([ab(str(s)) for s in ss]),)
40
41 def _help_test_random():
42     m = random.randrange(1, 257)
43     k = random.randrange(1, m+1)
44     l = random.randrange(0, 2**9)
45     ss = [ randstr(l/k) for x in range(k) ]
46     _h(k, m, ss)
47
48 def _help_test_random_with_l(l):
49     m = random.randrange(1, 257)
50     k = random.randrange(1, m+1)
51     ss = [ randstr(l/k) for x in range(k) ]
52     _h(k, m, ss)
53
54 def _h_easy(k, m, s):
55     encer = zfec.easyfec.Encoder(k, m)
56     nums_and_blocks = list(enumerate(encer.encode(s)))
57     assert isinstance(nums_and_blocks, list), nums_and_blocks
58     assert len(nums_and_blocks) == m, (len(nums_and_blocks), m,)
59     nums_and_blocks = random.sample(nums_and_blocks, k)
60     blocks = [ x[1] for x in nums_and_blocks ]
61     nums = [ x[0] for x in nums_and_blocks ]
62     decer = zfec.easyfec.Decoder(k, m)
63
64     decodeds = decer.decode(blocks, nums, padlen=k*len(blocks[0]) - len(s))
65     assert len(decodeds) == len(s), (ab(decodeds), ab(s), k, m)
66     assert decodeds == s, (ab(decodeds), ab(s),)
67
68 def _help_test_random_easy():
69     m = random.randrange(1, 257)
70     k = random.randrange(1, m+1)
71     l = random.randrange(0, 2**9)
72     s = randstr(l)
73     _h_easy(k, m, s)
74
75 def _help_test_random_with_l_easy(l):
76     m = random.randrange(1, 257)
77     k = random.randrange(1, m+1)
78     s = randstr(l)
79     _h_easy(k, m, s)
80
81 class ZFecTest(unittest.TestCase):
82     def test_instantiate_encoder_no_args(self):
83         try:
84             e = zfec.Encoder()
85         except TypeError:
86             # Okay, so that's because we're required to pass constructor args.
87             pass
88         else:
89             # Oops, it should have raised an exception.
90             self.fail("Should have raised exception from incorrect arguments to constructor.")
91
92     def test_instantiate_decoder_no_args(self):
93         try:
94             e = zfec.Decoder()
95         except TypeError:
96             # Okay, so that's because we're required to pass constructor args.
97             pass
98         else:
99             # Oops, it should have raised an exception.
100             self.fail("Should have raised exception from incorrect arguments to constructor.")
101
102     def test_from_agl_c(self):
103         self.failUnless(zfec._fec.test_from_agl())
104             
105     def test_from_agl_py(self):
106         e = zfec.Encoder(3, 5)
107         b0 = '\x01'*8 ; b1 = '\x02'*8 ; b2 = '\x03'*8
108         # print "_from_py before encoding:"
109         # print "b0: %s, b1: %s, b2: %s" % tuple(base64.b16encode(x) for x in [b0, b1, b2])
110
111         b3, b4 = e.encode([b0, b1, b2], (3, 4))
112         # print "after encoding:"
113         # print "b3: %s, b4: %s" % tuple(base64.b16encode(x) for x in [b3, b4])
114
115         d = zfec.Decoder(3, 5)
116         r0, r1, r2 = d.decode((b2, b3, b4), (1, 2, 3))
117
118         # print "after decoding:"
119         # print "b0: %s, b1: %s" % tuple(base64.b16encode(x) for x in [b0, b1])
120
121     def test_small(self):
122         for i in range(16):
123             _help_test_random_with_l(i)
124         if VERBOSE:
125             print "%d randomized tests pass." % (i+1)
126
127     def test_random(self):
128         for i in range(3):
129             _help_test_random()
130         if VERBOSE:
131             print "%d randomized tests pass." % (i+1)
132
133     def test_bad_args_construct_decoder(self):
134         try:
135             zfec.Decoder(-1, -1)
136         except zfec.Error, e:
137             assert "argument is required to be greater than or equal to 1" in str(e), e
138         else:
139             self.fail("Should have gotten an exception from out-of-range arguments.")
140
141         try:
142             zfec.Decoder(1, 257)
143         except zfec.Error, e:
144             assert "argument is required to be less than or equal to 256" in str(e), e
145         else:
146             self.fail("Should have gotten an exception from out-of-range arguments.")
147
148         try:
149             zfec.Decoder(3, 2)
150         except zfec.Error, e:
151             assert "first argument is required to be less than or equal to the second argument" in str(e), e
152         else:
153             self.fail("Should have gotten an exception from out-of-range arguments.")
154
155     def test_bad_args_construct_encoder(self):
156         try:
157             zfec.Encoder(-1, -1)
158         except zfec.Error, e:
159             assert "argument is required to be greater than or equal to 1" in str(e), e
160         else:
161             self.fail("Should have gotten an exception from out-of-range arguments.")
162
163         try:
164             zfec.Encoder(1, 257)
165         except zfec.Error, e:
166             assert "argument is required to be less than or equal to 256" in str(e), e
167         else:
168             self.fail("Should have gotten an exception from out-of-range arguments.")
169
170     def test_bad_args_dec(self):
171         decer = zfec.Decoder(2, 4)
172
173         try:
174             decer.decode(98, []) # first argument is not a sequence
175         except TypeError, e:
176             assert "First argument was not a sequence" in str(e), e
177         else:
178             self.fail("Should have gotten TypeError for wrong type of second argument.")
179
180         try:
181             decer.decode(["a", "b", ], ["c", "d",])
182         except zfec.Error, e:
183             assert "Precondition violation: second argument is required to contain int" in str(e), e
184         else:
185             self.fail("Should have gotten zfec.Error for wrong type of second argument.")
186
187         try:
188             decer.decode(["a", "b", ], 98) # not a sequence at all
189         except TypeError, e:
190             assert "Second argument was not a sequence" in str(e), e
191         else:
192             self.fail("Should have gotten TypeError for wrong type of second argument.")
193
194 class EasyFecTest(unittest.TestCase):
195     def test_small(self):
196         for i in range(16):
197             _help_test_random_with_l_easy(i)
198         if VERBOSE:
199             print "%d randomized tests pass." % (i+1)
200
201     def test_random(self):
202         for i in range(3):
203             _help_test_random_easy()
204         if VERBOSE:
205             print "%d randomized tests pass." % (i+1)
206
207     def test_bad_args_dec(self):
208         decer = zfec.easyfec.Decoder(2, 4)
209
210         try:
211             decer.decode(98, [0, 1], 0) # first argument is not a sequence
212         except TypeError, e:
213             assert "First argument was not a sequence" in str(e), e
214         else:
215             self.fail("Should have gotten TypeError for wrong type of second argument.")
216
217         try:
218             decer.decode("ab", ["c", "d",], 0)
219         except zfec.Error, e:
220             assert "Precondition violation: second argument is required to contain int" in str(e), e
221         else:
222             self.fail("Should have gotten zfec.Error for wrong type of second argument.")
223
224         try:
225             decer.decode("ab", 98, 0) # not a sequence at all
226         except TypeError, e:
227             assert "Second argument was not a sequence" in str(e), e
228         else:
229             self.fail("Should have gotten TypeError for wrong type of second argument.")
230
231 class FileFec(unittest.TestCase):
232     def test_filefec_header(self):
233         for m in [1, 2, 3, 5, 7, 9, 11, 17, 19, 33, 35, 65, 66, 67, 129, 130, 131, 254, 255, 256,]:
234             for k in [1, 2, 3, 5, 9, 17, 33, 65, 129, 255, 256,]:
235                 if k >= m:
236                     continue
237                 for pad in [0, 1, k-1,]:
238                     if pad >= k:
239                         continue
240                     for sh in [0, 1, m-1,]:
241                         if sh >= m:
242                             continue
243                         h = zfec.filefec._build_header(m, k, pad, sh)
244                         hio = cStringIO.StringIO(h)
245                         (rm, rk, rpad, rsh,) = zfec.filefec._parse_header(hio)
246                         assert (rm, rk, rpad, rsh,) == (m, k, pad, sh,), h
247
248     def _help_test_filefec(self, teststr, k, m, numshs=None):
249         if numshs == None:
250             numshs = m
251
252         TESTFNAME = "testfile.txt"
253         PREFIX = "test"
254         SUFFIX = ".fec"
255
256         fsize = len(teststr)
257
258         tempdir = fileutil.NamedTemporaryDirectory(cleanup=True)
259         try:
260             tempf = tempdir.file(TESTFNAME, 'w+b')
261             tempf.write(teststr)
262             tempf.flush()
263             tempf.seek(0)
264
265             # encode the file
266             zfec.filefec.encode_to_files(tempf, fsize, tempdir.name, PREFIX, k, m, SUFFIX, verbose=VERBOSE)
267
268             # select some share files
269             RE=re.compile(zfec.filefec.RE_FORMAT % (PREFIX, SUFFIX,))
270             fns = os.listdir(tempdir.name)
271             assert len(fns) >= m, (fns, tempdir, tempdir.name,)
272             sharefs = [ open(os.path.join(tempdir.name, fn), "rb") for fn in fns if RE.match(fn) ]
273             for sharef in sharefs:
274                 tempdir.register_file(sharef)
275             random.shuffle(sharefs)
276             del sharefs[numshs:]
277
278             # decode from the share files
279             outf = tempdir.file('recovered-testfile.txt', 'w+b')
280             zfec.filefec.decode_from_files(outf, sharefs, verbose=VERBOSE)
281             outf.flush()
282             outf.seek(0)
283             recovereddata = outf.read()
284             assert recovereddata == teststr, (ab(recovereddata), ab(teststr),)
285         finally:
286             tempdir.shutdown()
287
288     def test_filefec_all_shares(self):
289         return self._help_test_filefec("Yellow Whirled!", 3, 8)
290
291     def test_filefec_all_shares_1_b(self):
292         return self._help_test_filefec("Yellow Whirled!", 4, 16)
293
294     def test_filefec_all_shares_2(self):
295         return self._help_test_filefec("Yellow Whirled", 3, 8)
296
297     def test_filefec_all_shares_2_b(self):
298         return self._help_test_filefec("Yellow Whirled", 4, 16)
299
300     def test_filefec_all_shares_3(self):
301         return self._help_test_filefec("Yellow Whirle", 3, 8)
302
303     def test_filefec_all_shares_3_b(self):
304         return self._help_test_filefec("Yellow Whirle", 4, 16)
305
306     def test_filefec_all_shares_with_padding(self, noisy=VERBOSE):
307         return self._help_test_filefec("Yellow Whirled!A", 3, 8)
308
309     def test_filefec_min_shares_with_padding(self, noisy=VERBOSE):
310         return self._help_test_filefec("Yellow Whirled!A", 3, 8, numshs=3)
311
312     def test_filefec_min_shares_with_crlf(self, noisy=VERBOSE):
313         return self._help_test_filefec("llow Whirled!A\r\n", 3, 8, numshs=3)
314
315     def test_filefec_min_shares_with_lf(self, noisy=VERBOSE):
316         return self._help_test_filefec("Yellow Whirled!A\n", 3, 8, numshs=3)
317
318     def test_filefec_min_shares_with_lflf(self, noisy=VERBOSE):
319         return self._help_test_filefec("Yellow Whirled!A\n\n", 3, 8, numshs=3)
320
321     def test_filefec_min_shares_with_crcrlflf(self, noisy=VERBOSE):
322         return self._help_test_filefec("Yellow Whirled!A\r\r\n\n", 3, 8, numshs=3)
323
324
325 class Cmdline(unittest.TestCase):
326     def test_basic(self, noisy=VERBOSE):
327         tempdir = fileutil.NamedTemporaryDirectory(cleanup=True)
328         fo = tempdir.file("test.data", "w+b")
329         fo.write("WHEHWHJEKWAHDLJAWDHWALKDHA")
330
331         import sys
332         realargv = sys.argv
333         try:
334             DEFAULT_M=8
335             DEFAULT_K=3
336             sys.argv = ["zfec", os.path.join(tempdir.name, "test.data"),]
337
338             retcode = zfec.cmdline_zfec.main()
339             assert retcode == 0, retcode
340
341             RE=re.compile(zfec.filefec.RE_FORMAT % ('test.data', ".fec",))
342             fns = os.listdir(tempdir.name)
343             assert len(fns) >= DEFAULT_M, (fns, DEFAULT_M, tempdir, tempdir.name,)
344             sharefns = [ os.path.join(tempdir.name, fn) for fn in fns if RE.match(fn) ]
345             random.shuffle(sharefns)
346             del sharefns[DEFAULT_K:]
347
348             sys.argv = ["zunfec",]
349             sys.argv.extend(sharefns)
350             sys.argv.extend(['-o', os.path.join(tempdir.name, 'test.data-recovered'),])
351
352             retcode = zfec.cmdline_zunfec.main()
353             assert retcode == 0, retcode
354             import filecmp
355             assert filecmp.cmp(os.path.join(tempdir.name, 'test.data'), os.path.join(tempdir.name, 'test.data-recovered'))
356         finally:
357             sys.argv = realargv
358
359 if __name__ == "__main__":
360     unittest.main()