From: Zooko O'Whielacronx Date: Sat, 14 Apr 2007 18:19:24 +0000 (-0700) Subject: pyfec: new filefec with compressed metadata, better error handling, much better unit... X-Git-Url: https://git.rkrishnan.org/?a=commitdiff_plain;h=7c3b35d286c0923025aa2ad7be45d19870eeb43d;p=tahoe-lafs%2Ftahoe-lafs.git pyfec: new filefec with compressed metadata, better error handling, much better unit tests --- diff --git a/pyfec/README.txt b/pyfec/README.txt index 59695fc8..74201663 100644 --- a/pyfec/README.txt +++ b/pyfec/README.txt @@ -17,9 +17,9 @@ whose loss it can tolerate. This package is largely based on the old "fec" library by Luigi Rizzo et al., which is a mature and optimized implementation of erasure coding. The pyfec package makes several changes from the original "fec" package, including -addition of the Python API, refactoring of the C API to be faster (for the way -that I use it, at least), and a few clean-ups and micro-optimizations of the -core code itself. +addition of the Python API, refactoring of the C API to support zero-copy +operation, and a few clean-ups and micro-optimizations of the core code +itself. * Community @@ -52,13 +52,21 @@ and k is required to be at least 1 and at most m. degenerates to the equivalent of the Unix "split" utility which simply splits the input into successive segments. Similarly, when k == 1 it degenerates to the equivalent of the unix "cp" utility -- each block is a complete copy of the -input data.) +input data. The "fec" command-line tool does not implement these degenerate +cases.) Note that each "primary block" is a segment of the original data, so its size is 1/k'th of the size of original data, and each "secondary block" is of the same size, so the total space used by all the blocks is m/k times the size of the original data (plus some padding to fill out the last primary block to be -the same size as all the others). +the same size as all the others). In addition to the data contained in the +blocks themselves there are also a few pieces of metadata which are necessary +for later reconstruction. Those pieces are: 1. the value of K, 2. the value +of M, 3. the sharenum of each block, 4. the number of bytes of padding +that were used. The "fec" command-line tool compresses these pieces of data +and prepends them to the beginning of each share, so each the sharefile +produced by the "fec" command-line tool is between one and four bytes larger +than the share data alone. The decoding step requires as input k of the blocks which were produced by the encoding step. The decoding step produces as output the data that was earlier @@ -136,8 +144,11 @@ objects (e.g. Python strings) to hold the data that you pass to pyfec. * Utilities -See also the filefec.py module which has a utility function for efficiently -reading a file and encoding it piece by piece. +The filefec.py module which has a utility function for efficiently reading a +file and encoding it piece by piece. + +The bin/ directory contains two commandline tools "fec" and "unfec". See +their usage strings for details. * Dependencies @@ -180,5 +191,5 @@ licence. Enjoy! Zooko Wilcox-O'Hearn -2007-08-01 +2007-04-11 Boulder, Colorado diff --git a/pyfec/fec/filefec.py b/pyfec/fec/filefec.py index 6c9ae138..b2279628 100644 --- a/pyfec/fec/filefec.py +++ b/pyfec/fec/filefec.py @@ -24,77 +24,237 @@ # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. import easyfec, fec +from util import fileutil +from util.mathutil import log_ceil -import array, random +import array, os, re, struct, traceback -def encode_to_files_easyfec(inf, prefix, k, m): +CHUNKSIZE = 4096 + +def _build_header(m, k, pad, sh): """ - Encode inf, writing the shares to a file named $prefix+$sharenum. + @param m: the total number of shares; 3 <= m <= 256 + @param k: the number of shares required to reconstruct; 2 <= k < m + @param pad: the number of bytes of padding added to the file before encoding; 0 <= pad < k + @param sh: the shnum of this share; 0 <= k < m + + @return: a string (which is hopefully short) encoding m, k, sh, and pad """ - l = [ open(prefix+str(sharenum), "wb") for sharenum in range(m) ] - def cb(blocks, length): - assert len(blocks) == len(l) - for i in range(len(blocks)): - l[i].write(blocks[i]) - - encode_file_stringy_easyfec(inf, cb, k, m, chunksize=4096) - -def encode_to_files_stringy(inf, prefix, k, m): + assert m >= 3 + assert m <= 2**8 + assert k >= 2 + assert k < m + assert pad >= 0 + assert pad < k + + assert sh >= 0 + assert sh < m + + bitsused = 0 + val = 0 + + val |= (m - 3) + bitsused += 8 # the first 8 bits always encode m + + kbits = log_ceil(m-2, 2) # num bits needed to store all possible values of k + val <<= kbits + bitsused += kbits + + val |= (k - 2) + + padbits = log_ceil(k, 2) # num bits needed to store all possible values of pad + val <<= padbits + bitsused += padbits + + val |= pad + + shnumbits = log_ceil(m, 2) # num bits needed to store all possible values of shnum + val <<= shnumbits + bitsused += shnumbits + + val |= sh + + assert bitsused >= 11 + assert bitsused <= 32 + + if bitsused <= 16: + val <<= (16-bitsused) + cs = struct.pack('>H', val) + assert cs[:-2] == '\x00' * (len(cs)-2) + return cs[-2:] + if bitsused <= 24: + val <<= (24-bitsused) + cs = struct.pack('>I', val) + assert cs[:-3] == '\x00' * (len(cs)-3) + return cs[-3:] + else: + val <<= (32-bitsused) + cs = struct.pack('>I', val) + assert cs[:-4] == '\x00' * (len(cs)-4) + return cs[-4:] + +def MASK(bits): + return (1<> b2_bits_left) + 2 + + shbits = log_ceil(m, 2) # num bits needed to store all possible values of shnum + padbits = log_ceil(k, 2) # num bits needed to store all possible values of pad + + val = byte & (~kbitmask) + + needed_padbits = padbits - b2_bits_left + if needed_padbits > 0: + byte = struct.unpack(">B", inf.read(1))[0] + val <<= 8 + val |= byte + needed_padbits -= 8 + assert needed_padbits <= 0 + extrabits = -needed_padbits + pad = val >> extrabits + val &= MASK(extrabits) + + needed_shbits = shbits - extrabits + if needed_shbits > 0: + byte = struct.unpack(">B", inf.read(1))[0] + val <<= 8 + val |= byte + needed_shbits -= 8 + assert needed_shbits <= 0 + + gotshbits = -needed_shbits + + sh = val >> gotshbits + + return (m, k, pad, sh,) + +FORMAT_FORMAT = "%%s.%%0%dd_%%0%dd%%s" +RE_FORMAT = "%s.[0-9]+_[0-9]+%s" +def encode_to_files(inf, fsize, dirname, prefix, k, m, suffix=".fec", verbose=False): """ - Encode inf, writing the shares to named $prefix+$sharenum. + Encode inf, writing the shares to specially named, newly created files. + + @param fsize: calling read() on inf must yield fsize bytes of data and + then raise an EOFError + @param dirname: the name of the directory into which the sharefiles will + be written """ - l = [ open(prefix+str(sharenum), "wb") for sharenum in range(m) ] - def cb(blocks, length): - assert len(blocks) == len(l) - for i in range(len(blocks)): - l[i].write(blocks[i]) - - encode_file(inf, cb, k, m, chunksize=4096) - -def decode_from_files(outf, filesize, prefix, k, m): + mlen = len(str(m)) + format = FORMAT_FORMAT % (mlen, mlen,) + + padbytes = fec.util.mathutil.pad_size(fsize, k) + + fns = [] + fs = [] + try: + for shnum in range(m): + hdr = _build_header(m, k, padbytes, shnum) + + fn = os.path.join(dirname, format % (prefix, shnum, m, suffix,)) + if verbose: + print "Creating share file %r..." % (fn,) + fd = os.open(fn, os.O_WRONLY|os.O_CREAT|os.O_EXCL) + f = os.fdopen(fd, "wb") + f.write(hdr) + fs.append(f) + fns.append(fn) + sumlen = [0] + def cb(blocks, length): + if verbose: + print "Writing %d bytes into share files..." % (length,) + assert len(blocks) == len(fs) + sumlen[0] += length + if sumlen[0] > fsize: + raise IOError("Wrong file size -- possibly the size of the file changed during encoding. Original size: %d, observed size at least: %s" % (fsize, sumlen[0],)) + for i in range(len(blocks)): + data = blocks[i] + fs[i].write(data) + length -= len(data) + + encode_file_stringy_easyfec(inf, cb, k, m, chunksize=4096) + except EnvironmentError, le: + print "Cannot complete because of exception: " + print le + print "Cleaning up..." + # clean up + while fs: + f = fs.pop() + f.close() ; del f + fn = fns.pop() + if verbose: + print "Cleaning up: trying to remove %r..." % (fn,) + fileutil.remove_if_possible(fn) + return 1 + if verbose: + print "Done!" + return 0 + +def decode_from_files(outf, dirname, prefix, suffix=".fec", verbose=False): """ - Decode from the first k files in the current directory whose names begin - with prefix, writing the results to outf. + Decode from the first k files in the directory whose names match the + pattern, writing the results to outf. """ - import os + RE=re.compile(RE_FORMAT % (prefix, suffix,)) + infs = [] - sharenums = [] - listd = os.listdir(".") - random.shuffle(listd) - for f in listd: - if f.startswith(prefix): - infs.append(open(f, "rb")) - sharenums.append(int(f[len(prefix):])) + shnums = [] + m = None + k = None + padlen = None + + for fn in os.listdir(dirname): + if RE.match(fn): + f = open(os.path.join(dirname, fn), "rb") + + (nm, nk, npadlen, shnum,) = _parse_header(f) + if not (m is None or m == nm): + raise fec.Error("Share files were corrupted -- share file %s said that m was %s but another share file previously said that m was %s" % (f, nm, m,)) + m = nm + if not (k is None or k == nk): + raise fec.Error("Share files were corrupted -- share file %s said that k was %s but another share file previously said that k was %s" % (f, nk, k,)) + k = nk + if not (padlen is None or padlen == npadlen): + raise fec.Error("Share files were corrupted -- share file %s said that pad length was %s but another share file previously said that pad length was %s" % (f, npadlen, padlen,)) + padlen = npadlen + + infs.append(f) + shnums.append(shnum) + if len(infs) == k: break - CHUNKSIZE = 4096 - dec = fec.Decoder(k, m) + dec = easyfec.Decoder(k, m) + while True: - x = [ inf.read(CHUNKSIZE) for inf in infs ] - decblocks = dec.decode(x, sharenums) - for decblock in decblocks: - if len(decblock) == 0: - raise "error -- probably share was too short -- was it stored in a file which got truncated? chunksizes: %s" % ([len(chunk) for chunk in x],) - if filesize >= len(decblock): - outf.write(decblock) - filesize -= len(decblock) - # print "filesize is now %s after subtracting %s" % (filesize, len(decblock),) - else: - outf.write(decblock[:filesize]) - return + chunks = [ inf.read(CHUNKSIZE) for inf in infs ] + if [ch for ch in chunks if len(ch) != len(chunks[-1])]: + raise fec.Error("Share files were corrupted -- all share files are required to be the same length, but they weren't.") + + if len(chunks[-1]) == CHUNKSIZE: + # Then this was a full read, so we're still in the sharefiles. + resultdata = dec.decode(chunks, shnums, padlen=0) + outf.write(resultdata) + else: + # Then this was a short read, so we've reached the end of the sharefiles. + resultdata = dec.decode(chunks, shnums, padlen) + outf.write(resultdata) + return # Done. def encode_file(inf, cb, k, m, chunksize=4096): """ @@ -128,30 +288,31 @@ def encode_file(inf, cb, k, m, chunksize=4096): enc = fec.Encoder(k, m) l = tuple([ array.array('c') for i in range(k) ]) indatasize = k*chunksize # will be reset to shorter upon EOF + eof = False ZEROES=array.array('c', ['\x00'])*chunksize - while indatasize == k*chunksize: + while not eof: # This loop body executes once per segment. i = 0 while (i= 3: @@ -42,124 +56,165 @@ def ab(x): # debuggery return "%s:%s" % (len(x), "--empty--",) def _h(k, m, ss): - # sys.stdout.write("k: %s, m: %s, len(ss): %r, len(ss[0]): %r" % (k, m, len(ss), len(ss[0]),)) ; sys.stdout.flush() encer = fec.Encoder(k, m) - # sys.stdout.write("constructed.\n") ; sys.stdout.flush() nums_and_blocks = list(enumerate(encer.encode(ss))) - # sys.stdout.write("encoded.\n") ; sys.stdout.flush() assert isinstance(nums_and_blocks, list), nums_and_blocks assert len(nums_and_blocks) == m, (len(nums_and_blocks), m,) nums_and_blocks = random.sample(nums_and_blocks, k) blocks = [ x[1] for x in nums_and_blocks ] nums = [ x[0] for x in nums_and_blocks ] - # sys.stdout.write("about to construct Decoder.\n") ; sys.stdout.flush() decer = fec.Decoder(k, m) - # sys.stdout.write("about to decode from %s.\n"%nums) ; sys.stdout.flush() decoded = decer.decode(blocks, nums) - # sys.stdout.write("decoded.\n") ; sys.stdout.flush() assert len(decoded) == len(ss), (len(decoded), len(ss),) assert tuple([str(s) for s in decoded]) == tuple([str(s) for s in ss]), (tuple([ab(str(s)) for s in decoded]), tuple([ab(str(s)) for s in ss]),) def randstr(n): return ''.join(map(chr, map(random.randrange, [0]*n, [256]*n))) -def div_ceil(n, d): - """ - The smallest integer k such that k*d >= n. - """ - return (n/d) + (n%d != 0) - -def next_multiple(n, k): - """ - The smallest multiple of k which is >= n. - """ - return div_ceil(n, k) * k - -def pad_size(n, k): - """ - The smallest number that has to be added to n so that n is a multiple of k. - """ - if n%k: - return k - n%k - else: - return 0 - -def _test_random(): +def _help_test_random(): m = random.randrange(1, 257) k = random.randrange(1, m+1) - l = random.randrange(0, 2**15) + l = random.randrange(0, 2**10) ss = [ randstr(l/k) for x in range(k) ] _h(k, m, ss) -def _test_random_with_l(l): +def _help_test_random_with_l(l): m = 83 k = 19 ss = [ randstr(l/k) for x in range(k) ] _h(k, m, ss) -def test_random(noisy=True): - for i in range(2**5): - # sys.stdout.write(",") - _test_random() - # sys.stdout.write(".") - if noisy: - print "%d randomized tests pass." % (i+1) - -def test_bad_args_enc(): - encer = fec.Encoder(2, 4) - try: - encer.encode(["a", "b", ], ["c", "I am not an integer blocknum",]) - except fec.Error, e: - assert "Precondition violation: second argument is required to contain int" in str(e), e - else: - raise "Should have gotten fec.Error for wrong type of second argument." - - try: - encer.encode(["a", "b", ], 98) # not a sequence at all - except TypeError, e: - assert "Second argument (optional) was not a sequence" in str(e), e - else: - raise "Should have gotten TypeError for wrong type of second argument." - -def test_bad_args_dec(): - decer = fec.Decoder(2, 4) - - try: - decer.decode(98, [0, 1]) # first argument is not a sequence - except TypeError, e: - assert "First argument was not a sequence" in str(e), e - else: - raise "Should have gotten TypeError for wrong type of second argument." - - try: - decer.decode(["a", "b", ], ["c", "d",]) - except fec.Error, e: - assert "Precondition violation: second argument is required to contain int" in str(e), e - else: - raise "Should have gotten fec.Error for wrong type of second argument." - - try: - decer.decode(["a", "b", ], 98) # not a sequence at all - except TypeError, e: - assert "Second argument was not a sequence" in str(e), e - else: - raise "Should have gotten TypeError for wrong type of second argument." - -try: - from twisted.trial import unittest - class TestPyFec(unittest.TestCase): - def test_random(self): - test_random(False) - def test_bad_args_enc(self): - test_bad_args_enc() - def test_bad_args_dec(self): - test_bad_args_dec() -except ImportError: - # trial is unavailable, oh well - pass +class Fec(unittest.TestCase): + def test_random(self): + for i in range(3): + _help_test_random() + if VERBOSE: + print "%d randomized tests pass." % (i+1) + + def test_bad_args_enc(self): + encer = fec.Encoder(2, 4) + try: + encer.encode(["a", "b", ], ["c", "I am not an integer blocknum",]) + except fec.Error, e: + assert "Precondition violation: second argument is required to contain int" in str(e), e + else: + raise "Should have gotten fec.Error for wrong type of second argument." + + try: + encer.encode(["a", "b", ], 98) # not a sequence at all + except TypeError, e: + assert "Second argument (optional) was not a sequence" in str(e), e + else: + raise "Should have gotten TypeError for wrong type of second argument." + + def test_bad_args_dec(self): + decer = fec.Decoder(2, 4) + + try: + decer.decode(98, [0, 1]) # first argument is not a sequence + except TypeError, e: + assert "First argument was not a sequence" in str(e), e + else: + raise "Should have gotten TypeError for wrong type of second argument." + + try: + decer.decode(["a", "b", ], ["c", "d",]) + except fec.Error, e: + assert "Precondition violation: second argument is required to contain int" in str(e), e + else: + raise "Should have gotten fec.Error for wrong type of second argument." + + try: + decer.decode(["a", "b", ], 98) # not a sequence at all + except TypeError, e: + assert "Second argument was not a sequence" in str(e), e + else: + raise "Should have gotten TypeError for wrong type of second argument." + +class FileFec(unittest.TestCase): + def test_filefec_header(self): + for m in [3, 5, 7, 9, 11, 17, 19, 33, 35, 65, 66, 67, 129, 130, 131, 254, 255, 256,]: + for k in [2, 3, 5, 9, 17, 33, 65, 129, 255,]: + if k >= m: + continue + for pad in [0, 1, k-1,]: + if pad >= k: + continue + for sh in [0, 1, m-1,]: + if sh >= m: + continue + h = fec.filefec._build_header(m, k, pad, sh) + hio = cStringIO.StringIO(h) + (rm, rk, rpad, rsh,) = fec.filefec._parse_header(hio) + assert (rm, rk, rpad, rsh,) == (m, k, pad, sh,), h + + def _help_test_filefec(self, teststr, k, m, numshs=None): + if numshs == None: + numshs = m + + TESTFNAME = "testfile.txt" + PREFIX = "test" + SUFFIX = ".fec" + + tempdir = fec.util.fileutil.NamedTemporaryDirectory(cleanup=False) + try: + tempfn = os.path.join(tempdir.name, TESTFNAME) + tempf = open(tempfn, 'wb') + tempf.write(teststr) + tempf.close() + fsize = os.path.getsize(tempfn) + assert fsize == len(teststr) + + # encode the file + fec.filefec.encode_to_files(open(tempfn, 'rb'), fsize, tempdir.name, PREFIX, k, m, SUFFIX, verbose=VERBOSE) + + # delete some share files + fns = os.listdir(tempdir.name) + RE=re.compile(fec.filefec.RE_FORMAT % (PREFIX, SUFFIX,)) + sharefs = [ fn for fn in fns if RE.match(fn) ] + random.shuffle(sharefs) + while len(sharefs) > numshs: + shfn = sharefs.pop() + fec.util.fileutil.remove(os.path.join(tempdir.name, shfn)) + + # decode from the share files + outf = open(os.path.join(tempdir.name, 'recovered-testfile.txt'), 'wb') + fec.filefec.decode_from_files(outf, tempdir.name, PREFIX, SUFFIX, verbose=VERBOSE) + outf.close() + + tempfn = open(os.path.join(tempdir.name, 'recovered-testfile.txt'), 'rb') + recovereddata = tempfn.read() + assert recovereddata == teststr + finally: + tempdir.shutdown() + + def test_filefec_all_shares(self): + return self._help_test_filefec("Yellow Whirled!", 3, 8) + + def test_filefec_all_shares_with_padding(self, noisy=VERBOSE): + return self._help_test_filefec("Yellow Whirled!A", 3, 8) + + def test_filefec_min_shares_with_padding(self, noisy=VERBOSE): + return self._help_test_filefec("Yellow Whirled!A", 3, 8, numshs=3) if __name__ == "__main__": - test_bad_args_dec() - test_bad_args_enc() - test_random() - + if hasattr(unittest, 'main'): + unittest.main() + else: + sys.path.append(os.getcwd()) + mods = [] + fullname = os.path.realpath(os.path.abspath(__file__)) + for pathel in sys.path: + fullnameofpathel = os.path.realpath(os.path.abspath(pathel)) + if fullname.startswith(fullnameofpathel): + relname = fullname[len(fullnameofpathel):] + mod = (os.path.splitext(relname)[0]).replace(os.sep, '.').strip('.') + mods.append(mod) + + mods.sort(cmp=lambda x, y: cmp(len(x), len(y))) + mods.reverse() + for mod in mods: + cmdstr = "trial %s %s" % (' '.join(sys.argv[1:]), mod) + print cmdstr + if os.system(cmdstr) == 0: + break