2 from util import fileutil
3 from util.mathutil import log_ceil
5 import array, os, re, struct, traceback
class InsufficientShareFilesError(zfec.Error):
    """
    Raised by decode_from_files() when fewer than the required k share
    files were supplied, so the original file cannot be reconstructed.
    """
    def __init__(self, k, kb, *args, **kwargs):
        """
        @param k: the number of share files required to recover the file
        @param kb: the number of share files that were actually supplied
        """
        zfec.Error.__init__(self, *args, **kwargs)
        # Store both counts so that __repr__ can report them; the visible
        # excerpt read self.k/self.kb without ever assigning them.
        self.k = k
        self.kb = kb

    def __repr__(self):
        return "Insufficient share files -- %d share files are required to recover this file, but only %d were given" % (self.k, self.kb,)

    def __str__(self):
        return self.__repr__()
class CorruptedShareFilesError(zfec.Error):
    """
    Raised when share files fail consistency checks -- e.g. a truncated
    metadata header, share files that disagree about m, k, or pad length,
    or share files of differing lengths.
    """
def _build_header(m, k, pad, sh):
    """
    Pack the four share parameters into a compact big-endian header string.

    @param m: the total number of shares; 1 <= m <= 256
    @param k: the number of shares required to reconstruct; 1 <= k <= m
    @param pad: the number of bytes of padding added to the file before encoding; 0 <= pad < k
    @param sh: the shnum of this share; 0 <= sh < m

    @return: a compressed string encoding m, k, pad, and sh
    """
    # NOTE(review): this is an excerpt -- the statements that accumulate the
    # packed value ("val") and shift the fields together are elided here;
    # only the bit-width bookkeeping and the final packing asserts remain.
    bitsused += 8 # the first 8 bits always encode m

    kbits = log_ceil(m, 2) # num bits needed to store all possible values of k

    padbits = log_ceil(k, 2) # num bits needed to store all possible values of pad

    shnumbits = log_ceil(m, 2) # num bits needed to store all possible values of shnum

    # The whole header always fits in one to four bytes.
    assert bitsused >= 8, bitsused
    assert bitsused <= 32, bitsused

    # Two-byte header: pack as '>H'; no bytes above the low two may be set.
    cs = struct.pack('>H', val)
    assert cs[:-2] == '\x00' * (len(cs)-2)

    # Three-byte header: pack as four bytes ('>I'); the leading byte must be zero.
    cs = struct.pack('>I', val)
    assert cs[:-3] == '\x00' * (len(cs)-3)

    # Four-byte header: all four bytes of '>I' are used (assert is vacuous).
    cs = struct.pack('>I', val)
    assert cs[:-4] == '\x00' * (len(cs)-4)
def _parse_header(inf):
    """
    Decode the compact header written by _build_header(), reading bytes
    from inf only as needed.

    @param inf: an object which I can call read(1) on to get another byte

    @return: tuple of (m, k, pad, sh,); side-effect: the first one to four
        bytes of inf will be read
    """
    # NOTE(review): excerpt -- several read(1) calls and the shifts that
    # merge bytes into "val" are elided between the visible statements.
    # The first 8 bits always encode m.
    raise CorruptedShareFilesError("Share files were corrupted -- share file %r didn't have a complete metadata header at the front. Perhaps the file was truncated." % (inf.name,))

    # The next few bits encode k.
    kbits = log_ceil(m, 2) # num bits needed to store all possible values of k
    b2_bits_left = 8-kbits
    # Mask selecting the k field from the top bits of the second byte.
    kbitmask = MASK(kbits) << b2_bits_left

    raise CorruptedShareFilesError("Share files were corrupted -- share file %r didn't have a complete metadata header at the front. Perhaps the file was truncated." % (inf.name,))

    # The stored field holds k-1, so add 1 back after extracting it.
    k = ((byte & kbitmask) >> b2_bits_left) + 1

    shbits = log_ceil(m, 2) # num bits needed to store all possible values of shnum
    padbits = log_ceil(k, 2) # num bits needed to store all possible values of pad

    # The bits of this byte below the k field begin the pad field.
    val = byte & (~kbitmask)

    needed_padbits = padbits - b2_bits_left
    if needed_padbits > 0:
        # The pad field spills into the next byte; read it in.
        raise CorruptedShareFilesError("Share files were corrupted -- share file %r didn't have a complete metadata header at the front. Perhaps the file was truncated." % (inf.name,))
        byte = struct.unpack(">B", ch)[0]
    # By now we have consumed at least padbits bits.
    assert needed_padbits <= 0
    extrabits = -needed_padbits
    # pad occupies the high bits of val; the leftover low bits start shnum.
    pad = val >> extrabits
    val &= MASK(extrabits)

    needed_shbits = shbits - extrabits
    if needed_shbits > 0:
        # The shnum field spills into yet another byte; read it in.
        raise CorruptedShareFilesError("Share files were corrupted -- share file %r didn't have a complete metadata header at the front. Perhaps the file was truncated." % (inf.name,))
        byte = struct.unpack(">B", ch)[0]
    assert needed_shbits <= 0

    gotshbits = -needed_shbits

    sh = val >> gotshbits

    return (m, k, pad, sh,)
# Share-filename template: "<prefix>.<shnum>_<m><suffix>" with shnum and m
# zero-padded to a common width.  The inner "%%0%dd" placeholders are filled
# with that width first (FORMAT_FORMAT % (mlen, mlen,)), yielding e.g.
# "%s.%02d_%02d%s", which is then applied to (prefix, shnum, m, suffix,).
FORMAT_FORMAT = "%%s.%%0%dd_%%0%dd%%s"
# Regex matching any filename produced from FORMAT_FORMAT, whatever the width.
RE_FORMAT = "%s.[0-9]+_[0-9]+%s"
def encode_to_files(inf, fsize, dirname, prefix, k, m, suffix=".fec", overwrite=False, verbose=False):
    """
    Encode inf, writing the shares to specially named, newly created files.

    @param fsize: calling read() on inf must yield fsize bytes of data and
        then raise an EOFError
    @param dirname: the name of the directory into which the sharefiles will
        be written
    """
    # NOTE(review): excerpt -- the computation of mlen, the bookkeeping
    # containers (fs, fns, sumlen, ...) and the try: surrounding the encode
    # call are elided between the visible statements.
    format = FORMAT_FORMAT % (mlen, mlen,)

    # Number of zero bytes appended so the input length is a multiple of k.
    padbytes = zfec.util.mathutil.pad_size(fsize, k)

    for shnum in range(m):
        hdr = _build_header(m, k, padbytes, shnum)

        fn = os.path.join(dirname, format % (prefix, shnum, m, suffix,))

        print "Creating share file %r..." % (fn,)

        # O_EXCL: never clobber an existing share file.  On platforms without
        # O_BINARY the hasattr() subexpression is False (== 0), a no-op in
        # the bitmask.
        flags = os.O_WRONLY|os.O_CREAT|os.O_EXCL | (hasattr(os, 'O_BINARY') and os.O_BINARY)
        fd = os.open(fn, flags)
        f = os.fdopen(fd, "wb")

    def cb(blocks, length):
        # Called once per encoded segment with the m result blocks.
        assert len(blocks) == len(fs)
        oldsumlen = sumlen[0]

        # Emit a progress tick each time another 10% of fsize is crossed.
        if int((float(oldsumlen) / fsize) * 10) != int((float(sumlen[0]) / fsize) * 10):
            print str(int((float(sumlen[0]) / fsize) * 10) * 10) + "% ...",

        # Guard against the input growing while we encode it.
        if sumlen[0] > fsize:
            raise IOError("Wrong file size -- possibly the size of the file changed during encoding. Original size: %d, observed size at least: %s" % (fsize, sumlen[0],))
        for i in range(len(blocks)):
            # (writing block i to share file i -- elided in this excerpt)

    encode_file_stringy_easyfec(inf, cb, k, m, chunksize=4096)
except EnvironmentError, le:
    # Best-effort cleanup: remove any partially written share files.
    print "Cannot complete because of exception: "
    print "Cleaning up..."
    print "Cleaning up: trying to remove %r..." % (fn,)
    fileutil.remove_if_possible(fn)
218 # Note: if you really prefer base-2 and you change this code, then please
219 # denote 2^20 as "MiB" instead of "MB" in order to avoid ambiguity.
221 # http://en.wikipedia.org/wiki/Megabyte
def decode_from_files(outf, infiles, verbose=False):
    """
    Decode from the first k files in infiles, writing the results to outf.
    """
    assert len(infiles) >= 2
    # NOTE(review): excerpt -- initialization of m/k/padlen/shnums/infs and
    # the loop over infiles reading each header are elided.

    (nm, nk, npadlen, shnum,) = _parse_header(f)
    # Every share file must agree on the encoding parameters.
    if not (m is None or m == nm):
        raise CorruptedShareFilesError("Share files were corrupted -- share file %r said that m was %s but another share file previously said that m was %s" % (f.name, nm, m,))
    if not (k is None or k == nk):
        raise CorruptedShareFilesError("Share files were corrupted -- share file %r said that k was %s but another share file previously said that k was %s" % (f.name, nk, k,))
    # Fewer share files than k: reconstruction is impossible.
    raise InsufficientShareFilesError(k, len(infiles))
    if not (padlen is None or padlen == npadlen):
        raise CorruptedShareFilesError("Share files were corrupted -- share file %r said that pad length was %s but another share file previously said that pad length was %s" % (f.name, npadlen, padlen,))

    dec = easyfec.Decoder(k, m)

    chunks = [ inf.read(CHUNKSIZE) for inf in infs ]
    # All shares must yield the same number of bytes on each read.
    if [ch for ch in chunks if len(ch) != len(chunks[-1])]:
        raise CorruptedShareFilesError("Share files were corrupted -- all share files are required to be the same length, but they weren't.")

    if len(chunks[-1]) == CHUNKSIZE:
        # Then this was a full read, so we're still in the sharefiles.
        resultdata = dec.decode(chunks, shnums, padlen=0)
        outf.write(resultdata)
        byteswritten += len(resultdata)
        # Progress tick every 10 MB (Python 2 integer division).
        if ((byteswritten - len(resultdata)) / (10*MILLION_BYTES)) != (byteswritten / (10*MILLION_BYTES)):
            print str(byteswritten / MILLION_BYTES) + " MB ...",
    # Then this was a short read, so we've reached the end of the sharefiles.
    resultdata = dec.decode(chunks, shnums, padlen)
    outf.write(resultdata)
def encode_file(inf, cb, k, m, chunksize=4096):
    """
    Read in the contents of inf, encode, and call cb with the results.

    First, k "input blocks" will be read from inf, each input block being of
    size chunksize. Then these k blocks will be encoded into m "result
    blocks". Then cb will be invoked, passing a list of the m result blocks
    as its first argument, and the length of the encoded data as its second
    argument. (The length of the encoded data is always equal to k*chunksize,
    until the last iteration, when the end of the file has been reached and
    less than k*chunksize bytes could be read from the file.) This procedure
    is iterated until the end of the file is reached, in which case the space
    of the input blocks that is unused is filled with zeroes before encoding.

    Note that the sequence passed in calls to cb() contains mutable array
    objects in its first k elements whose contents will be overwritten when
    the next segment is read from the input file. Therefore the
    implementation of cb() has to either be finished with those first k arrays
    before returning, or if it wants to keep the contents of those arrays for
    subsequent use after it has returned then it must make a copy of them to
    keep.

    @param inf the file object from which to read the data
    @param cb the callback to be invoked with the results
    @param k the number of shares required to reconstruct the file
    @param m the total number of shares created
    @param chunksize how much data to read from inf for each of the k input
        blocks
    """
    enc = zfec.Encoder(k, m)
    # One reusable byte-array buffer per input block.
    l = tuple([ array.array('c') for i in range(k) ])
    indatasize = k*chunksize # will be reset to shorter upon EOF
    ZEROES=array.array('c', ['\x00'])*chunksize

    # NOTE(review): excerpt -- the while/for loop headers and the
    # enc.encode()/cb() calls are elided; only the per-chunk fill remains.
    # This loop body executes once per segment.
    # This loop body executes once per chunk.
    a.fromfile(inf, chunksize)
    # Short read: record how much real data this segment holds...
    indatasize = i*chunksize + len(a)
    # ...and zero-fill the rest of the chunk before encoding.
    a.fromstring("\x00" * (chunksize-len(a)))
def encode_file_not_really(inf, cb, k, m, chunksize=4096):
    """
    Like encode_file(), but skips the actual FEC encoding -- note the
    commented-out enc.encode() call below.  Presumably a benchmarking aid
    for measuring read/padding overhead in isolation; TODO(review): confirm.
    """
    enc = zfec.Encoder(k, m)
    # One reusable byte-array buffer per input block.
    l = tuple([ array.array('c') for i in range(k) ])
    indatasize = k*chunksize # will be reset to shorter upon EOF
    ZEROES=array.array('c', ['\x00'])*chunksize

    # NOTE(review): excerpt -- loop headers and the cb() invocation elided.
    # This loop body executes once per segment.
    # This loop body executes once per chunk.
    a.fromfile(inf, chunksize)
    indatasize = i*chunksize + len(a)
    a.fromstring("\x00" * (chunksize-len(a)))

    # res = enc.encode(l)
def encode_file_not_really_and_hash(inf, cb, k, m, chunksize=4096):
    """
    Like encode_file_not_really(), skipping the FEC encoding (see the
    commented-out enc.encode() below).  The name suggests it also hashes
    the data, but no hash call is visible in this excerpt --
    TODO(review): confirm against the full source.
    """
    enc = zfec.Encoder(k, m)
    # One reusable byte-array buffer per input block.
    l = tuple([ array.array('c') for i in range(k) ])
    indatasize = k*chunksize # will be reset to shorter upon EOF
    ZEROES=array.array('c', ['\x00'])*chunksize

    # NOTE(review): excerpt -- loop headers and the cb() invocation elided.
    # This loop body executes once per segment.
    # This loop body executes once per chunk.
    a.fromfile(inf, chunksize)
    indatasize = i*chunksize + len(a)
    a.fromstring("\x00" * (chunksize-len(a)))

    # res = enc.encode(l)
def encode_file_stringy(inf, cb, k, m, chunksize=4096):
    """
    Read in the contents of inf, encode, and call cb with the results.

    First, k "input blocks" will be read from inf, each input block being of
    size chunksize. Then these k blocks will be encoded into m "result
    blocks". Then cb will be invoked, passing a list of the m result blocks
    as its first argument, and the length of the encoded data as its second
    argument. (The length of the encoded data is always equal to k*chunksize,
    until the last iteration, when the end of the file has been reached and
    less than k*chunksize bytes could be read from the file.) This procedure
    is iterated until the end of the file is reached, in which case the part
    of the input shares that is unused is filled with zeroes before encoding.

    @param inf the file object from which to read the data
    @param cb the callback to be invoked with the results
    @param k the number of shares required to reconstruct the file
    @param m the total number of shares created
    @param chunksize how much data to read from inf for each of the k input
        blocks
    """
    enc = zfec.Encoder(k, m)
    indatasize = k*chunksize # will be reset to shorter upon EOF
    # NOTE(review): excerpt -- the per-segment list setup, the inner chunk
    # loop header, and the enc.encode()/cb() calls are elided below.
    while indatasize == k*chunksize:
        # This loop body executes once per segment.
        # NOTE(review): loop-invariant; could be hoisted out of the loop.
        ZEROES = '\x00'*chunksize
        # This loop body executes once per chunk.
        l.append(inf.read(chunksize))
        if len(l[-1]) < chunksize:
            # Short read: record the true data size for the cb() length arg.
            indatasize = i*chunksize + len(l[-1])
        # Zero-pad the final short chunk up to chunksize before encoding.
        l[-1] = l[-1] + "\x00" * (chunksize-len(l[-1]))
def encode_file_stringy_easyfec(inf, cb, k, m, chunksize=4096):
    """
    Read in the contents of inf, encode, and call cb with the results.

    First, chunksize*k bytes will be read from inf, then encoded into m
    "result blocks". Then cb will be invoked, passing a list of the m result
    blocks as its first argument, and the length of the encoded data as its
    second argument. (The length of the encoded data is always equal to
    k*chunksize, until the last iteration, when the end of the file has been
    reached and less than k*chunksize bytes could be read from the file.)
    This procedure is iterated until the end of the file is reached, in which
    case the space of the input that is unused is filled with zeroes before
    encoding.

    @param inf the file object from which to read the data
    @param cb the callback to be invoked with the results
    @param k the number of shares required to reconstruct the file
    @param m the total number of shares created
    @param chunksize how much data to read from inf for each of the k input
        blocks
    """
    enc = easyfec.Encoder(k, m)

    readsize = k*chunksize
    indata = inf.read(readsize)
    # Iterate one segment at a time until EOF (read() returns ""), so that
    # cb is invoked once per segment as the docstring promises; the visible
    # excerpt had lost the loop header and the cb() call.
    while indata:
        res = enc.encode(indata)
        # Pass the number of real (unpadded) bytes so the caller can
        # distinguish the final short segment.
        cb(res, len(indata))
        indata = inf.read(readsize)
479 # zfec -- fast forward error correction library with Python interface
481 # Copyright (C) 2007 Allmydata, Inc.
482 # Author: Zooko Wilcox-O'Hearn
484 # This file is part of zfec.
486 # This program is free software; you can redistribute it and/or modify it
487 # under the terms of the GNU General Public License as published by the Free
488 # Software Foundation; either version 2 of the License, or (at your option)
489 # any later version, with the added permission that, if you become obligated
490 # to release a derived work under this licence (as per section 2.b), you may
491 # delay the fulfillment of this obligation for up to 12 months. See the file
492 # COPYING for details.
494 # If you would like to inquire about a commercial relationship with Allmydata,
495 # Inc., please contact partnerships@allmydata.com and visit
496 # http://allmydata.com/.