From: Ramakrishnan Muthukrishnan <ram@rkrishnan.org>
Date: Sun, 24 Jan 2016 07:03:26 +0000 (+0530)
Subject: zfec/zunfec: make the cmdline programs take input from stdin
X-Git-Url: https://git.rkrishnan.org/frontends//%22%3C?a=commitdiff_plain;h=refs%2Fheads%2F14-stdin-streaming.0;p=tahoe-lafs%2Fzfec.git

zfec/zunfec: make the cmdline programs take input from stdin

Patch from Eric Pollmann: https://tahoe-lafs.org/trac/zfec/ticket/14
---

diff --git a/zfec/cmdline_zfec.py b/zfec/cmdline_zfec.py
index 2eee2b0..e56dc5b 100755
--- a/zfec/cmdline_zfec.py
+++ b/zfec/cmdline_zfec.py
@@ -57,9 +57,12 @@ def main():
         if args.requiredshares == args.totalshares:
             print "warning: silly parameters: requiredshares == totalshares, which means that all shares will be required in order to reconstruct the file.  You could use \"split\" for the same effect.  But proceeding to do it anyway..."
 
-    args.inputfile.seek(0, 2)
-    fsize = args.inputfile.tell()
-    args.inputfile.seek(0, 0)
+    if args.inputfile.name == '<stdin>':
+      fsize = None
+    else:
+      args.inputfile.seek(0, 2)
+      fsize = args.inputfile.tell()
+      args.inputfile.seek(0, 0)
     return filefec.encode_to_files(args.inputfile, fsize, args.output_dir, args.prefix, args.requiredshares, args.totalshares, args.suffix, args.force, args.verbose)
 
 # zfec -- fast forward error correction library with Python interface
diff --git a/zfec/cmdline_zunfec.py b/zfec/cmdline_zunfec.py
index 8fc0c04..514b468 100755
--- a/zfec/cmdline_zunfec.py
+++ b/zfec/cmdline_zunfec.py
@@ -30,7 +30,9 @@ def main():
         print "At least two sharefiles are required."
         return 1
 
-    if args.force:
+    if args.outputfile == '-':
+        outf = sys.stdout
+    elif args.force:
         outf = open(args.outputfile, 'wb')
     else:
         try:
@@ -49,7 +51,7 @@ def main():
     for fn in args.sharefiles:
         sharefs.append(open(fn, 'rb'))
     try:
-        ret = filefec.decode_from_files(outf, sharefs, args.verbose)
+        ret = filefec.decode_from_files(outf, sharefs, args.verbose, sys.stdout==outf)
     except filefec.InsufficientShareFilesError, e:
         print str(e)
         return 3
diff --git a/zfec/filefec.py b/zfec/filefec.py
index 48f3d9d..942096e 100644
--- a/zfec/filefec.py
+++ b/zfec/filefec.py
@@ -2,7 +2,7 @@ import easyfec, zfec
 from pyutil import fileutil
 from pyutil.mathutil import pad_size, log_ceil
 
-import array, os, struct
+import array, os, struct, sys
 
 CHUNKSIZE = 4096
 
@@ -164,14 +164,21 @@ def encode_to_files(inf, fsize, dirname, prefix, k, m, suffix=".fec", overwrite=
     Encode inf, writing the shares to specially named, newly created files.
 
     @param fsize: calling read() on inf must yield fsize bytes of data and
-        then raise an EOFError
+        then raise an EOFError.  If None, the file is being read as a stream
+        and the size is not known in advance, header must be updated after
+        the reading is complete.
     @param dirname: the name of the directory into which the sharefiles will
         be written
     """
     mlen = len(str(m))
     format = FORMAT_FORMAT % (mlen, mlen,)
 
-    padbytes = pad_size(fsize, k)
+    if fsize is None:
+        # Streaming case, file size is unknown.
+        # We'll recompute the pad at the end and write it
+        padbytes = pad_size(0, k)
+    else:
+        padbytes = pad_size(fsize, k)
 
     fns = []
     fs = []
@@ -197,15 +204,30 @@ def encode_to_files(inf, fsize, dirname, prefix, k, m, suffix=".fec", overwrite=
             oldsumlen = sumlen[0]
             sumlen[0] += length
             if verbose:
-                if int((float(oldsumlen) / fsize) * 10) != int((float(sumlen[0]) / fsize) * 10):
-                    print str(int((float(sumlen[0]) / fsize) * 10) * 10) + "% ...",
-
-            if sumlen[0] > fsize:
+                if fsize is None:
+                    # 30MB takes a short while to write on a normal drive.
+                    interval = MILLION_BYTES * 30
+                    if int((float(oldsumlen) / interval)) != int((float(sumlen[0]) / interval)):
+                        print str(int((float(sumlen[0]) / MILLION_BYTES))) + "MB ...",
+                else:
+                    if int((float(oldsumlen) / fsize) * 10) != int((float(sumlen[0]) / fsize) * 10):
+                        print str(int((float(sumlen[0]) / fsize) * 10) * 10) + "% ...",
+                sys.stdout.flush()
+
+            if fsize is not None and sumlen[0] > fsize:
                 raise IOError("Wrong file size -- possibly the size of the file changed during encoding.  Original size: %d, observed size at least: %s" % (fsize, sumlen[0],))
-            for i in range(len(blocks)):
-                data = blocks[i]
-                fs[i].write(data)
-                length -= len(data)
+            if length != 0:
+                for i in range(len(blocks)):
+                    data = blocks[i]
+                    fs[i].write(data)
+                    length -= len(data)
+            # EOF signal, if no fsize, update pad
+            elif fsize is None:
+                padbytes = pad_size(oldsumlen, k)
+                for shnum in range(len(fs)):
+                    hdr = _build_header(m, k, padbytes, shnum)
+                    fs[shnum].seek(0)
+                    fs[shnum].write(hdr)
 
         encode_file_stringy_easyfec(inf, cb, k, m, chunksize=4096)
     except EnvironmentError, le:
@@ -232,7 +254,7 @@ def encode_to_files(inf, fsize, dirname, prefix, k, m, suffix=".fec", overwrite=
 # Thanks.
 MILLION_BYTES=10**6
 
-def decode_from_files(outf, infiles, verbose=False):
+def decode_from_files(outf, infiles, verbose=False, is_stream=False):
     """
     Decode from the first k files in infiles, writing the results to outf.
     """
@@ -277,16 +299,20 @@ def decode_from_files(outf, infiles, verbose=False):
             outf.write(resultdata)
             byteswritten += len(resultdata)
             if verbose:
-                if ((byteswritten - len(resultdata)) / (10*MILLION_BYTES)) != (byteswritten / (10*MILLION_BYTES)):
-                    print str(byteswritten / MILLION_BYTES) + " MB ...",
+                if ((byteswritten - len(resultdata)) / (30*MILLION_BYTES)) != (byteswritten / (30*MILLION_BYTES)):
+                    message = str(byteswritten / MILLION_BYTES) + "MB ..."
+                    outpipe = is_stream and sys.stderr or sys.stdout
+                    print >> outpipe, message,
+                    outpipe.flush()
         else:
             # Then this was a short read, so we've reached the end of the sharefiles.
             resultdata = dec.decode(chunks, shnums, padlen)
             outf.write(resultdata)
-            return # Done.
+            break # Done.
     if verbose:
-        print
-        print "Done!"
+        outpipe = is_stream and sys.stderr or sys.stdout
+        print >> outpipe
+        print >> outpipe, "Done!"
 
 def encode_file(inf, cb, k, m, chunksize=4096):
     """
@@ -493,6 +519,8 @@ def encode_file_stringy_easyfec(inf, cb, k, m, chunksize=4096):
         res = enc.encode(indata)
         cb(res, len(indata))
         indata = inf.read(readsize)
+    # Final callback to update pad length in header for streaming case.
+    cb([''] * m, 0)
 
 # zfec -- fast forward error correction library with Python interface
 #
diff --git a/zfec/test/test_zfec.py b/zfec/test/test_zfec.py
index 46bbb1c..b2d9a5b 100755
--- a/zfec/test/test_zfec.py
+++ b/zfec/test/test_zfec.py
@@ -246,6 +246,10 @@ class FileFec(unittest.TestCase):
                         assert (rm, rk, rpad, rsh,) == (m, k, pad, sh,), h
 
     def _help_test_filefec(self, teststr, k, m, numshs=None):
+        self._do_help_test_filefec(teststr, k, m, numshs=numshs, withsize=False)
+        self._do_help_test_filefec(teststr, k, m, numshs=numshs, withsize=True)
+
+    def _do_help_test_filefec(self, teststr, k, m, numshs=None, withsize=True):
         if numshs == None:
             numshs = m
 
@@ -253,7 +257,10 @@ class FileFec(unittest.TestCase):
         PREFIX = "test"
         SUFFIX = ".fec"
 
-        fsize = len(teststr)
+        if withsize:
+          fsize = len(teststr)
+        else:
+          fsize = None
 
         tempdir = fileutil.NamedTemporaryDirectory(cleanup=True)
         try: