]> git.rkrishnan.org Git - tahoe-lafs/zfec.git/blob - zfec/zfec/test/test_zfec.py
zfec: turn off the feature of leaving the tmp data around for post-mortem analysis
[tahoe-lafs/zfec.git] / zfec / zfec / test / test_zfec.py
1 #!/usr/bin/env python
2
3 # import bindann
4 # import bindann.monkeypatch.all
5
6 import cStringIO, os, random, re, sys
7
8 import zfec
9
10 try:
11     from twisted.trial import unittest
12 except ImportError:
13     # trial is unavailable, oh well
14     import unittest
15
16 global VERBOSE
17 VERBOSE=False
18 if '-v' in sys.argv:
19     sys.argv.pop(sys.argv.index('-v'))
20     VERBOSE=True
21
22 from base64 import b32encode
23 def ab(x): # debuggery
24     if len(x) >= 3:
25         return "%s:%s" % (len(x), b32encode(x[-3:]),)
26     elif len(x) == 2:
27         return "%s:%s" % (len(x), b32encode(x[-2:]),)
28     elif len(x) == 1:
29         return "%s:%s" % (len(x), b32encode(x[-1:]),)
30     elif len(x) == 0:
31         return "%s:%s" % (len(x), "--empty--",)
32
33 def _h(k, m, ss):
34     encer = zfec.Encoder(k, m)
35     nums_and_blocks = list(enumerate(encer.encode(ss)))
36     assert isinstance(nums_and_blocks, list), nums_and_blocks
37     assert len(nums_and_blocks) == m, (len(nums_and_blocks), m,)
38     nums_and_blocks = random.sample(nums_and_blocks, k)
39     blocks = [ x[1] for x in nums_and_blocks ]
40     nums = [ x[0] for x in nums_and_blocks ]
41     decer = zfec.Decoder(k, m)
42     decoded = decer.decode(blocks, nums)
43     assert len(decoded) == len(ss), (len(decoded), len(ss),)
44     assert tuple([str(s) for s in decoded]) == tuple([str(s) for s in ss]), (tuple([ab(str(s)) for s in decoded]), tuple([ab(str(s)) for s in ss]),)
45
46 def randstr(n):
47     return ''.join(map(chr, map(random.randrange, [0]*n, [256]*n)))
48
49 def _help_test_random():
50     m = random.randrange(1, 257)
51     k = random.randrange(1, m+1)
52     l = random.randrange(0, 2**10)
53     ss = [ randstr(l/k) for x in range(k) ]
54     _h(k, m, ss)
55
56 def _help_test_random_with_l(l):
57     m = 83
58     k = 19
59     ss = [ randstr(l/k) for x in range(k) ]
60     _h(k, m, ss)
61
62 class ZFec(unittest.TestCase):
63     def test_random(self):
64         for i in range(3):
65             _help_test_random()
66         if VERBOSE:
67             print "%d randomized tests pass." % (i+1)
68
69     def test_bad_args_enc(self):
70         encer = zfec.Encoder(2, 4)
71         try:
72             encer.encode(["a", "b", ], ["c", "I am not an integer blocknum",])
73         except zfec.Error, e:
74             assert "Precondition violation: second argument is required to contain int" in str(e), e
75         else:
76             raise "Should have gotten zfec.Error for wrong type of second argument."
77
78         try:
79             encer.encode(["a", "b", ], 98) # not a sequence at all
80         except TypeError, e:
81             assert "Second argument (optional) was not a sequence" in str(e), e
82         else:
83             raise "Should have gotten TypeError for wrong type of second argument."
84
85     def test_bad_args_dec(self):
86         decer = zfec.Decoder(2, 4)
87
88         try:
89             decer.decode(98, [0, 1]) # first argument is not a sequence
90         except TypeError, e:
91             assert "First argument was not a sequence" in str(e), e
92         else:
93             raise "Should have gotten TypeError for wrong type of second argument."
94
95         try:
96             decer.decode(["a", "b", ], ["c", "d",])
97         except zfec.Error, e:
98             assert "Precondition violation: second argument is required to contain int" in str(e), e
99         else:
100             raise "Should have gotten zfec.Error for wrong type of second argument."
101
102         try:
103             decer.decode(["a", "b", ], 98) # not a sequence at all
104         except TypeError, e:
105             assert "Second argument was not a sequence" in str(e), e
106         else:
107             raise "Should have gotten TypeError for wrong type of second argument."
108
109 class FileFec(unittest.TestCase):
110     def test_filefec_header(self):
111         for m in [3, 5, 7, 9, 11, 17, 19, 33, 35, 65, 66, 67, 129, 130, 131, 254, 255, 256,]:
112             for k in [2, 3, 5, 9, 17, 33, 65, 129, 255,]:
113                 if k >= m:
114                     continue
115                 for pad in [0, 1, k-1,]:
116                     if pad >= k:
117                         continue
118                     for sh in [0, 1, m-1,]:
119                         if sh >= m:
120                             continue
121                         h = zfec.filefec._build_header(m, k, pad, sh)
122                         hio = cStringIO.StringIO(h)
123                         (rm, rk, rpad, rsh,) = zfec.filefec._parse_header(hio)
124                         assert (rm, rk, rpad, rsh,) == (m, k, pad, sh,), h
125
126     def _help_test_filefec(self, teststr, k, m, numshs=None):
127         if numshs == None:
128             numshs = m
129
130         TESTFNAME = "testfile.txt"
131         PREFIX = "test"
132         SUFFIX = ".fec"
133
134         tempdir = zfec.util.fileutil.NamedTemporaryDirectory()
135         try:
136             tempfn = os.path.join(tempdir.name, TESTFNAME)
137             tempf = open(tempfn, 'wb')
138             tempf.write(teststr)
139             tempf.close()
140             fsize = os.path.getsize(tempfn)
141             assert fsize == len(teststr)
142
143             # encode the file
144             zfec.filefec.encode_to_files(open(tempfn, 'rb'), fsize, tempdir.name, PREFIX, k, m, SUFFIX, verbose=VERBOSE)
145
146             # select some share files
147             RE=re.compile(zfec.filefec.RE_FORMAT % (PREFIX, SUFFIX,))
148             fns = os.listdir(tempdir.name)
149             sharefs = [ open(os.path.join(tempdir.name, fn), "rb") for fn in fns if RE.match(fn) ]
150             random.shuffle(sharefs)
151             del sharefs[numshs:]
152
153             # decode from the share files
154             outf = open(os.path.join(tempdir.name, 'recovered-testfile.txt'), 'wb')
155             zfec.filefec.decode_from_files(outf, sharefs, verbose=VERBOSE)
156             outf.close()
157
158             tempfn = open(os.path.join(tempdir.name, 'recovered-testfile.txt'), 'rb')
159             recovereddata = tempfn.read()
160             assert recovereddata == teststr
161         finally:
162             tempdir.shutdown()
163
164     def test_filefec_all_shares(self):
165         return self._help_test_filefec("Yellow Whirled!", 3, 8)
166
167     def test_filefec_all_shares_2(self):
168         return self._help_test_filefec("Yellow Whirled", 3, 8)
169
170     def test_filefec_all_shares_3(self):
171         return self._help_test_filefec("Yellow Whirle", 3, 8)
172
173     def test_filefec_all_shares_3_b(self):
174         return self._help_test_filefec("Yellow Whirle", 4, 16)
175
176     def test_filefec_all_shares_2_b(self):
177         return self._help_test_filefec("Yellow Whirled", 4, 16)
178
179     def test_filefec_all_shares_1_b(self):
180         return self._help_test_filefec("Yellow Whirled!", 4, 16)
181
182     def test_filefec_all_shares_with_padding(self, noisy=VERBOSE):
183         return self._help_test_filefec("Yellow Whirled!A", 3, 8)
184
185     def test_filefec_min_shares_with_padding(self, noisy=VERBOSE):
186         return self._help_test_filefec("Yellow Whirled!A", 3, 8, numshs=3)
187
188 if __name__ == "__main__":
189     if hasattr(unittest, 'main'):
190         unittest.main()
191     else:
192         sys.path.append(os.getcwd())
193         mods = []
194         fullname = os.path.realpath(os.path.abspath(__file__))
195         for pathel in sys.path:
196             fullnameofpathel = os.path.realpath(os.path.abspath(pathel))
197             if fullname.startswith(fullnameofpathel):
198                 relname = fullname[len(fullnameofpathel):]
199                 mod = (os.path.splitext(relname)[0]).replace(os.sep, '.').strip('.')
200                 mods.append(mod)
201
202         mods.sort(cmp=lambda x, y: cmp(len(x), len(y)))
203         mods.reverse()
204         for mod in mods:
205             cmdstr = "trial %s %s" % (' '.join(sys.argv[1:]), mod)
206             print cmdstr
207             if os.system(cmdstr) == 0:
208                 break
209
210 # zfec -- fast forward error correction library with Python interface
211 #
212 # Copyright (C) 2007 Allmydata, Inc.
213 # Author: Zooko Wilcox-O'Hearn
214 # mailto:zooko@zooko.com
215 #
216 # This file is part of zfec.
217 #
218 # This program is free software; you can redistribute it and/or modify it under
219 # the terms of the GNU General Public License as published by the Free Software
220 # Foundation; either version 2 of the License, or (at your option) any later
221 # version.  This program also comes with the added permission that, in the case
222 # that you are obligated to release a derived work under this licence (as per
223 # section 2.b of the GPL), you may delay the fulfillment of this obligation for
224 # up to 12 months.
225 #
226 # This program is distributed in the hope that it will be useful,
227 # but WITHOUT ANY WARRANTY; without even the implied warranty of
228 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
229 # GNU General Public License for more details.
230 #
231 # You should have received a copy of the GNU General Public License
232 # along with this program; if not, write to the Free Software
233 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
234