From: Ramakrishnan Muthukrishnan Date: Sun, 24 Jan 2016 07:03:26 +0000 (+0530) Subject: zfec/zunfec: make the cmdline programs take input from stdin X-Git-Url: https://git.rkrishnan.org/FOOURL?a=commitdiff_plain;h=refs%2Fheads%2F14-stdin-streaming.0;p=tahoe-lafs%2Fzfec.git zfec/zunfec: make the cmdline programs take input from stdin Patch from Eric Pollmann: https://tahoe-lafs.org/trac/zfec/ticket/14 --- diff --git a/zfec/cmdline_zfec.py b/zfec/cmdline_zfec.py index 2eee2b0..e56dc5b 100755 --- a/zfec/cmdline_zfec.py +++ b/zfec/cmdline_zfec.py @@ -57,9 +57,12 @@ def main(): if args.requiredshares == args.totalshares: print "warning: silly parameters: requiredshares == totalshares, which means that all shares will be required in order to reconstruct the file. You could use \"split\" for the same effect. But proceeding to do it anyway..." - args.inputfile.seek(0, 2) - fsize = args.inputfile.tell() - args.inputfile.seek(0, 0) + if args.inputfile.name == '': + fsize = None + else: + args.inputfile.seek(0, 2) + fsize = args.inputfile.tell() + args.inputfile.seek(0, 0) return filefec.encode_to_files(args.inputfile, fsize, args.output_dir, args.prefix, args.requiredshares, args.totalshares, args.suffix, args.force, args.verbose) # zfec -- fast forward error correction library with Python interface diff --git a/zfec/cmdline_zunfec.py b/zfec/cmdline_zunfec.py index 8fc0c04..514b468 100755 --- a/zfec/cmdline_zunfec.py +++ b/zfec/cmdline_zunfec.py @@ -30,7 +30,9 @@ def main(): print "At least two sharefiles are required." return 1 - if args.force: + if args.outputfile == '-': + outf = sys.stdout + elif args.force: outf = open(args.outputfile, 'wb') else: try: @@ -49,7 +51,7 @@ def main(): for fn in args.sharefiles: sharefs.append(open(fn, 'rb')) try: - ret = filefec.decode_from_files(outf, sharefs, args.verbose) + ret = filefec.decode_from_files(outf, sharefs, args.verbose, sys.stdout==outf) except filefec.InsufficientShareFilesError, e: print str(e) return 3 diff --git a/zfec/filefec.py b/zfec/filefec.py index 48f3d9d..942096e 100644 --- a/zfec/filefec.py +++ b/zfec/filefec.py @@ -2,7 +2,7 @@ import easyfec, zfec from pyutil import fileutil from pyutil.mathutil import pad_size, log_ceil -import array, os, struct +import array, os, struct, sys CHUNKSIZE = 4096 @@ -164,14 +164,21 @@ def encode_to_files(inf, fsize, dirname, prefix, k, m, suffix=".fec", overwrite= Encode inf, writing the shares to specially named, newly created files. @param fsize: calling read() on inf must yield fsize bytes of data and - then raise an EOFError + then raise an EOFError. If None, the file is being read as a stream + and the size is not known in advance, header must be updated after + the reading is complete. @param dirname: the name of the directory into which the sharefiles will be written """ mlen = len(str(m)) format = FORMAT_FORMAT % (mlen, mlen,) - padbytes = pad_size(fsize, k) + if fsize is None: + # Streaming case, file size is unknown. + # We'll recompute the pad at the end and write it + padbytes = pad_size(0, k) + else: + padbytes = pad_size(fsize, k) fns = [] fs = [] @@ -197,15 +204,30 @@ def encode_to_files(inf, fsize, dirname, prefix, k, m, suffix=".fec", overwrite= oldsumlen = sumlen[0] sumlen[0] += length if verbose: - if int((float(oldsumlen) / fsize) * 10) != int((float(sumlen[0]) / fsize) * 10): - print str(int((float(sumlen[0]) / fsize) * 10) * 10) + "% ...", - - if sumlen[0] > fsize: + if fsize is None: + # 30MB takes a short while to write on a normal drive. + interval = MILLION_BYTES * 30 + if int((float(oldsumlen) / interval)) != int((float(sumlen[0]) / interval)): + print str(int((float(sumlen[0]) / MILLION_BYTES))) + "MB ...", + else: + if int((float(oldsumlen) / fsize) * 10) != int((float(sumlen[0]) / fsize) * 10): + print str(int((float(sumlen[0]) / fsize) * 10) * 10) + "% ...", + sys.stdout.flush() + + if fsize is not None and sumlen[0] > fsize: raise IOError("Wrong file size -- possibly the size of the file changed during encoding. Original size: %d, observed size at least: %s" % (fsize, sumlen[0],)) - for i in range(len(blocks)): - data = blocks[i] - fs[i].write(data) - length -= len(data) + if length != 0: + for i in range(len(blocks)): + data = blocks[i] + fs[i].write(data) + length -= len(data) + # EOF signal, if no fsize, update pad + elif fsize is None: + padbytes = pad_size(oldsumlen, k) + for shnum in range(len(fs)): + hdr = _build_header(m, k, padbytes, shnum) + fs[shnum].seek(0) + fs[shnum].write(hdr) encode_file_stringy_easyfec(inf, cb, k, m, chunksize=4096) except EnvironmentError, le: @@ -232,7 +254,7 @@ def encode_to_files(inf, fsize, dirname, prefix, k, m, suffix=".fec", overwrite= # Thanks. MILLION_BYTES=10**6 -def decode_from_files(outf, infiles, verbose=False): +def decode_from_files(outf, infiles, verbose=False, is_stream=False): """ Decode from the first k files in infiles, writing the results to outf. """ @@ -277,16 +299,20 @@ def decode_from_files(outf, infiles, verbose=False): outf.write(resultdata) byteswritten += len(resultdata) if verbose: - if ((byteswritten - len(resultdata)) / (10*MILLION_BYTES)) != (byteswritten / (10*MILLION_BYTES)): - print str(byteswritten / MILLION_BYTES) + " MB ...", + if ((byteswritten - len(resultdata)) / (30*MILLION_BYTES)) != (byteswritten / (30*MILLION_BYTES)): + message = str(byteswritten / MILLION_BYTES) + "MB ..." + outpipe = is_stream and sys.stderr or sys.stdout + print >> outpipe, message, + outpipe.flush() else: # Then this was a short read, so we've reached the end of the sharefiles. resultdata = dec.decode(chunks, shnums, padlen) outf.write(resultdata) - return # Done. + break # Done. if verbose: - print - print "Done!" + outpipe = is_stream and sys.stderr or sys.stdout + print >> outpipe + print >> outpipe, "Done!" def encode_file(inf, cb, k, m, chunksize=4096): """ @@ -493,6 +519,8 @@ def encode_file_stringy_easyfec(inf, cb, k, m, chunksize=4096): res = enc.encode(indata) cb(res, len(indata)) indata = inf.read(readsize) + # Final callback to update pad length in header for streaming case. + cb([''] * m, 0) # zfec -- fast forward error correction library with Python interface # diff --git a/zfec/test/test_zfec.py b/zfec/test/test_zfec.py index 46bbb1c..b2d9a5b 100755 --- a/zfec/test/test_zfec.py +++ b/zfec/test/test_zfec.py @@ -246,6 +246,10 @@ class FileFec(unittest.TestCase): assert (rm, rk, rpad, rsh,) == (m, k, pad, sh,), h def _help_test_filefec(self, teststr, k, m, numshs=None): + self._do_help_test_filefec(teststr, k, m, numshs=numshs, withsize=False) + self._do_help_test_filefec(teststr, k, m, numshs=numshs, withsize=True) + + def _do_help_test_filefec(self, teststr, k, m, numshs=None, withsize=True): if numshs == None: numshs = m @@ -253,7 +257,10 @@ class FileFec(unittest.TestCase): PREFIX = "test" SUFFIX = ".fec" - fsize = len(teststr) + if withsize: + fsize = len(teststr) + else: + fsize = None tempdir = fileutil.NamedTemporaryDirectory(cleanup=True) try: