]> git.rkrishnan.org Git - tahoe-lafs/zfec.git/blob - zfec/zfec/test/test_zfec.py
zfec: finish up some renaming of pyfec to zfec
[tahoe-lafs/zfec.git] / zfec / zfec / test / test_zfec.py
1 #!/usr/bin/env python
2
3 # import bindann
4 # import bindann.monkeypatch.all
5
6 # zfec -- fast forward error correction library with Python interface
7 #
8 # Copyright (C) 2007 Allmydata, Inc.
9 # Author: Zooko Wilcox-O'Hearn
10 # mailto:zooko@zooko.com
11 #
12 # This file is part of zfec.
13 #
14 # This program is free software; you can redistribute it and/or modify it under
15 # the terms of the GNU General Public License as published by the Free Software
16 # Foundation; either version 2 of the License, or (at your option) any later
17 # version.  This program also comes with the added permission that, in the case
18 # that you are obligated to release a derived work under this licence (as per
19 # section 2.b of the GPL), you may delay the fulfillment of this obligation for
20 # up to 12 months.
21 #
22 # This program is distributed in the hope that it will be useful,
23 # but WITHOUT ANY WARRANTY; without even the implied warranty of
24 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
25 # GNU General Public License for more details.
26 #
27 # You should have received a copy of the GNU General Public License
28 # along with this program; if not, write to the Free Software
29 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
30
31 import cStringIO, os, random, re, sys
32
33 import zfec
34
35 try:
36     from twisted.trial import unittest
37 except ImportError:
38     # trial is unavailable, oh well
39     import unittest
40
41 global VERBOSE
42 VERBOSE=False
43 if '-v' in sys.argv:
44     sys.argv.pop(sys.argv.index('-v'))
45     VERBOSE=True
46
47 from base64 import b32encode
48 def ab(x): # debuggery
49     if len(x) >= 3:
50         return "%s:%s" % (len(x), b32encode(x[-3:]),)
51     elif len(x) == 2:
52         return "%s:%s" % (len(x), b32encode(x[-2:]),)
53     elif len(x) == 1:
54         return "%s:%s" % (len(x), b32encode(x[-1:]),)
55     elif len(x) == 0:
56         return "%s:%s" % (len(x), "--empty--",)
57
58 def _h(k, m, ss):
59     encer = zfec.Encoder(k, m)
60     nums_and_blocks = list(enumerate(encer.encode(ss)))
61     assert isinstance(nums_and_blocks, list), nums_and_blocks
62     assert len(nums_and_blocks) == m, (len(nums_and_blocks), m,)
63     nums_and_blocks = random.sample(nums_and_blocks, k)
64     blocks = [ x[1] for x in nums_and_blocks ]
65     nums = [ x[0] for x in nums_and_blocks ]
66     decer = zfec.Decoder(k, m)
67     decoded = decer.decode(blocks, nums)
68     assert len(decoded) == len(ss), (len(decoded), len(ss),)
69     assert tuple([str(s) for s in decoded]) == tuple([str(s) for s in ss]), (tuple([ab(str(s)) for s in decoded]), tuple([ab(str(s)) for s in ss]),)
70
71 def randstr(n):
72     return ''.join(map(chr, map(random.randrange, [0]*n, [256]*n)))
73
74 def _help_test_random():
75     m = random.randrange(1, 257)
76     k = random.randrange(1, m+1)
77     l = random.randrange(0, 2**10)
78     ss = [ randstr(l/k) for x in range(k) ]
79     _h(k, m, ss)
80
81 def _help_test_random_with_l(l):
82     m = 83
83     k = 19
84     ss = [ randstr(l/k) for x in range(k) ]
85     _h(k, m, ss)
86
87 class ZFec(unittest.TestCase):
88     def test_random(self):
89         for i in range(3):
90             _help_test_random()
91         if VERBOSE:
92             print "%d randomized tests pass." % (i+1)
93
94     def test_bad_args_enc(self):
95         encer = zfec.Encoder(2, 4)
96         try:
97             encer.encode(["a", "b", ], ["c", "I am not an integer blocknum",])
98         except zfec.Error, e:
99             assert "Precondition violation: second argument is required to contain int" in str(e), e
100         else:
101             raise "Should have gotten zfec.Error for wrong type of second argument."
102
103         try:
104             encer.encode(["a", "b", ], 98) # not a sequence at all
105         except TypeError, e:
106             assert "Second argument (optional) was not a sequence" in str(e), e
107         else:
108             raise "Should have gotten TypeError for wrong type of second argument."
109
110     def test_bad_args_dec(self):
111         decer = zfec.Decoder(2, 4)
112
113         try:
114             decer.decode(98, [0, 1]) # first argument is not a sequence
115         except TypeError, e:
116             assert "First argument was not a sequence" in str(e), e
117         else:
118             raise "Should have gotten TypeError for wrong type of second argument."
119
120         try:
121             decer.decode(["a", "b", ], ["c", "d",])
122         except zfec.Error, e:
123             assert "Precondition violation: second argument is required to contain int" in str(e), e
124         else:
125             raise "Should have gotten zfec.Error for wrong type of second argument."
126
127         try:
128             decer.decode(["a", "b", ], 98) # not a sequence at all
129         except TypeError, e:
130             assert "Second argument was not a sequence" in str(e), e
131         else:
132             raise "Should have gotten TypeError for wrong type of second argument."
133
134 class FileFec(unittest.TestCase):
135     def test_filefec_header(self):
136         for m in [3, 5, 7, 9, 11, 17, 19, 33, 35, 65, 66, 67, 129, 130, 131, 254, 255, 256,]:
137             for k in [2, 3, 5, 9, 17, 33, 65, 129, 255,]:
138                 if k >= m:
139                     continue
140                 for pad in [0, 1, k-1,]:
141                     if pad >= k:
142                         continue
143                     for sh in [0, 1, m-1,]:
144                         if sh >= m:
145                             continue
146                         h = zfec.filefec._build_header(m, k, pad, sh)
147                         hio = cStringIO.StringIO(h)
148                         (rm, rk, rpad, rsh,) = zfec.filefec._parse_header(hio)
149                         assert (rm, rk, rpad, rsh,) == (m, k, pad, sh,), h
150
151     def _help_test_filefec(self, teststr, k, m, numshs=None):
152         if numshs == None:
153             numshs = m
154
155         TESTFNAME = "testfile.txt"
156         PREFIX = "test"
157         SUFFIX = ".fec"
158
159         tempdir = zfec.util.fileutil.NamedTemporaryDirectory(cleanup=False)
160         try:
161             tempfn = os.path.join(tempdir.name, TESTFNAME)
162             tempf = open(tempfn, 'wb')
163             tempf.write(teststr)
164             tempf.close()
165             fsize = os.path.getsize(tempfn)
166             assert fsize == len(teststr)
167
168             # encode the file
169             zfec.filefec.encode_to_files(open(tempfn, 'rb'), fsize, tempdir.name, PREFIX, k, m, SUFFIX, verbose=VERBOSE)
170
171             # select some share files
172             RE=re.compile(zfec.filefec.RE_FORMAT % (PREFIX, SUFFIX,))
173             fns = os.listdir(tempdir.name)
174             sharefs = [ open(os.path.join(tempdir.name, fn), "rb") for fn in fns if RE.match(fn) ]
175             random.shuffle(sharefs)
176             del sharefs[numshs:]
177
178             # decode from the share files
179             outf = open(os.path.join(tempdir.name, 'recovered-testfile.txt'), 'wb')
180             zfec.filefec.decode_from_files(outf, sharefs, verbose=VERBOSE)
181             outf.close()
182
183             tempfn = open(os.path.join(tempdir.name, 'recovered-testfile.txt'), 'rb')
184             recovereddata = tempfn.read()
185             assert recovereddata == teststr
186         finally:
187             tempdir.shutdown()
188
189     def test_filefec_all_shares(self):
190         return self._help_test_filefec("Yellow Whirled!", 3, 8)
191
192     def test_filefec_all_shares_with_padding(self, noisy=VERBOSE):
193         return self._help_test_filefec("Yellow Whirled!A", 3, 8)
194
195     def test_filefec_min_shares_with_padding(self, noisy=VERBOSE):
196         return self._help_test_filefec("Yellow Whirled!A", 3, 8, numshs=3)
197
198 if __name__ == "__main__":
199     if hasattr(unittest, 'main'):
200         unittest.main()
201     else:
202         sys.path.append(os.getcwd())
203         mods = []
204         fullname = os.path.realpath(os.path.abspath(__file__))
205         for pathel in sys.path:
206             fullnameofpathel = os.path.realpath(os.path.abspath(pathel))
207             if fullname.startswith(fullnameofpathel):
208                 relname = fullname[len(fullnameofpathel):]
209                 mod = (os.path.splitext(relname)[0]).replace(os.sep, '.').strip('.')
210                 mods.append(mod)
211
212         mods.sort(cmp=lambda x, y: cmp(len(x), len(y)))
213         mods.reverse()
214         for mod in mods:
215             cmdstr = "trial %s %s" % (' '.join(sys.argv[1:]), mod)
216             print cmdstr
217             if os.system(cmdstr) == 0:
218                 break