if args.requiredshares == args.totalshares:
print "warning: silly parameters: requiredshares == totalshares, which means that all shares will be required in order to reconstruct the file. You could use \"split\" for the same effect. But proceeding to do it anyway..."
- args.inputfile.seek(0, 2)
- fsize = args.inputfile.tell()
- args.inputfile.seek(0, 0)
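+ # Input from stdin is treated as a stream: its size is not known up front,
+ # so pass fsize=None and let encode_to_files rewrite the share headers at EOF.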
+ if args.inputfile.name == '<stdin>':
+ fsize = None
+ else:
+ args.inputfile.seek(0, 2)
+ fsize = args.inputfile.tell()
+ args.inputfile.seek(0, 0)
return filefec.encode_to_files(args.inputfile, fsize, args.output_dir, args.prefix, args.requiredshares, args.totalshares, args.suffix, args.force, args.verbose)
# zfec -- fast forward error correction library with Python interface
from pyutil import fileutil
from pyutil.mathutil import pad_size, log_ceil
-import array, os, struct
+import array, os, struct, sys
CHUNKSIZE = 4096
Encode inf, writing the shares to specially named, newly created files.
@param fsize: calling read() on inf must yield fsize bytes of data and
- then raise an EOFError
+ then raise an EOFError. If None, the input is being read as a stream
+ and its size is not known in advance; the share headers must be
+ updated after the reading is complete.
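+ An illustrative call for the streaming case (argument order as used by
+ the command-line wrapper above, with example values):
+ encode_to_files(sys.stdin, None, ".", "backup", 3, 10, ".fec", False, False).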
@param dirname: the name of the directory into which the sharefiles will
be written
"""
mlen = len(str(m))
format = FORMAT_FORMAT % (mlen, mlen,)
- padbytes = pad_size(fsize, k)
+ if fsize is None:
+ # Streaming case: the file size is unknown, so use a placeholder pad
+ # for now; it will be recomputed and written once the stream ends.
+ padbytes = pad_size(0, k)
+ else:
+ padbytes = pad_size(fsize, k)
fns = []
fs = []
oldsumlen = sumlen[0]
sumlen[0] += length
if verbose:
- if int((float(oldsumlen) / fsize) * 10) != int((float(sumlen[0]) / fsize) * 10):
- print str(int((float(sumlen[0]) / fsize) * 10) * 10) + "% ...",
-
- if sumlen[0] > fsize:
+ if fsize is None:
+ # Total size is unknown, so report absolute progress every 30 MB
+ # (which takes a short while to write on a normal drive).
+ interval = MILLION_BYTES * 30
+ if int((float(oldsumlen) / interval)) != int((float(sumlen[0]) / interval)):
+ print str(int((float(sumlen[0]) / MILLION_BYTES))) + "MB ...",
+ else:
+ if int((float(oldsumlen) / fsize) * 10) != int((float(sumlen[0]) / fsize) * 10):
+ print str(int((float(sumlen[0]) / fsize) * 10) * 10) + "% ...",
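+ # The trailing comma suppresses the newline, so flush to make the
+ # progress message appear immediately.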
+ sys.stdout.flush()
+
+ if fsize is not None and sumlen[0] > fsize:
raise IOError("Wrong file size -- possibly the size of the file changed during encoding. Original size: %d, observed size at least: %s" % (fsize, sumlen[0],))
- for i in range(len(blocks)):
- data = blocks[i]
- fs[i].write(data)
- length -= len(data)
+ if length != 0:
+ for i in range(len(blocks)):
+ data = blocks[i]
+ fs[i].write(data)
+ length -= len(data)
+ # A zero-length callback signals EOF; if fsize was unknown, recompute
+ # the pad and rewrite each share header.
+ elif fsize is None:
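+ # The final callback passes length 0, so oldsumlen already equals the
+ # total number of data bytes read from the stream.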
+ padbytes = pad_size(oldsumlen, k)
+ for shnum in range(len(fs)):
+ hdr = _build_header(m, k, padbytes, shnum)
+ fs[shnum].seek(0)
+ fs[shnum].write(hdr)
encode_file_stringy_easyfec(inf, cb, k, m, chunksize=4096)
except EnvironmentError, le:
# Thanks.
MILLION_BYTES=10**6
-def decode_from_files(outf, infiles, verbose=False):
+def decode_from_files(outf, infiles, verbose=False, is_stream=False):
"""
Decode from the first k files in infiles, writing the results to outf.
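+ If is_stream is True, progress and status messages are written to
+ stderr so that stdout stays clean for the decoded data.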
"""
outf.write(resultdata)
byteswritten += len(resultdata)
if verbose:
- if ((byteswritten - len(resultdata)) / (10*MILLION_BYTES)) != (byteswritten / (10*MILLION_BYTES)):
- print str(byteswritten / MILLION_BYTES) + " MB ...",
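+ # Report progress every 30 MB and send it to stderr when the decoded
+ # data itself is being streamed to stdout.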
+ if ((byteswritten - len(resultdata)) / (30*MILLION_BYTES)) != (byteswritten / (30*MILLION_BYTES)):
+ message = str(byteswritten / MILLION_BYTES) + "MB ..."
+ outpipe = sys.stderr if is_stream else sys.stdout
+ print >> outpipe, message,
+ outpipe.flush()
else:
# Then this was a short read, so we've reached the end of the sharefiles.
resultdata = dec.decode(chunks, shnums, padlen)
outf.write(resultdata)
- return # Done.
+ break # Done.
if verbose:
- print
- print "Done!"
+ outpipe = sys.stderr if is_stream else sys.stdout
+ print >> outpipe
+ print >> outpipe, "Done!"
def encode_file(inf, cb, k, m, chunksize=4096):
"""
res = enc.encode(indata)
cb(res, len(indata))
indata = inf.read(readsize)
+ # Final zero-length callback gives the streaming case a chance to
+ # update the pad length in the share headers.
+ cb([''] * m, 0)
# zfec -- fast forward error correction library with Python interface
#
assert (rm, rk, rpad, rsh,) == (m, k, pad, sh,), h
def _help_test_filefec(self, teststr, k, m, numshs=None):
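+ # Exercise both the streaming path (fsize=None) and the known-size path.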
+ self._do_help_test_filefec(teststr, k, m, numshs=numshs, withsize=False)
+ self._do_help_test_filefec(teststr, k, m, numshs=numshs, withsize=True)
+
+ def _do_help_test_filefec(self, teststr, k, m, numshs=None, withsize=True):
if numshs == None:
numshs = m
PREFIX = "test"
SUFFIX = ".fec"
- fsize = len(teststr)
+ if withsize:
+ fsize = len(teststr)
+ else:
+ fsize = None
tempdir = fileutil.NamedTemporaryDirectory(cleanup=True)
try: