2 from pyutil import fileutil
3 from pyutil.mathutil import pad_size, log_ceil
5 import array, os, struct
9 from base64 import b32encode
def ab(x): # debuggery
    """
    Return a short debug string for x: its length, a colon, and the base32
    encoding of its last few (up to 3) bytes, or "--empty--" for empty input.

    Fixes the mangled original, in which the four return statements were
    consecutive and only the first was reachable.
    """
    if len(x) >= 3:
        return "%s:%s" % (len(x), b32encode(x[-3:]),)
    elif len(x) == 2:
        return "%s:%s" % (len(x), b32encode(x[-2:]),)
    elif len(x) == 1:
        return "%s:%s" % (len(x), b32encode(x[-1:]),)
    else:
        return "%s:%s" % (len(x), "--empty--",)
class InsufficientShareFilesError(zfec.Error):
    """
    Raised when fewer share files than k (the number required to
    reconstruct) were supplied for decoding.
    """
    def __init__(self, k, kb, *args, **kwargs):
        zfec.Error.__init__(self, *args, **kwargs)
        # Store the required and supplied counts so __repr__ can report them
        # (the mangled original dropped these assignments).
        self.k = k
        self.kb = kb

    def __repr__(self):
        return "Insufficient share files -- %d share files are required to recover this file, but only %d were given" % (self.k, self.kb,)

    def __str__(self):
        return self.__repr__()
32 class CorruptedShareFilesError(zfec.Error):
def _build_header(m, k, pad, sh):
    """
    Pack the share-file metadata into a compact big-endian bit string.

    @param m: the total number of shares; 1 <= m <= 256
    @param k: the number of shares required to reconstruct; 1 <= k <= m
    @param pad: the number of bytes of padding added to the file before encoding; 0 <= pad < k
    @param sh: the shnum of this share; 0 <= sh < m

    @return: a compressed string encoding m, k, pad, and sh
    """
    assert m >= 1
    assert m <= 2**8
    assert k >= 1
    assert k <= m
    assert pad >= 0
    assert pad < k
    assert sh >= 0
    assert sh < m

    bitsused = 0
    val = 0

    val |= (m - 1)
    bitsused += 8 # the first 8 bits always encode m

    kbits = log_ceil(m, 2) # num bits needed to store all possible values of k
    val <<= kbits
    val |= (k - 1)
    bitsused += kbits

    padbits = log_ceil(k, 2) # num bits needed to store all possible values of pad
    val <<= padbits
    val |= pad
    bitsused += padbits

    shnumbits = log_ceil(m, 2) # num bits needed to store all possible values of shnum
    val <<= shnumbits
    val |= sh
    bitsused += shnumbits

    assert bitsused >= 8, bitsused
    assert bitsused <= 32, bitsused

    # Left-justify the used bits within the smallest of 2, 3, or 4 bytes,
    # then return only the bytes that carry data.
    if bitsused <= 16:
        val <<= (16-bitsused)
        cs = struct.pack('>H', val)
        assert cs[:-2] == '\x00' * (len(cs)-2)
        return cs[-2:]
    if bitsused <= 24:
        val <<= (24-bitsused)
        cs = struct.pack('>I', val)
        assert cs[:-3] == '\x00' * (len(cs)-3)
        return cs[-3:]
    else:
        val <<= (32-bitsused)
        cs = struct.pack('>I', val)
        assert cs[:-4] == '\x00' * (len(cs)-4)
        return cs[-4:]
def _parse_header(inf):
    """
    Read and decode the metadata header produced by _build_header.

    @param inf: an object which I can call read(1) on to get another byte

    @return: tuple of (m, k, pad, sh,); side-effect: the first one to four
        bytes of inf will be read
    """
    # The first 8 bits always encode m.
    ch = inf.read(1)
    if not ch:
        raise CorruptedShareFilesError("Share files were corrupted -- share file %r didn't have a complete metadata header at the front. Perhaps the file was truncated." % (inf.name,))
    byte = struct.unpack(">B", ch)[0]
    m = byte + 1

    # The next few bits encode k.
    kbits = log_ceil(m, 2) # num bits needed to store all possible values of k
    b2_bits_left = 8-kbits
    kbitmask = MASK(kbits) << b2_bits_left
    ch = inf.read(1)
    if not ch:
        raise CorruptedShareFilesError("Share files were corrupted -- share file %r didn't have a complete metadata header at the front. Perhaps the file was truncated." % (inf.name,))
    byte = struct.unpack(">B", ch)[0]
    k = ((byte & kbitmask) >> b2_bits_left) + 1

    shbits = log_ceil(m, 2) # num bits needed to store all possible values of shnum
    padbits = log_ceil(k, 2) # num bits needed to store all possible values of pad

    # Whatever bits of the second byte were not consumed by k belong to pad.
    val = byte & (~kbitmask)

    needed_padbits = padbits - b2_bits_left
    if needed_padbits > 0:
        # pad spills into the next byte; pull it in.
        ch = inf.read(1)
        if not ch:
            raise CorruptedShareFilesError("Share files were corrupted -- share file %r didn't have a complete metadata header at the front. Perhaps the file was truncated." % (inf.name,))
        byte = struct.unpack(">B", ch)[0]
        val <<= 8
        val |= byte
        needed_padbits -= 8
    assert needed_padbits <= 0
    extrabits = -needed_padbits
    pad = val >> extrabits
    val &= MASK(extrabits)

    needed_shbits = shbits - extrabits
    if needed_shbits > 0:
        # shnum spills into the next byte; pull it in.
        ch = inf.read(1)
        if not ch:
            raise CorruptedShareFilesError("Share files were corrupted -- share file %r didn't have a complete metadata header at the front. Perhaps the file was truncated." % (inf.name,))
        byte = struct.unpack(">B", ch)[0]
        val <<= 8
        val |= byte
        needed_shbits -= 8
    assert needed_shbits <= 0

    gotshbits = -needed_shbits

    sh = val >> gotshbits

    return (m, k, pad, sh,)
# Template for share-filename templates: instantiated with (mlen, mlen) it
# yields e.g. "%s.%02d_%02d%s", which in turn formats
# (prefix, shnum, m, suffix) into a share filename.
FORMAT_FORMAT = "%%s.%%0%dd_%%0%dd%%s"
# Regex template matching such share filenames, given (prefix, suffix).
RE_FORMAT = "%s.[0-9]+_[0-9]+%s"
162 def encode_to_files(inf, fsize, dirname, prefix, k, m, suffix=".fec", overwrite=False, verbose=False):
164 Encode inf, writing the shares to specially named, newly created files.
166 @param fsize: calling read() on inf must yield fsize bytes of data and
167 then raise an EOFError
168 @param dirname: the name of the directory into which the sharefiles will
172 format = FORMAT_FORMAT % (mlen, mlen,)
174 padbytes = pad_size(fsize, k)
179 for shnum in range(m):
180 hdr = _build_header(m, k, padbytes, shnum)
182 fn = os.path.join(dirname, format % (prefix, shnum, m, suffix,))
184 print "Creating share file %r..." % (fn,)
188 flags = os.O_WRONLY|os.O_CREAT|os.O_EXCL | (hasattr(os, 'O_BINARY') and os.O_BINARY)
189 fd = os.open(fn, flags)
190 f = os.fdopen(fd, "wb")
195 def cb(blocks, length):
196 assert len(blocks) == len(fs)
197 oldsumlen = sumlen[0]
200 if int((float(oldsumlen) / fsize) * 10) != int((float(sumlen[0]) / fsize) * 10):
201 print str(int((float(sumlen[0]) / fsize) * 10) * 10) + "% ...",
203 if sumlen[0] > fsize:
204 raise IOError("Wrong file size -- possibly the size of the file changed during encoding. Original size: %d, observed size at least: %s" % (fsize, sumlen[0],))
205 for i in range(len(blocks)):
210 encode_file_stringy_easyfec(inf, cb, k, m, chunksize=4096)
211 except EnvironmentError, le:
212 print "Cannot complete because of exception: "
214 print "Cleaning up..."
221 print "Cleaning up: trying to remove %r..." % (fn,)
222 fileutil.remove_if_possible(fn)
229 # Note: if you really prefer base-2 and you change this code, then please
230 # denote 2^20 as "MiB" instead of "MB" in order to avoid ambiguity. See:
231 # http://en.wikipedia.org/wiki/Megabyte
235 def decode_from_files(outf, infiles, verbose=False):
237 Decode from the first k files in infiles, writing the results to outf.
239 assert len(infiles) >= 2
248 (nm, nk, npadlen, shnum,) = _parse_header(f)
249 if not (m is None or m == nm):
250 raise CorruptedShareFilesError("Share files were corrupted -- share file %r said that m was %s but another share file previously said that m was %s" % (f.name, nm, m,))
252 if not (k is None or k == nk):
253 raise CorruptedShareFilesError("Share files were corrupted -- share file %r said that k was %s but another share file previously said that k was %s" % (f.name, nk, k,))
255 raise InsufficientShareFilesError(k, len(infiles))
257 if not (padlen is None or padlen == npadlen):
258 raise CorruptedShareFilesError("Share files were corrupted -- share file %r said that pad length was %s but another share file previously said that pad length was %s" % (f.name, npadlen, padlen,))
267 dec = easyfec.Decoder(k, m)
270 chunks = [ inf.read(CHUNKSIZE) for inf in infs ]
271 if [ch for ch in chunks if len(ch) != len(chunks[-1])]:
272 raise CorruptedShareFilesError("Share files were corrupted -- all share files are required to be the same length, but they weren't.")
274 if len(chunks[-1]) == CHUNKSIZE:
275 # Then this was a full read, so we're still in the sharefiles.
276 resultdata = dec.decode(chunks, shnums, padlen=0)
277 outf.write(resultdata)
278 byteswritten += len(resultdata)
280 if ((byteswritten - len(resultdata)) / (10*MILLION_BYTES)) != (byteswritten / (10*MILLION_BYTES)):
281 print str(byteswritten / MILLION_BYTES) + " MB ...",
283 # Then this was a short read, so we've reached the end of the sharefiles.
284 resultdata = dec.decode(chunks, shnums, padlen)
285 outf.write(resultdata)
def encode_file(inf, cb, k, m, chunksize=4096):
    """
    Read in the contents of inf, encode, and call cb with the results.

    First, k "input blocks" will be read from inf, each input block being of
    size chunksize. Then these k blocks will be encoded into m "result
    blocks". Then cb will be invoked, passing a list of the m result blocks
    as its first argument, and the length of the encoded data as its second
    argument. (The length of the encoded data is always equal to k*chunksize,
    until the last iteration, when the end of the file has been reached and
    less than k*chunksize bytes could be read from the file.) This procedure
    is iterated until the end of the file is reached, in which case the space
    of the input blocks that is unused is filled with zeroes before encoding.

    Note that the sequence passed in calls to cb() contains mutable array
    objects in its first k elements whose contents will be overwritten when
    the next segment is read from the input file. Therefore the
    implementation of cb() has to either be finished with those first k arrays
    before returning, or if it wants to keep the contents of those arrays for
    subsequent use after it has returned then it must make a copy of them to
    keep.

    @param inf the file object from which to read the data
    @param cb the callback to be invoked with the results
    @param k the number of shares required to reconstruct the file
    @param m the total number of shares created
    @param chunksize how much data to read from inf for each of the k input
        blocks
    """
    enc = zfec.Encoder(k, m)
    l = tuple([ array.array('c') for i in range(k) ])
    indatasize = k*chunksize # will be reset to shorter upon EOF
    eof = False
    ZEROES = array.array('c', ['\x00'])*chunksize
    while not eof:
        # This loop body executes once per segment.
        i = 0
        while (i<len(l)):
            # This loop body executes once per chunk.
            a = l[i]
            del a[:]
            try:
                a.fromfile(inf, chunksize)
                i += 1
            except EOFError:
                eof = True
                indatasize = i*chunksize + len(a)

                # Zero-pad the final short chunk out to chunksize.
                a.fromstring("\x00" * (chunksize-len(a)))
                i += 1
                # Fill any remaining chunk buffers entirely with zeroes.
                while (i<len(l)):
                    a = l[i]
                    del a[:]
                    a += ZEROES
                    i += 1

        res = enc.encode(l)
        cb(res, indatasize)
# Prefer hashlib's sha1; fall back to the legacy sha module on old Pythons.
try:
    from hashlib import sha1
    sha1 = sha1 # hush pyflakes
except ImportError:
    # hashlib was added in Python 2.5.0.
    import sha
    sha1 = sha.new
# NOTE(review): benchmarking variant of encode_file -- the visible fragments
# perform the same read/zero-pad loop, but the encode call itself is
# commented out (last line below). Many original lines are missing from this
# listing (the leading numbers are original line numbers), so the fragments
# are kept verbatim rather than reconstructed.
358 def encode_file_not_really(inf, cb, k, m, chunksize=4096):
# Encoder constructed even though encoding is skipped -- presumably so setup
# cost is included in timings; TODO confirm against the complete source.
359 enc = zfec.Encoder(k, m)
# k reusable chunk buffers; 'c' is the Python 2 char typecode.
360 l = tuple([ array.array('c') for i in range(k) ])
361 indatasize = k*chunksize # will be reset to shorter upon EOF
# A chunk-sized run of NUL bytes used to zero the trailing buffers at EOF.
363 ZEROES=array.array('c', ['\x00'])*chunksize
365 # This loop body executes once per segment.
368 # This loop body executes once per chunk.
# array.fromfile raises EOFError on a short read (handler not visible here).
372 a.fromfile(inf, chunksize)
376 indatasize = i*chunksize + len(a)
# Zero-pad the final short chunk up to chunksize.
379 a.fromstring("\x00" * (chunksize-len(a)))
386 # res = enc.encode(l)
# NOTE(review): like encode_file_not_really, but -- per the name -- this
# variant presumably hashes the data instead of encoding it; the hashing
# statements are missing from this listing, so that cannot be confirmed
# here. Fragments kept verbatim (leading numbers are original line numbers).
389 def encode_file_not_really_and_hash(inf, cb, k, m, chunksize=4096):
# Encoder constructed even though the encode call below is commented out.
391 enc = zfec.Encoder(k, m)
# k reusable chunk buffers; 'c' is the Python 2 char typecode.
392 l = tuple([ array.array('c') for i in range(k) ])
393 indatasize = k*chunksize # will be reset to shorter upon EOF
# A chunk-sized run of NUL bytes used to zero the trailing buffers at EOF.
395 ZEROES=array.array('c', ['\x00'])*chunksize
397 # This loop body executes once per segment.
400 # This loop body executes once per chunk.
# array.fromfile raises EOFError on a short read (handler not visible here).
404 a.fromfile(inf, chunksize)
408 indatasize = i*chunksize + len(a)
# Zero-pad the final short chunk up to chunksize.
411 a.fromstring("\x00" * (chunksize-len(a)))
418 # res = enc.encode(l)
def encode_file_stringy(inf, cb, k, m, chunksize=4096):
    """
    Read in the contents of inf, encode, and call cb with the results.

    First, k "input blocks" will be read from inf, each input block being of
    size chunksize. Then these k blocks will be encoded into m "result
    blocks". Then cb will be invoked, passing a list of the m result blocks
    as its first argument, and the length of the encoded data as its second
    argument. (The length of the encoded data is always equal to k*chunksize,
    until the last iteration, when the end of the file has been reached and
    less than k*chunksize bytes could be read from the file.) This procedure
    is iterated until the end of the file is reached, in which case the part
    of the input shares that is unused is filled with zeroes before encoding.

    @param inf the file object from which to read the data
    @param cb the callback to be invoked with the results
    @param k the number of shares required to reconstruct the file
    @param m the total number of shares created
    @param chunksize how much data to read from inf for each of the k input
        blocks
    """
    enc = zfec.Encoder(k, m)
    indatasize = k*chunksize # will be reset to shorter upon EOF
    while indatasize == k*chunksize:
        # This loop body executes once per segment.
        i = 0
        l = []
        ZEROES = '\x00'*chunksize
        while i < k:
            # This loop body executes once per chunk.
            l.append(inf.read(chunksize))
            if len(l[-1]) < chunksize:
                # Short read: record the true amount of input data...
                indatasize = i*chunksize + len(l[-1])
                # ...zero-pad the short chunk out to chunksize...
                l[-1] = l[-1] + "\x00" * (chunksize-len(l[-1]))
                i += 1
                # ...and fill any remaining chunks entirely with zeroes.
                while i < k:
                    l.append(ZEROES)
                    i += 1
            else:
                i += 1

        res = enc.encode(l)
        cb(res, indatasize)
def encode_file_stringy_easyfec(inf, cb, k, m, chunksize=4096):
    """
    Read in the contents of inf, encode, and call cb with the results.

    First, chunksize*k bytes will be read from inf, then encoded into m
    "result blocks". Then cb will be invoked, passing a list of the m result
    blocks as its first argument, and the length of the encoded data as its
    second argument. (The length of the encoded data is always equal to
    k*chunksize, until the last iteration, when the end of the file has been
    reached and less than k*chunksize bytes could be read from the file.)
    This procedure is iterated until the end of the file is reached, in which
    case the space of the input that is unused is filled with zeroes before
    encoding.

    @param inf the file object from which to read the data
    @param cb the callback to be invoked with the results
    @param k the number of shares required to reconstruct the file
    @param m the total number of shares created
    @param chunksize how much data to read from inf for each of the k input
        blocks
    """
    enc = easyfec.Encoder(k, m)

    readsize = k*chunksize
    indata = inf.read(readsize)
    while indata:
        # easyfec handles splitting the flat string into k blocks and
        # zero-padding the final short segment internally.
        res = enc.encode(indata)
        cb(res, len(indata))
        indata = inf.read(readsize)
497 # zfec -- fast forward error correction library with Python interface
499 # Copyright (C) 2007-2010 Allmydata, Inc.
500 # Author: Zooko Wilcox-O'Hearn
502 # This file is part of zfec.
504 # See README.rst for licensing information.