]> git.rkrishnan.org Git - tahoe-lafs/zfec.git/blob - zfec/zfec/test/test_zfec.py
zfec: update licensing and attribution docs
[tahoe-lafs/zfec.git] / zfec / zfec / test / test_zfec.py
1 #!/usr/bin/env python
2
3 import cStringIO, os, random, re
4
5 import unittest
6
7 global VERBOSE
8 VERBOSE=False
9
10 import zfec
11
12 from pyutil import fileutil
13
14 from base64 import b32encode
15 def ab(x): # debuggery
16     if len(x) >= 3:
17         return "%s:%s" % (len(x), b32encode(x[-3:]),)
18     elif len(x) == 2:
19         return "%s:%s" % (len(x), b32encode(x[-2:]),)
20     elif len(x) == 1:
21         return "%s:%s" % (len(x), b32encode(x[-1:]),)
22     elif len(x) == 0:
23         return "%s:%s" % (len(x), "--empty--",)
24
25 def randstr(n):
26     return ''.join(map(chr, map(random.randrange, [0]*n, [256]*n)))
27
28 def _h(k, m, ss):
29     encer = zfec.Encoder(k, m)
30     nums_and_blocks = list(enumerate(encer.encode(ss)))
31     assert isinstance(nums_and_blocks, list), nums_and_blocks
32     assert len(nums_and_blocks) == m, (len(nums_and_blocks), m,)
33     nums_and_blocks = random.sample(nums_and_blocks, k)
34     blocks = [ x[1] for x in nums_and_blocks ]
35     nums = [ x[0] for x in nums_and_blocks ]
36     decer = zfec.Decoder(k, m)
37     decoded = decer.decode(blocks, nums)
38     assert len(decoded) == len(ss), (len(decoded), len(ss),)
39     assert tuple([str(s) for s in decoded]) == tuple([str(s) for s in ss]), (tuple([ab(str(s)) for s in decoded]), tuple([ab(str(s)) for s in ss]),)
40
41 def _help_test_random():
42     m = random.randrange(1, 257)
43     k = random.randrange(1, m+1)
44     l = random.randrange(0, 2**9)
45     ss = [ randstr(l/k) for x in range(k) ]
46     _h(k, m, ss)
47
48 def _help_test_random_with_l(l):
49     m = random.randrange(1, 257)
50     k = random.randrange(1, m+1)
51     ss = [ randstr(l/k) for x in range(k) ]
52     _h(k, m, ss)
53
54 def _h_easy(k, m, s):
55     encer = zfec.easyfec.Encoder(k, m)
56     nums_and_blocks = list(enumerate(encer.encode(s)))
57     assert isinstance(nums_and_blocks, list), nums_and_blocks
58     assert len(nums_and_blocks) == m, (len(nums_and_blocks), m,)
59     nums_and_blocks = random.sample(nums_and_blocks, k)
60     blocks = [ x[1] for x in nums_and_blocks ]
61     nums = [ x[0] for x in nums_and_blocks ]
62     decer = zfec.easyfec.Decoder(k, m)
63     
64     decodeds = decer.decode(blocks, nums, padlen=k*len(blocks[0]) - len(s))
65     assert len(decodeds) == len(s), (ab(decodeds), ab(s), k, m)
66     assert decodeds == s, (ab(decodeds), ab(s),)
67
68 def _help_test_random_easy():
69     m = random.randrange(1, 257)
70     k = random.randrange(1, m+1)
71     l = random.randrange(0, 2**9)
72     s = randstr(l)
73     _h_easy(k, m, s)
74
75 def _help_test_random_with_l_easy(l):
76     m = random.randrange(1, 257)
77     k = random.randrange(1, m+1)
78     s = randstr(l)
79     _h_easy(k, m, s)
80     
81 class ZFecTest(unittest.TestCase):
82     def test_small(self):
83         for i in range(16):
84             _help_test_random_with_l(i)
85         if VERBOSE:
86             print "%d randomized tests pass." % (i+1)
87
88     def test_random(self):
89         for i in range(3):
90             _help_test_random()
91         if VERBOSE:
92             print "%d randomized tests pass." % (i+1)
93
94     def test_bad_args_dec(self):
95         decer = zfec.Decoder(2, 4)
96
97         try:
98             decer.decode(98, []) # first argument is not a sequence
99         except TypeError, e:
100             assert "First argument was not a sequence" in str(e), e
101         else:
102             raise "Should have gotten TypeError for wrong type of second argument."
103
104         try:
105             decer.decode(["a", "b", ], ["c", "d",])
106         except zfec.Error, e:
107             assert "Precondition violation: second argument is required to contain int" in str(e), e
108         else:
109             raise "Should have gotten zfec.Error for wrong type of second argument."
110
111         try:
112             decer.decode(["a", "b", ], 98) # not a sequence at all
113         except TypeError, e:
114             assert "Second argument was not a sequence" in str(e), e
115         else:
116             raise "Should have gotten TypeError for wrong type of second argument."
117
118 class EasyFecTest(unittest.TestCase):
119     def test_small(self):
120         for i in range(16):
121             _help_test_random_with_l_easy(i)
122         if VERBOSE:
123             print "%d randomized tests pass." % (i+1)
124
125     def test_random(self):
126         for i in range(3):
127             _help_test_random_easy()
128         if VERBOSE:
129             print "%d randomized tests pass." % (i+1)
130
131     def test_bad_args_dec(self):
132         decer = zfec.easyfec.Decoder(2, 4)
133
134         try:
135             decer.decode(98, [0, 1], 0) # first argument is not a sequence
136         except TypeError, e:
137             assert "First argument was not a sequence" in str(e), e
138         else:
139             raise "Should have gotten TypeError for wrong type of second argument."
140
141         try:
142             decer.decode("ab", ["c", "d",], 0)
143         except zfec.Error, e:
144             assert "Precondition violation: second argument is required to contain int" in str(e), e
145         else:
146             raise "Should have gotten zfec.Error for wrong type of second argument."
147
148         try:
149             decer.decode("ab", 98, 0) # not a sequence at all
150         except TypeError, e:
151             assert "Second argument was not a sequence" in str(e), e
152         else:
153             raise "Should have gotten TypeError for wrong type of second argument."
154
155 class FileFec(unittest.TestCase):
156     def test_filefec_header(self):
157         for m in [1, 2, 3, 5, 7, 9, 11, 17, 19, 33, 35, 65, 66, 67, 129, 130, 131, 254, 255, 256,]:
158             for k in [1, 2, 3, 5, 9, 17, 33, 65, 129, 255, 256,]:
159                 if k >= m:
160                     continue
161                 for pad in [0, 1, k-1,]:
162                     if pad >= k:
163                         continue
164                     for sh in [0, 1, m-1,]:
165                         if sh >= m:
166                             continue
167                         h = zfec.filefec._build_header(m, k, pad, sh)
168                         hio = cStringIO.StringIO(h)
169                         (rm, rk, rpad, rsh,) = zfec.filefec._parse_header(hio)
170                         assert (rm, rk, rpad, rsh,) == (m, k, pad, sh,), h
171
172     def _help_test_filefec(self, teststr, k, m, numshs=None):
173         if numshs == None:
174             numshs = m
175
176         TESTFNAME = "testfile.txt"
177         PREFIX = "test"
178         SUFFIX = ".fec"
179
180         fsize = len(teststr)
181
182         tempdir = fileutil.NamedTemporaryDirectory(cleanup=True)
183         try:
184             tempf = tempdir.file(TESTFNAME, 'w+b')
185             tempf.write(teststr)
186             tempf.flush()
187             tempf.seek(0)
188
189             # encode the file
190             zfec.filefec.encode_to_files(tempf, fsize, tempdir.name, PREFIX, k, m, SUFFIX, verbose=VERBOSE)
191
192             # select some share files
193             RE=re.compile(zfec.filefec.RE_FORMAT % (PREFIX, SUFFIX,))
194             fns = os.listdir(tempdir.name)
195             assert len(fns) >= m, (fns, tempdir, tempdir.name,)
196             sharefs = [ open(os.path.join(tempdir.name, fn), "rb") for fn in fns if RE.match(fn) ]
197             for sharef in sharefs:
198                 tempdir.register_file(sharef)
199             random.shuffle(sharefs)
200             del sharefs[numshs:]
201
202             # decode from the share files
203             outf = tempdir.file('recovered-testfile.txt', 'w+b')
204             zfec.filefec.decode_from_files(outf, sharefs, verbose=VERBOSE)
205             outf.flush()
206             outf.seek(0)
207             recovereddata = outf.read()
208             assert recovereddata == teststr, (ab(recovereddata), ab(teststr),)
209         finally:
210             tempdir.shutdown()
211
212     def test_filefec_all_shares(self):
213         return self._help_test_filefec("Yellow Whirled!", 3, 8)
214
215     def test_filefec_all_shares_1_b(self):
216         return self._help_test_filefec("Yellow Whirled!", 4, 16)
217
218     def test_filefec_all_shares_2(self):
219         return self._help_test_filefec("Yellow Whirled", 3, 8)
220
221     def test_filefec_all_shares_2_b(self):
222         return self._help_test_filefec("Yellow Whirled", 4, 16)
223
224     def test_filefec_all_shares_3(self):
225         return self._help_test_filefec("Yellow Whirle", 3, 8)
226
227     def test_filefec_all_shares_3_b(self):
228         return self._help_test_filefec("Yellow Whirle", 4, 16)
229
230     def test_filefec_all_shares_with_padding(self, noisy=VERBOSE):
231         return self._help_test_filefec("Yellow Whirled!A", 3, 8)
232
233     def test_filefec_min_shares_with_padding(self, noisy=VERBOSE):
234         return self._help_test_filefec("Yellow Whirled!A", 3, 8, numshs=3)
235
236     def test_filefec_min_shares_with_crlf(self, noisy=VERBOSE):
237         return self._help_test_filefec("llow Whirled!A\r\n", 3, 8, numshs=3)
238
239     def test_filefec_min_shares_with_lf(self, noisy=VERBOSE):
240         return self._help_test_filefec("Yellow Whirled!A\n", 3, 8, numshs=3)
241
242     def test_filefec_min_shares_with_lflf(self, noisy=VERBOSE):
243         return self._help_test_filefec("Yellow Whirled!A\n\n", 3, 8, numshs=3)
244
245     def test_filefec_min_shares_with_crcrlflf(self, noisy=VERBOSE):
246         return self._help_test_filefec("Yellow Whirled!A\r\r\n\n", 3, 8, numshs=3)
247
248  
249 class Cmdline(unittest.TestCase):
250     def test_basic(self, noisy=VERBOSE):
251         tempdir = fileutil.NamedTemporaryDirectory(cleanup=True)
252         fo = tempdir.file("test.data", "w+b")
253         fo.write("WHEHWHJEKWAHDLJAWDHWALKDHA")
254
255         import sys
256         realargv = sys.argv
257         try:
258             DEFAULT_M=8
259             DEFAULT_K=3
260             sys.argv = ["zfec", os.path.join(tempdir.name, "test.data"),]
261         
262             retcode = zfec.cmdline_zfec.main()
263             assert retcode == 0, retcode
264
265             RE=re.compile(zfec.filefec.RE_FORMAT % ('test.data', ".fec",))
266             fns = os.listdir(tempdir.name)
267             assert len(fns) >= DEFAULT_M, (fns, DEFAULT_M, tempdir, tempdir.name,)
268             sharefns = [ os.path.join(tempdir.name, fn) for fn in fns if RE.match(fn) ]
269             random.shuffle(sharefns)
270             del sharefns[DEFAULT_K:]
271
272             sys.argv = ["zunfec",]
273             sys.argv.extend(sharefns)
274             sys.argv.extend(['-o', os.path.join(tempdir.name, 'test.data-recovered'),])
275             
276             retcode = zfec.cmdline_zunfec.main()
277             assert retcode == 0, retcode
278             import filecmp
279             assert filecmp.cmp(os.path.join(tempdir.name, 'test.data'), os.path.join(tempdir.name, 'test.data-recovered'))
280         finally:
281             sys.argv = realargv
282
283 if __name__ == "__main__":
284     unittest.main()
285
286 # zfec -- fast forward error correction library with Python interface
287
288 # Copyright (C) 2007 Allmydata, Inc.
289 # Author: Zooko Wilcox-O'Hearn
290
291 # This file is part of zfec.
292 #
293 # See README.txt for licensing information.