4 # import bindann.monkeypatch.all
6 # zfec -- fast forward error correction library with Python interface
8 # Copyright (C) 2007 Allmydata, Inc.
9 # Author: Zooko Wilcox-O'Hearn
10 # mailto:zooko@zooko.com
12 # This file is part of zfec.
14 # This program is free software; you can redistribute it and/or modify it under
15 # the terms of the GNU General Public License as published by the Free Software
16 # Foundation; either version 2 of the License, or (at your option) any later
17 # version. This program also comes with the added permission that, in the case
18 # that you are obligated to release a derived work under this licence (as per
19 # section 2.b of the GPL), you may delay the fulfillment of this obligation for
22 # This program is distributed in the hope that it will be useful,
23 # but WITHOUT ANY WARRANTY; without even the implied warranty of
24 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
25 # GNU General Public License for more details.
27 # You should have received a copy of the GNU General Public License
28 # along with this program; if not, write to the Free Software
29 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
31 import cStringIO, os, random, re, sys
36 from twisted.trial import unittest
38 # trial is unavailable, oh well
44 sys.argv.pop(sys.argv.index('-v'))
47 from base64 import b32encode
48 def ab(x): # debuggery
50 return "%s:%s" % (len(x), b32encode(x[-3:]),)
52 return "%s:%s" % (len(x), b32encode(x[-2:]),)
54 return "%s:%s" % (len(x), b32encode(x[-1:]),)
56 return "%s:%s" % (len(x), "--empty--",)
59 encer = zfec.Encoder(k, m)
60 nums_and_blocks = list(enumerate(encer.encode(ss)))
61 assert isinstance(nums_and_blocks, list), nums_and_blocks
62 assert len(nums_and_blocks) == m, (len(nums_and_blocks), m,)
63 nums_and_blocks = random.sample(nums_and_blocks, k)
64 blocks = [ x[1] for x in nums_and_blocks ]
65 nums = [ x[0] for x in nums_and_blocks ]
66 decer = zfec.Decoder(k, m)
67 decoded = decer.decode(blocks, nums)
68 assert len(decoded) == len(ss), (len(decoded), len(ss),)
69 assert tuple([str(s) for s in decoded]) == tuple([str(s) for s in ss]), (tuple([ab(str(s)) for s in decoded]), tuple([ab(str(s)) for s in ss]),)
72 return ''.join(map(chr, map(random.randrange, [0]*n, [256]*n)))
74 def _help_test_random():
75 m = random.randrange(1, 257)
76 k = random.randrange(1, m+1)
77 l = random.randrange(0, 2**10)
78 ss = [ randstr(l/k) for x in range(k) ]
81 def _help_test_random_with_l(l):
84 ss = [ randstr(l/k) for x in range(k) ]
87 class ZFec(unittest.TestCase):
88 def test_random(self):
92 print "%d randomized tests pass." % (i+1)
94 def test_bad_args_enc(self):
95 encer = zfec.Encoder(2, 4)
97 encer.encode(["a", "b", ], ["c", "I am not an integer blocknum",])
99 assert "Precondition violation: second argument is required to contain int" in str(e), e
101 raise "Should have gotten zfec.Error for wrong type of second argument."
104 encer.encode(["a", "b", ], 98) # not a sequence at all
106 assert "Second argument (optional) was not a sequence" in str(e), e
108 raise "Should have gotten TypeError for wrong type of second argument."
110 def test_bad_args_dec(self):
111 decer = zfec.Decoder(2, 4)
114 decer.decode(98, [0, 1]) # first argument is not a sequence
116 assert "First argument was not a sequence" in str(e), e
118 raise "Should have gotten TypeError for wrong type of second argument."
121 decer.decode(["a", "b", ], ["c", "d",])
122 except zfec.Error, e:
123 assert "Precondition violation: second argument is required to contain int" in str(e), e
125 raise "Should have gotten zfec.Error for wrong type of second argument."
128 decer.decode(["a", "b", ], 98) # not a sequence at all
130 assert "Second argument was not a sequence" in str(e), e
132 raise "Should have gotten TypeError for wrong type of second argument."
134 class FileFec(unittest.TestCase):
135 def test_filefec_header(self):
136 for m in [3, 5, 7, 9, 11, 17, 19, 33, 35, 65, 66, 67, 129, 130, 131, 254, 255, 256,]:
137 for k in [2, 3, 5, 9, 17, 33, 65, 129, 255,]:
140 for pad in [0, 1, k-1,]:
143 for sh in [0, 1, m-1,]:
146 h = zfec.filefec._build_header(m, k, pad, sh)
147 hio = cStringIO.StringIO(h)
148 (rm, rk, rpad, rsh,) = zfec.filefec._parse_header(hio)
149 assert (rm, rk, rpad, rsh,) == (m, k, pad, sh,), h
151 def _help_test_filefec(self, teststr, k, m, numshs=None):
155 TESTFNAME = "testfile.txt"
159 tempdir = zfec.util.fileutil.NamedTemporaryDirectory(cleanup=False)
161 tempfn = os.path.join(tempdir.name, TESTFNAME)
162 tempf = open(tempfn, 'wb')
165 fsize = os.path.getsize(tempfn)
166 assert fsize == len(teststr)
169 zfec.filefec.encode_to_files(open(tempfn, 'rb'), fsize, tempdir.name, PREFIX, k, m, SUFFIX, verbose=VERBOSE)
171 # select some share files
172 RE=re.compile(zfec.filefec.RE_FORMAT % (PREFIX, SUFFIX,))
173 fns = os.listdir(tempdir.name)
174 sharefs = [ open(os.path.join(tempdir.name, fn), "rb") for fn in fns if RE.match(fn) ]
175 random.shuffle(sharefs)
178 # decode from the share files
179 outf = open(os.path.join(tempdir.name, 'recovered-testfile.txt'), 'wb')
180 zfec.filefec.decode_from_files(outf, sharefs, verbose=VERBOSE)
183 tempfn = open(os.path.join(tempdir.name, 'recovered-testfile.txt'), 'rb')
184 recovereddata = tempfn.read()
185 assert recovereddata == teststr
189 def test_filefec_all_shares(self):
190 return self._help_test_filefec("Yellow Whirled!", 3, 8)
192 def test_filefec_all_shares_with_padding(self, noisy=VERBOSE):
193 return self._help_test_filefec("Yellow Whirled!A", 3, 8)
195 def test_filefec_min_shares_with_padding(self, noisy=VERBOSE):
196 return self._help_test_filefec("Yellow Whirled!A", 3, 8, numshs=3)
198 if __name__ == "__main__":
199 if hasattr(unittest, 'main'):
202 sys.path.append(os.getcwd())
204 fullname = os.path.realpath(os.path.abspath(__file__))
205 for pathel in sys.path:
206 fullnameofpathel = os.path.realpath(os.path.abspath(pathel))
207 if fullname.startswith(fullnameofpathel):
208 relname = fullname[len(fullnameofpathel):]
209 mod = (os.path.splitext(relname)[0]).replace(os.sep, '.').strip('.')
212 mods.sort(cmp=lambda x, y: cmp(len(x), len(y)))
215 cmdstr = "trial %s %s" % (' '.join(sys.argv[1:]), mod)
217 if os.system(cmdstr) == 0: