import array, os, re, struct, traceback

from base64 import b32encode

import easyfec, zfec
from pyutil import fileutil
from pyutil.mathutil import pad_size, log_ceil
def ab(x): # debuggery
    """
    Render a short debugging representation of the string x: its length,
    a colon, and the base-32 encoding of its last (up to three) bytes.
    """
    if len(x) >= 3:
        return "%s:%s" % (len(x), b32encode(x[-3:]),)
    elif len(x) == 2:
        return "%s:%s" % (len(x), b32encode(x[-2:]),)
    elif len(x) == 1:
        return "%s:%s" % (len(x), b32encode(x[-1:]),)
    elif len(x) == 0:
        return "%s:%s" % (len(x), "--empty--",)
class InsufficientShareFilesError(zfec.Error):
    """
    Raised when fewer than k share files were supplied, so the original
    file cannot be reconstructed.
    """
    def __init__(self, k, kb, *args, **kwargs):
        zfec.Error.__init__(self, *args, **kwargs)
        # k: number of share files required; kb: number actually given.
        # (These were never assigned in the truncated original, although
        # __repr__ reads them.)
        self.k = k
        self.kb = kb

    def __repr__(self):
        return "Insufficient share files -- %d share files are required to recover this file, but only %d were given" % (self.k, self.kb,)

    def __str__(self):
        return self.__repr__()
class CorruptedShareFilesError(zfec.Error):
    """
    Raised when share files are truncated, or are mutually inconsistent
    about m, k, pad length, or chunk length.
    """
    pass
def _build_header(m, k, pad, sh):
    """
    Pack the share-file metadata into a compact big-endian bit string of
    between two and four bytes.

    @param m: the total number of shares; 1 <= m <= 256
    @param k: the number of shares required to reconstruct; 1 <= k <= m
    @param pad: the number of bytes of padding added to the file before encoding; 0 <= pad < k
    @param sh: the shnum of this share; 0 <= sh < m

    @return: a compressed string encoding m, k, pad, and sh
    """
    assert m >= 1
    assert m <= 2**8, "m is required to fit in a byte (stored as m-1)"
    assert k >= 1
    assert k <= m
    assert pad >= 0
    assert pad < k
    assert sh >= 0
    assert sh < m

    bitsused = 0
    val = 0

    val |= (m - 1)   # m is stored as m-1 so that m == 256 fits in 8 bits
    bitsused += 8 # the first 8 bits always encode m

    kbits = log_ceil(m, 2) # num bits needed to store all possible values of k
    val <<= kbits
    val |= (k - 1)   # k is stored as k-1, like m
    bitsused += kbits

    padbits = log_ceil(k, 2) # num bits needed to store all possible values of pad
    val <<= padbits
    val |= pad
    bitsused += padbits

    shnumbits = log_ceil(m, 2) # num bits needed to store all possible values of shnum
    val <<= shnumbits
    val |= sh
    bitsused += shnumbits

    assert bitsused >= 8, bitsused
    assert bitsused <= 32, bitsused

    # Left-justify the used bits inside the smallest whole number of bytes
    # (2, 3, or 4) that holds them, and return just those bytes.
    if bitsused <= 16:
        val <<= (16-bitsused)
        cs = struct.pack('>H', val)
        assert cs[:-2] == '\x00' * (len(cs)-2)
        return cs[-2:]
    if bitsused <= 24:
        val <<= (24-bitsused)
        cs = struct.pack('>I', val)
        assert cs[:-3] == '\x00' * (len(cs)-3)
        return cs[-3:]
    else:
        val <<= (32-bitsused)
        cs = struct.pack('>I', val)
        assert cs[:-4] == '\x00' * (len(cs)-4)
        return cs[-4:]
def _parse_header(inf):
    """
    Read and decode the metadata header written by _build_header().

    Uses the module-level helper MASK(bits) (not visible in this chunk;
    presumably (1<<bits)-1 -- TODO confirm) and log_ceil from pyutil.

    @param inf: an object which I can call read(1) on to get another byte

    @return: tuple of (m, k, pad, sh,); side-effect: the first one to four
        bytes of inf will be read
    """
    # The first 8 bits always encode m.
    ch = inf.read(1)
    if not ch:
        raise CorruptedShareFilesError("Share files were corrupted -- share file %r didn't have a complete metadata header at the front. Perhaps the file was truncated." % (inf.name,))
    byte = struct.unpack(">B", ch)[0]
    m = byte + 1   # m was stored as m-1

    # The next few bits encode k.
    kbits = log_ceil(m, 2) # num bits needed to store all possible values of k
    b2_bits_left = 8-kbits
    kbitmask = MASK(kbits) << b2_bits_left
    ch = inf.read(1)
    if not ch:
        raise CorruptedShareFilesError("Share files were corrupted -- share file %r didn't have a complete metadata header at the front. Perhaps the file was truncated." % (inf.name,))
    byte = struct.unpack(">B", ch)[0]
    k = ((byte & kbitmask) >> b2_bits_left) + 1

    shbits = log_ceil(m, 2) # num bits needed to store all possible values of shnum
    padbits = log_ceil(k, 2) # num bits needed to store all possible values of pad

    # val accumulates the not-yet-consumed low bits of the bytes read so far.
    val = byte & (~kbitmask)

    needed_padbits = padbits - b2_bits_left
    if needed_padbits > 0:
        ch = inf.read(1)
        if not ch:
            raise CorruptedShareFilesError("Share files were corrupted -- share file %r didn't have a complete metadata header at the front. Perhaps the file was truncated." % (inf.name,))
        byte = struct.unpack(">B", ch)[0]
        val <<= 8
        val |= byte
        needed_padbits -= 8
    assert needed_padbits <= 0
    extrabits = -needed_padbits
    pad = val >> extrabits
    val &= MASK(extrabits)

    needed_shbits = shbits - extrabits
    if needed_shbits > 0:
        ch = inf.read(1)
        if not ch:
            raise CorruptedShareFilesError("Share files were corrupted -- share file %r didn't have a complete metadata header at the front. Perhaps the file was truncated." % (inf.name,))
        byte = struct.unpack(">B", ch)[0]
        val <<= 8
        val |= byte
        needed_shbits -= 8
    assert needed_shbits <= 0

    gotshbits = -needed_shbits

    sh = val >> gotshbits

    return (m, k, pad, sh,)
# Filename template "<prefix>.<shnum>_<m><suffix>"; the two %0<d>d field
# widths are filled in first (both with len(str(m))) so that shnum and m
# are zero-padded to equal width.
FORMAT_FORMAT = "%%s.%%0%dd_%%0%dd%%s"
# Regex matching filenames produced from FORMAT_FORMAT; prefix and suffix
# are interpolated in before use.
RE_FORMAT = "%s.[0-9]+_[0-9]+%s"
162 def encode_to_files(inf, fsize, dirname, prefix, k, m, suffix=".fec", overwrite=False, verbose=False):
164 Encode inf, writing the shares to specially named, newly created files.
166 @param fsize: calling read() on inf must yield fsize bytes of data and
167 then raise an EOFError
168 @param dirname: the name of the directory into which the sharefiles will
172 format = FORMAT_FORMAT % (mlen, mlen,)
174 padbytes = pad_size(fsize, k)
179 for shnum in range(m):
180 hdr = _build_header(m, k, padbytes, shnum)
182 fn = os.path.join(dirname, format % (prefix, shnum, m, suffix,))
184 print "Creating share file %r..." % (fn,)
188 flags = os.O_WRONLY|os.O_CREAT|os.O_EXCL | (hasattr(os, 'O_BINARY') and os.O_BINARY)
189 fd = os.open(fn, flags)
190 f = os.fdopen(fd, "wb")
195 def cb(blocks, length):
196 assert len(blocks) == len(fs)
197 oldsumlen = sumlen[0]
200 if int((float(oldsumlen) / fsize) * 10) != int((float(sumlen[0]) / fsize) * 10):
201 print str(int((float(sumlen[0]) / fsize) * 10) * 10) + "% ...",
203 if sumlen[0] > fsize:
204 raise IOError("Wrong file size -- possibly the size of the file changed during encoding. Original size: %d, observed size at least: %s" % (fsize, sumlen[0],))
205 for i in range(len(blocks)):
210 encode_file_stringy_easyfec(inf, cb, k, m, chunksize=4096)
211 except EnvironmentError, le:
212 print "Cannot complete because of exception: "
214 print "Cleaning up..."
221 print "Cleaning up: trying to remove %r..." % (fn,)
222 fileutil.remove_if_possible(fn)
# Note: if you really prefer base-2 and you change this code, then please
# denote 2^20 as "MiB" instead of "MB" in order to avoid ambiguity. See:
# http://en.wikipedia.org/wiki/Megabyte
# (Restored: this constant is used by decode_from_files()'s "MB ..."
# progress output but its definition was missing from this chunk.)
MILLION_BYTES=10**6
235 def decode_from_files(outf, infiles, verbose=False):
237 Decode from the first k files in infiles, writing the results to outf.
239 assert len(infiles) >= 2
248 (nm, nk, npadlen, shnum,) = _parse_header(f)
249 if not (m is None or m == nm):
250 raise CorruptedShareFilesError("Share files were corrupted -- share file %r said that m was %s but another share file previously said that m was %s" % (f.name, nm, m,))
252 if not (k is None or k == nk):
253 raise CorruptedShareFilesError("Share files were corrupted -- share file %r said that k was %s but another share file previously said that k was %s" % (f.name, nk, k,))
255 raise InsufficientShareFilesError(k, len(infiles))
257 if not (padlen is None or padlen == npadlen):
258 raise CorruptedShareFilesError("Share files were corrupted -- share file %r said that pad length was %s but another share file previously said that pad length was %s" % (f.name, npadlen, padlen,))
267 dec = easyfec.Decoder(k, m)
270 chunks = [ inf.read(CHUNKSIZE) for inf in infs ]
271 if [ch for ch in chunks if len(ch) != len(chunks[-1])]:
272 raise CorruptedShareFilesError("Share files were corrupted -- all share files are required to be the same length, but they weren't.")
274 if len(chunks[-1]) == CHUNKSIZE:
275 # Then this was a full read, so we're still in the sharefiles.
276 resultdata = dec.decode(chunks, shnums, padlen=0)
277 outf.write(resultdata)
278 byteswritten += len(resultdata)
280 if ((byteswritten - len(resultdata)) / (10*MILLION_BYTES)) != (byteswritten / (10*MILLION_BYTES)):
281 print str(byteswritten / MILLION_BYTES) + " MB ...",
283 # Then this was a short read, so we've reached the end of the sharefiles.
284 resultdata = dec.decode(chunks, shnums, padlen)
285 outf.write(resultdata)
def encode_file(inf, cb, k, m, chunksize=4096):
    """
    Read in the contents of inf, encode, and call cb with the results.

    First, k "input blocks" will be read from inf, each input block being of
    size chunksize. Then these k blocks will be encoded into m "result
    blocks". Then cb will be invoked, passing a list of the m result blocks
    as its first argument, and the length of the encoded data as its second
    argument. (The length of the encoded data is always equal to k*chunksize,
    until the last iteration, when the end of the file has been reached and
    less than k*chunksize bytes could be read from the file.) This procedure
    is iterated until the end of the file is reached, in which case the space
    of the input blocks that is unused is filled with zeroes before encoding.

    Note that the sequence passed in calls to cb() contains mutable array
    objects in its first k elements whose contents will be overwritten when
    the next segment is read from the input file. Therefore the
    implementation of cb() has to either be finished with those first k arrays
    before returning, or if it wants to keep the contents of those arrays for
    subsequent use after it has returned then it must make a copy of them to
    keep.

    @param inf the file object from which to read the data
    @param cb the callback to be invoked with the results
    @param k the number of shares required to reconstruct the file
    @param m the total number of shares created
    @param chunksize how much data to read from inf for each of the k input
        blocks
    """
    enc = zfec.Encoder(k, m)
    l = tuple([ array.array('c') for i in range(k) ])
    indatasize = k*chunksize # will be reset to shorter upon EOF
    eof = False
    ZEROES=array.array('c', ['\x00'])*chunksize
    while not eof:
        # This loop body executes once per segment.
        i = 0
        while i < len(l):
            # This loop body executes once per chunk.
            a = l[i]
            del a[:]
            try:
                a.fromfile(inf, chunksize)
                i += 1
            except EOFError:
                eof = True
                # i full chunks were read before this partial one.
                indatasize = i*chunksize + len(a)

                # padding
                a.fromstring("\x00" * (chunksize-len(a)))
                i += 1
                # Fill any remaining input blocks entirely with zeroes.
                while i < len(l):
                    a = l[i]
                    del a[:]
                    a += ZEROES
                    i += 1

        res = enc.encode(l)
        cb(res, indatasize)
def encode_file_not_really(inf, cb, k, m, chunksize=4096):
    """
    Benchmarking variant of encode_file(): performs the same chunked
    reading and zero-padding of inf, but the actual zfec encode step is
    deliberately commented out so that the read/pad overhead can be
    measured on its own.

    NOTE(review): this block was truncated in the source; the loop
    structure is reconstructed from encode_file() -- confirm against
    upstream zfec.filefec.
    """
    enc = zfec.Encoder(k, m)
    l = tuple([ array.array('c') for i in range(k) ])
    indatasize = k*chunksize # will be reset to shorter upon EOF
    eof = False
    ZEROES=array.array('c', ['\x00'])*chunksize
    while not eof:
        # This loop body executes once per segment.
        i = 0
        while i < len(l):
            # This loop body executes once per chunk.
            a = l[i]
            del a[:]
            try:
                a.fromfile(inf, chunksize)
                i += 1
            except EOFError:
                eof = True
                indatasize = i*chunksize + len(a)

                # padding
                a.fromstring("\x00" * (chunksize-len(a)))
                i += 1
                while i < len(l):
                    a = l[i]
                    del a[:]
                    a += ZEROES
                    i += 1

        # The encode step is intentionally skipped in this benchmark:
        # res = enc.encode(l)
        cb(None, None)
def encode_file_not_really_and_hash(inf, cb, k, m, chunksize=4096):
    """
    Like encode_file_not_really(), but additionally feeds every chunk read
    into a hash, so that read+pad+hash (still without the actual encode
    step) can be benchmarked.

    NOTE(review): this block was truncated in the source; the hashing
    details are reconstructed -- confirm against upstream zfec.filefec.
    """
    import hashlib
    hasher = hashlib.sha1()
    enc = zfec.Encoder(k, m)
    l = tuple([ array.array('c') for i in range(k) ])
    indatasize = k*chunksize # will be reset to shorter upon EOF
    eof = False
    ZEROES=array.array('c', ['\x00'])*chunksize
    while not eof:
        # This loop body executes once per segment.
        i = 0
        while i < len(l):
            # This loop body executes once per chunk.
            a = l[i]
            del a[:]
            try:
                a.fromfile(inf, chunksize)
                i += 1
            except EOFError:
                eof = True
                indatasize = i*chunksize + len(a)

                # padding
                a.fromstring("\x00" * (chunksize-len(a)))
                i += 1
                while i < len(l):
                    a = l[i]
                    del a[:]
                    a += ZEROES
                    i += 1
            hasher.update(a.tostring())

        # The encode step is intentionally skipped in this benchmark:
        # res = enc.encode(l)
        cb(None, None)
def encode_file_stringy(inf, cb, k, m, chunksize=4096):
    """
    Read in the contents of inf, encode, and call cb with the results.

    First, k "input blocks" will be read from inf, each input block being of
    size chunksize. Then these k blocks will be encoded into m "result
    blocks". Then cb will be invoked, passing a list of the m result blocks
    as its first argument, and the length of the encoded data as its second
    argument. (The length of the encoded data is always equal to k*chunksize,
    until the last iteration, when the end of the file has been reached and
    less than k*chunksize bytes could be read from the file.) This procedure
    is iterated until the end of the file is reached, in which case the part
    of the input shares that is unused is filled with zeroes before encoding.

    @param inf the file object from which to read the data
    @param cb the callback to be invoked with the results
    @param k the number of shares required to reconstruct the file
    @param m the total number of shares created
    @param chunksize how much data to read from inf for each of the k input
        blocks
    """
    enc = zfec.Encoder(k, m)
    indatasize = k*chunksize # will be reset to shorter upon EOF
    while indatasize == k*chunksize:
        # This loop body executes once per segment.
        i = 0
        l = []
        ZEROES = '\x00'*chunksize
        while i < k:
            # This loop body executes once per chunk.
            l.append(inf.read(chunksize))
            if len(l[-1]) < chunksize:
                # Short read: i full chunks preceded this partial one.
                indatasize = i*chunksize + len(l[-1])

                # padding
                l[-1] = l[-1] + "\x00" * (chunksize-len(l[-1]))
                i += 1
                # Fill any remaining input blocks entirely with zeroes.
                while i < k:
                    l.append(ZEROES)
                    i += 1
            else:
                i += 1

        res = enc.encode(l)
        cb(res, indatasize)
def encode_file_stringy_easyfec(inf, cb, k, m, chunksize=4096):
    """
    Read in the contents of inf, encode, and call cb with the results.

    First, chunksize*k bytes will be read from inf, then encoded into m
    "result blocks". Then cb will be invoked, passing a list of the m result
    blocks as its first argument, and the length of the encoded data as its
    second argument. (The length of the encoded data is always equal to
    k*chunksize, until the last iteration, when the end of the file has been
    reached and less than k*chunksize bytes could be read from the file.)
    This procedure is iterated until the end of the file is reached, in which
    case the space of the input that is unused is filled with zeroes before
    encoding.

    @param inf the file object from which to read the data
    @param cb the callback to be invoked with the results
    @param k the number of shares required to reconstruct the file
    @param m the total number of shares created
    @param chunksize how much data to read from inf for each of the k input
        blocks
    """
    enc = easyfec.Encoder(k, m)

    readsize = k*chunksize
    indata = inf.read(readsize)
    while indata:
        res = enc.encode(indata)
        cb(res, len(indata))
        indata = inf.read(readsize)
490 # zfec -- fast forward error correction library with Python interface
492 # Copyright (C) 2007 Allmydata, Inc.
493 # Author: Zooko Wilcox-O'Hearn
495 # This file is part of zfec.
497 # See README.txt for licensing information.