]> git.rkrishnan.org Git - tahoe-lafs/zfec.git/blob - zfec/zfec/test/test_zfec.py
zfec: trivial reformatting of unit test names
[tahoe-lafs/zfec.git] / zfec / zfec / test / test_zfec.py
1 #!/usr/bin/env python
2
3 import cStringIO, os, random, re
4
5 import unittest
6
7 global VERBOSE
8 VERBOSE=False
9
10 import zfec
11
12 from base64 import b32encode
13 def ab(x): # debuggery
14     if len(x) >= 3:
15         return "%s:%s" % (len(x), b32encode(x[-3:]),)
16     elif len(x) == 2:
17         return "%s:%s" % (len(x), b32encode(x[-2:]),)
18     elif len(x) == 1:
19         return "%s:%s" % (len(x), b32encode(x[-1:]),)
20     elif len(x) == 0:
21         return "%s:%s" % (len(x), "--empty--",)
22
23 def _h(k, m, ss):
24     encer = zfec.Encoder(k, m)
25     nums_and_blocks = list(enumerate(encer.encode(ss)))
26     assert isinstance(nums_and_blocks, list), nums_and_blocks
27     assert len(nums_and_blocks) == m, (len(nums_and_blocks), m,)
28     nums_and_blocks = random.sample(nums_and_blocks, k)
29     blocks = [ x[1] for x in nums_and_blocks ]
30     nums = [ x[0] for x in nums_and_blocks ]
31     decer = zfec.Decoder(k, m)
32     decoded = decer.decode(blocks, nums)
33     assert len(decoded) == len(ss), (len(decoded), len(ss),)
34     assert tuple([str(s) for s in decoded]) == tuple([str(s) for s in ss]), (tuple([ab(str(s)) for s in decoded]), tuple([ab(str(s)) for s in ss]),)
35
36 def randstr(n):
37     return ''.join(map(chr, map(random.randrange, [0]*n, [256]*n)))
38
39 def _help_test_random():
40     m = random.randrange(1, 257)
41     k = random.randrange(1, m+1)
42     l = random.randrange(0, 2**10)
43     ss = [ randstr(l/k) for x in range(k) ]
44     _h(k, m, ss)
45
46 def _help_test_random_with_l(l):
47     m = 83
48     k = 19
49     ss = [ randstr(l/k) for x in range(k) ]
50     _h(k, m, ss)
51
52 class ZFec(unittest.TestCase):
53     def test_random(self):
54         for i in range(3):
55             _help_test_random()
56         if VERBOSE:
57             print "%d randomized tests pass." % (i+1)
58
59     def test_bad_args_enc(self):
60         encer = zfec.Encoder(2, 4)
61         try:
62             encer.encode(["a", "b", ], ["c", "I am not an integer blocknum",])
63         except zfec.Error, e:
64             assert "Precondition violation: second argument is required to contain int" in str(e), e
65         else:
66             raise "Should have gotten zfec.Error for wrong type of second argument."
67
68         try:
69             encer.encode(["a", "b", ], 98) # not a sequence at all
70         except TypeError, e:
71             assert "Second argument (optional) was not a sequence" in str(e), e
72         else:
73             raise "Should have gotten TypeError for wrong type of second argument."
74
75     def test_bad_args_dec(self):
76         decer = zfec.Decoder(2, 4)
77
78         try:
79             decer.decode(98, [0, 1]) # first argument is not a sequence
80         except TypeError, e:
81             assert "First argument was not a sequence" in str(e), e
82         else:
83             raise "Should have gotten TypeError for wrong type of second argument."
84
85         try:
86             decer.decode(["a", "b", ], ["c", "d",])
87         except zfec.Error, e:
88             assert "Precondition violation: second argument is required to contain int" in str(e), e
89         else:
90             raise "Should have gotten zfec.Error for wrong type of second argument."
91
92         try:
93             decer.decode(["a", "b", ], 98) # not a sequence at all
94         except TypeError, e:
95             assert "Second argument was not a sequence" in str(e), e
96         else:
97             raise "Should have gotten TypeError for wrong type of second argument."
98
99 class FileFec(unittest.TestCase):
100     def test_filefec_header(self):
101         for m in [3, 5, 7, 9, 11, 17, 19, 33, 35, 65, 66, 67, 129, 130, 131, 254, 255, 256,]:
102             for k in [2, 3, 5, 9, 17, 33, 65, 129, 255,]:
103                 if k >= m:
104                     continue
105                 for pad in [0, 1, k-1,]:
106                     if pad >= k:
107                         continue
108                     for sh in [0, 1, m-1,]:
109                         if sh >= m:
110                             continue
111                         h = zfec.filefec._build_header(m, k, pad, sh)
112                         hio = cStringIO.StringIO(h)
113                         (rm, rk, rpad, rsh,) = zfec.filefec._parse_header(hio)
114                         assert (rm, rk, rpad, rsh,) == (m, k, pad, sh,), h
115
116     def _help_test_filefec(self, teststr, k, m, numshs=None):
117         if numshs == None:
118             numshs = m
119
120         TESTFNAME = "testfile.txt"
121         PREFIX = "test"
122         SUFFIX = ".fec"
123
124         fsize = len(teststr)
125
126         tempdir = zfec.util.fileutil.NamedTemporaryDirectory(cleanup=True)
127         try:
128             tempf = tempdir.file(TESTFNAME, 'w+b')
129             tempf.write(teststr)
130             tempf.seek(0)
131
132             # encode the file
133             zfec.filefec.encode_to_files(tempf, fsize, tempdir.name, PREFIX, k, m, SUFFIX, verbose=VERBOSE)
134
135             # select some share files
136             RE=re.compile(zfec.filefec.RE_FORMAT % (PREFIX, SUFFIX,))
137             fns = os.listdir(tempdir.name)
138             assert len(fns) >= m, (fns, tempdir, tempdir.name,)
139             sharefs = [ open(os.path.join(tempdir.name, fn), "rb") for fn in fns if RE.match(fn) ]
140             for sharef in sharefs:
141                 tempdir.register_file(sharef)
142             random.shuffle(sharefs)
143             del sharefs[numshs:]
144
145             # decode from the share files
146             outf = tempdir.file('recovered-testfile.txt', 'w+b')
147             zfec.filefec.decode_from_files(outf, sharefs, verbose=VERBOSE)
148             outf.seek(0)
149             recovereddata = outf.read()
150             assert recovereddata == teststr
151         finally:
152             tempdir.shutdown()
153
154     def test_filefec_all_shares(self):
155         return self._help_test_filefec("Yellow Whirled!", 3, 8)
156
157     def test_filefec_all_shares_1_b(self):
158         return self._help_test_filefec("Yellow Whirled!", 4, 16)
159
160     def test_filefec_all_shares_2(self):
161         return self._help_test_filefec("Yellow Whirled", 3, 8)
162
163     def test_filefec_all_shares_2_b(self):
164         return self._help_test_filefec("Yellow Whirled", 4, 16)
165
166     def test_filefec_all_shares_3(self):
167         return self._help_test_filefec("Yellow Whirle", 3, 8)
168
169     def test_filefec_all_shares_3_b(self):
170         return self._help_test_filefec("Yellow Whirle", 4, 16)
171
172     def test_filefec_all_shares_with_padding(self, noisy=VERBOSE):
173         return self._help_test_filefec("Yellow Whirled!A", 3, 8)
174
175     def test_filefec_min_shares_with_padding(self, noisy=VERBOSE):
176         return self._help_test_filefec("Yellow Whirled!A", 3, 8, numshs=3)
177
178     def test_filefec_min_shares_with_crlf(self, noisy=VERBOSE):
179         return self._help_test_filefec("llow Whirled!A\r\n", 3, 8, numshs=3)
180
181     def test_filefec_min_shares_with_lf(self, noisy=VERBOSE):
182         return self._help_test_filefec("Yellow Whirled!A\n", 3, 8, numshs=3)
183
184     def test_filefec_min_shares_with_lflf(self, noisy=VERBOSE):
185         return self._help_test_filefec("Yellow Whirled!A\n\n", 3, 8, numshs=3)
186
187     def test_filefec_min_shares_with_crcrlflf(self, noisy=VERBOSE):
188         return self._help_test_filefec("Yellow Whirled!A\r\r\n\n", 3, 8, numshs=3)
189
190  
191 class Cmdline(unittest.TestCase):
192     def test_basic(self, noisy=VERBOSE):
193         tempdir = zfec.util.fileutil.NamedTemporaryDirectory(cleanup=True)
194         fo = tempdir.file("test.data", "w+b")
195         fo.write("WHEHWHJEKWAHDLJAWDHWALKDHA")
196
197         import sys
198         realargv = sys.argv
199         try:
200             DEFAULT_M=16
201             DEFAULT_K=4
202             sys.argv = ["zfec", os.path.join(tempdir.name, "test.data"),]
203         
204             retcode = zfec.cmdline_zfec.main()
205             assert retcode == 0, retcode
206
207             RE=re.compile(zfec.filefec.RE_FORMAT % ('test.data', ".fec",))
208             fns = os.listdir(tempdir.name)
209             assert len(fns) >= DEFAULT_M, (fns, tempdir, tempdir.name,)
210             sharefns = [ os.path.join(tempdir.name, fn) for fn in fns if RE.match(fn) ]
211             random.shuffle(sharefns)
212             del sharefns[DEFAULT_K:]
213
214             sys.argv = ["zunfec",]
215             sys.argv.extend(sharefns)
216             sys.argv.extend(['-o', os.path.join(tempdir.name, 'test.data-recovered'),])
217             
218             retcode = zfec.cmdline_zunfec.main()
219             assert retcode == 0, retcode
220             import filecmp
221             assert filecmp.cmp(os.path.join(tempdir.name, 'test.data'), os.path.join(tempdir.name, 'test.data-recovered'))
222         finally:
223             sys.argv = realargv
224
225
226 # zfec -- fast forward error correction library with Python interface
227
228 # Copyright (C) 2007 Allmydata, Inc.
229 # Author: Zooko Wilcox-O'Hearn
230
231 # This file is part of zfec.
232
233 # This program is free software; you can redistribute it and/or modify it
234 # under the terms of the GNU General Public License as published by the Free
235 # Software Foundation; either version 2 of the License, or (at your option)
236 # any later version, with the added permission that, if you become obligated
237 # to release a derived work under this licence (as per section 2.b), you may
238 # delay the fulfillment of this obligation for up to 12 months.  See the file
239 # COPYING for details.
240 #
241 # If you would like to inquire about a commercial relationship with Allmydata,
242 # Inc., please contact partnerships@allmydata.com and visit
243 # http://allmydata.com/.