4 # import bindann.monkeypatch.all
6 import cStringIO, os, random, re, sys
11 from twisted.trial import unittest
13 # trial is unavailable, oh well
19 sys.argv.pop(sys.argv.index('-v'))
22 from base64 import b32encode
def ab(x): # debuggery
    """Return a short debug label for x: "<length>:<base32 of its tail>".

    The tail is at most the last three bytes of x.  An empty x has no tail
    to encode and is labelled "<length>:--empty--" instead.
    """
    # NOTE(review): the length tests that selected between the original four
    # return statements are not visible in the reviewed text; the branches
    # are reconstructed from the slice widths they used -- confirm.
    if len(x) == 0:
        return "%s:%s" % (len(x), "--empty--",)
    # x[-3:] already yields the whole value when x holds only one or two
    # bytes, so separate x[-2:] and x[-1:] cases are not needed.
    return "%s:%s" % (len(x), b32encode(x[-3:]),)
# NOTE(review): the `def` line that introduces k, m and ss is not visible in
# the reviewed text; these statements read as the body of an encode/decode
# round-trip check.
# Encode ss and pair every produced block with its index; exactly m blocks
# are expected.
34 encer = zfec.Encoder(k, m)
35 nums_and_blocks = list(enumerate(encer.encode(ss)))
36 assert isinstance(nums_and_blocks, list), nums_and_blocks
37 assert len(nums_and_blocks) == m, (len(nums_and_blocks), m,)
# Keep a random selection of k (index, block) pairs and decode from those
# alone.
38 nums_and_blocks = random.sample(nums_and_blocks, k)
39 blocks = [ x[1] for x in nums_and_blocks ]
40 nums = [ x[0] for x in nums_and_blocks ]
41 decer = zfec.Decoder(k, m)
42 decoded = decer.decode(blocks, nums)
# The decoded blocks must match the inputs in count and content; on a
# mismatch the assertion message carries ab() summaries of both sides.
43 assert len(decoded) == len(ss), (len(decoded), len(ss),)
44 assert tuple([str(s) for s in decoded]) == tuple([str(s) for s in ss]), (tuple([ab(str(s)) for s in decoded]), tuple([ab(str(s)) for s in ss]),)
# NOTE(review): the enclosing `def` is not visible in the reviewed text;
# presumably this is the body of randstr(n) -- confirm.
# Builds a string of n characters, each one chr() of a value drawn with
# random.randrange(0, 256).
47 return ''.join(map(chr, map(random.randrange, [0]*n, [256]*n)))
# Draws random parameters for one randomized case: m in 1..256, k in 1..m,
# a total length l in 0..1023, then k random strings of l/k characters each.
# NOTE(review): nothing in the visible lines consumes ss; the statement that
# uses it appears to be missing from the reviewed text -- confirm.
49 def _help_test_random():
50 m = random.randrange(1, 257)
51 k = random.randrange(1, m+1)
52 l = random.randrange(0, 2**10)
# l/k is integer (floor) division under Python 2, which this file targets
# (see the print statement in test_random).
53 ss = [ randstr(l/k) for x in range(k) ]
# Variant of the randomized case in which the caller supplies the total
# length l.
# NOTE(review): k is read below but its assignment is not visible in the
# reviewed text, and nothing visible consumes ss -- confirm against the
# complete file.
56 def _help_test_random_with_l(l):
59 ss = [ randstr(l/k) for x in range(k) ]
# Test case class for the zfec Encoder/Decoder API.
62 class ZFec(unittest.TestCase):
63 def test_random(self):
# NOTE(review): `i` is not bound anywhere in the visible lines; presumably it
# is the counter of a loop whose header is missing from the reviewed text --
# confirm.
67 print "%d randomized tests pass." % (i+1)
def test_bad_args_enc(self):
    """Check that Encoder.encode() rejects a malformed second argument.

    A second argument containing a non-integer is expected to raise
    zfec.Error, and one that is not a sequence at all is expected to raise
    TypeError.  In both cases the exception text is checked for the
    expected message.
    """
    # NOTE(review): the try/except/else lines are not visible in the reviewed
    # text; they are reconstructed from the use of `e` in the assertions and
    # from the exception types named in the failure messages -- confirm.
    encer = zfec.Encoder(2, 4)

    try:
        encer.encode(["a", "b", ], ["c", "I am not an integer blocknum",])
    except zfec.Error as e:
        assert "Precondition violation: second argument is required to contain int" in str(e), e
    else:
        # Raising a plain string is itself a TypeError on Python >= 2.6,
        # which would hide the real failure; report it through the
        # TestCase API instead.
        self.fail("Should have gotten zfec.Error for wrong type of second argument.")

    try:
        encer.encode(["a", "b", ], 98) # not a sequence at all
    except TypeError as e:
        assert "Second argument (optional) was not a sequence" in str(e), e
    else:
        self.fail("Should have gotten TypeError for wrong type of second argument.")
def test_bad_args_dec(self):
    """Check that Decoder.decode() rejects malformed arguments.

    Three cases are exercised: a first argument that is not a sequence
    (TypeError expected), a second argument containing non-integers
    (zfec.Error expected), and a second argument that is not a sequence
    (TypeError expected).  Each exception text is checked for the expected
    message.
    """
    # NOTE(review): the try/except/else lines are not visible in the reviewed
    # text; they are reconstructed from the use of `e` in the assertions and
    # from the exception types named in the failure messages -- confirm.
    decer = zfec.Decoder(2, 4)

    try:
        decer.decode(98, [0, 1]) # first argument is not a sequence
    except TypeError as e:
        assert "First argument was not a sequence" in str(e), e
    else:
        # Raising a plain string is itself a TypeError on Python >= 2.6,
        # which would hide the real failure; report it through the
        # TestCase API instead.  The message now names the first argument,
        # which is the one this case actually checks.
        self.fail("Should have gotten TypeError for wrong type of first argument.")

    try:
        decer.decode(["a", "b", ], ["c", "d",])
    except zfec.Error as e:
        assert "Precondition violation: second argument is required to contain int" in str(e), e
    else:
        self.fail("Should have gotten zfec.Error for wrong type of second argument.")

    try:
        decer.decode(["a", "b", ], 98) # not a sequence at all
    except TypeError as e:
        assert "Second argument was not a sequence" in str(e), e
    else:
        self.fail("Should have gotten TypeError for wrong type of second argument.")
# Test case class for zfec.filefec: a header build/parse round trip and
# encode-to-files / decode-from-files round trips on small strings.
109 class FileFec(unittest.TestCase):
# Header round trip: for the listed combinations of m, k, pad and sh, the
# header built by _build_header() must parse back to the same four values.
# NOTE(review): source lines between the loop headers are not visible in the
# reviewed text, so any filtering of combinations cannot be confirmed here.
110 def test_filefec_header(self):
111 for m in [3, 5, 7, 9, 11, 17, 19, 33, 35, 65, 66, 67, 129, 130, 131, 254, 255, 256,]:
112 for k in [2, 3, 5, 9, 17, 33, 65, 129, 255,]:
115 for pad in [0, 1, k-1,]:
118 for sh in [0, 1, m-1,]:
121 h = zfec.filefec._build_header(m, k, pad, sh)
122 hio = cStringIO.StringIO(h)
123 (rm, rk, rpad, rsh,) = zfec.filefec._parse_header(hio)
124 assert (rm, rk, rpad, rsh,) == (m, k, pad, sh,), h
# File round trip helper: encodes a temporary file into share files, decodes
# them into 'recovered-testfile.txt' and compares the result with teststr.
# NOTE(review): PREFIX and SUFFIX are used below but their definitions are
# not visible in the reviewed text, and numshs is not referenced by any
# visible line -- confirm against the complete file.
126 def _help_test_filefec(self, teststr, k, m, numshs=None):
130 TESTFNAME = "testfile.txt"
134 tempdir = zfec.util.fileutil.NamedTemporaryDirectory()
136 tempfn = os.path.join(tempdir.name, TESTFNAME)
# NOTE(review): the write to and close of tempf are not visible here; the
# size assertion below only holds if teststr was written and flushed.
137 tempf = open(tempfn, 'wb')
140 fsize = os.path.getsize(tempfn)
141 assert fsize == len(teststr)
# NOTE(review): the file object opened inline here is never bound to a name,
# so it cannot be closed explicitly.
144 zfec.filefec.encode_to_files(open(tempfn, 'rb'), fsize, tempdir.name, PREFIX, k, m, SUFFIX, verbose=VERBOSE)
146 # select some share files
147 RE=re.compile(zfec.filefec.RE_FORMAT % (PREFIX, SUFFIX,))
148 fns = os.listdir(tempdir.name)
# Every directory entry whose name matches the share-file pattern is opened
# for reading; the shuffle randomizes the order handed to the decoder.
# NOTE(review): no close of these share file objects is visible.
149 sharefs = [ open(os.path.join(tempdir.name, fn), "rb") for fn in fns if RE.match(fn) ]
150 random.shuffle(sharefs)
153 # decode from the share files
154 outf = open(os.path.join(tempdir.name, 'recovered-testfile.txt'), 'wb')
155 zfec.filefec.decode_from_files(outf, sharefs, verbose=VERBOSE)
# tempfn is rebound here from a path string to an open file object.
158 tempfn = open(os.path.join(tempdir.name, 'recovered-testfile.txt'), 'rb')
159 recovereddata = tempfn.read()
160 assert recovereddata == teststr
# The cases below run the helper with fixed strings of 13 to 16 characters
# and (k, m) of (3, 8) or (4, 16).
164 def test_filefec_all_shares(self):
165 return self._help_test_filefec("Yellow Whirled!", 3, 8)
167 def test_filefec_all_shares_2(self):
168 return self._help_test_filefec("Yellow Whirled", 3, 8)
170 def test_filefec_all_shares_3(self):
171 return self._help_test_filefec("Yellow Whirle", 3, 8)
173 def test_filefec_all_shares_3_b(self):
174 return self._help_test_filefec("Yellow Whirle", 4, 16)
176 def test_filefec_all_shares_2_b(self):
177 return self._help_test_filefec("Yellow Whirled", 4, 16)
179 def test_filefec_all_shares_1_b(self):
180 return self._help_test_filefec("Yellow Whirled!", 4, 16)
# The noisy parameter of the next two tests is not used by their visible
# bodies.
182 def test_filefec_all_shares_with_padding(self, noisy=VERBOSE):
183 return self._help_test_filefec("Yellow Whirled!A", 3, 8)
# Passes numshs=3, the same value as k in this call.
185 def test_filefec_min_shares_with_padding(self, noisy=VERBOSE):
186 return self._help_test_filefec("Yellow Whirled!A", 3, 8, numshs=3)
# Script entry point.
# NOTE(review): the bodies of several branches below, and the definition of
# `mods`, are not visible in the reviewed text; the comments describe only
# what the visible lines do.
188 if __name__ == "__main__":
# Branches on whether the unittest module in use provides a `main` attribute.
189 if hasattr(unittest, 'main'):
192 sys.path.append(os.getcwd())
# Derive a dotted module name for this file: find a sys.path entry that is a
# prefix of this file's real path, strip that prefix and the extension, and
# turn path separators into dots.
194 fullname = os.path.realpath(os.path.abspath(__file__))
195 for pathel in sys.path:
196 fullnameofpathel = os.path.realpath(os.path.abspath(pathel))
197 if fullname.startswith(fullnameofpathel):
198 relname = fullname[len(fullnameofpathel):]
199 mod = (os.path.splitext(relname)[0]).replace(os.sep, '.').strip('.')
# Orders the candidate names by length, shortest first.  The cmp= keyword of
# list.sort() exists only in Python 2.
202 mods.sort(cmp=lambda x, y: cmp(len(x), len(y)))
# Runs "trial <command-line arguments> <module>" through the shell; the
# arguments are interpolated into the command string without quoting.
205 cmdstr = "trial %s %s" % (' '.join(sys.argv[1:]), mod)
207 if os.system(cmdstr) == 0:
210 # zfec -- fast forward error correction library with Python interface
212 # Copyright (C) 2007 Allmydata, Inc.
213 # Author: Zooko Wilcox-O'Hearn
214 # mailto:zooko@zooko.com
216 # This file is part of zfec.
218 # This program is free software; you can redistribute it and/or modify it under
219 # the terms of the GNU General Public License as published by the Free Software
220 # Foundation; either version 2 of the License, or (at your option) any later
221 # version. This program also comes with the added permission that, in the case
222 # that you are obligated to release a derived work under this licence (as per
223 # section 2.b of the GPL), you may delay the fulfillment of this obligation for
226 # This program is distributed in the hope that it will be useful,
227 # but WITHOUT ANY WARRANTY; without even the implied warranty of
228 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
229 # GNU General Public License for more details.
231 # You should have received a copy of the GNU General Public License
232 # along with this program; if not, write to the Free Software
233 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.