2 from pyutil import fileutil
3 from pyutil.mathutil import pad_size, log_ceil
5 import array, os, struct
9 from base64 import b32encode
10 def ab(x): # debuggery
# Abbreviate a string for debug output as "<length>:<base32 of tail>".
# NOTE(review): the guard lines between these returns are elided in this
# excerpt -- presumably each return handles a successively shorter x,
# ending with the empty case; confirm against the full source.
12 return "%s:%s" % (len(x), b32encode(x[-3:]),)
14 return "%s:%s" % (len(x), b32encode(x[-2:]),)
16 return "%s:%s" % (len(x), b32encode(x[-1:]),)
18 return "%s:%s" % (len(x), "--empty--",)
20 class InsufficientShareFilesError(zfec.Error):
# Raised when fewer than k share files were supplied for decoding.
# NOTE(review): the lines storing k and kb on self, and the def lines
# for __repr__/__str__, are elided in this excerpt; the message below
# reads self.k (required) and self.kb (given), so those attributes are
# presumably assigned in __init__ -- confirm against the full source.
21 def __init__(self, k, kb, *args, **kwargs):
22 zfec.Error.__init__(self, *args, **kwargs)
27 return "Insufficient share files -- %d share files are required to recover this file, but only %d were given" % (self.k, self.kb,)
30 return self.__repr__()
32 class CorruptedShareFilesError(zfec.Error):
# Raised when share files are malformed, truncated, or mutually
# inconsistent (see the raise sites below); class body is elided in
# this excerpt.
35 def _build_header(m, k, pad, sh):
37 @param m: the total number of shares; 1 <= m <= 256
38 @param k: the number of shares required to reconstruct; 1 <= k <= m
39 @param pad: the number of bytes of padding added to the file before encoding; 0 <= pad < k
40 @param sh: the shnum of this share; 0 <= k < m
42 @return: a compressed string encoding m, k, pad, and sh
# Pack m, k, pad, and sh into a compact bit field: m always occupies the
# first 8 bits, then k, pad, and sh each take only as many bits as their
# value ranges require (log_ceil of the range).
58 bitsused += 8 # the first 8 bits always encode m
60 kbits = log_ceil(m, 2) # num bits needed to store all possible values of k
66 padbits = log_ceil(k, 2) # num bits needed to store all possible values of pad
72 shnumbits = log_ceil(m, 2) # num bits needed to store all possible values of shnum
# The finished header fits in one to four whole bytes.
78 assert bitsused >= 8, bitsused
79 assert bitsused <= 32, bitsused
# Serialize big-endian; each assert checks that the unused leading bytes
# are zero before they are (presumably, in elided lines) stripped off.
# NOTE(review): the size tests selecting between the '>H' and '>I'
# branches are elided in this excerpt -- confirm against the full source.
83 cs = struct.pack('>H', val)
84 assert cs[:-2] == '\x00' * (len(cs)-2)
88 cs = struct.pack('>I', val)
89 assert cs[:-3] == '\x00' * (len(cs)-3)
93 cs = struct.pack('>I', val)
94 assert cs[:-4] == '\x00' * (len(cs)-4)
100 def _parse_header(inf):
102 @param inf: an object which I can call read(1) on to get another byte
104 @return: tuple of (m, k, pad, sh,); side-effect: the first one to four
105 bytes of inf will be read
# Inverse of _build_header: consume 1-4 bytes from inf and unpack the
# variable-width fields m, k, pad, and sh.  Any short read raises
# CorruptedShareFilesError.
107 # The first 8 bits always encode m.
110 raise CorruptedShareFilesError("Share files were corrupted -- share file %r didn't have a complete metadata header at the front. Perhaps the file was truncated." % (inf.name,))
114 # The next few bits encode k.
115 kbits = log_ceil(m, 2) # num bits needed to store all possible values of k
116 b2_bits_left = 8-kbits
117 kbitmask = MASK(kbits) << b2_bits_left
# NOTE(review): the read(1) calls feeding `ch`/`byte` are elided in this
# excerpt -- confirm against the full source.
120 raise CorruptedShareFilesError("Share files were corrupted -- share file %r didn't have a complete metadata header at the front. Perhaps the file was truncated." % (inf.name,))
122 k = ((byte & kbitmask) >> b2_bits_left) + 1
124 shbits = log_ceil(m, 2) # num bits needed to store all possible values of shnum
125 padbits = log_ceil(k, 2) # num bits needed to store all possible values of pad
# `val` accumulates the bits left over in the current byte after k.
127 val = byte & (~kbitmask)
# Read additional bytes only if pad does not fit in the leftover bits.
129 needed_padbits = padbits - b2_bits_left
130 if needed_padbits > 0:
133 raise CorruptedShareFilesError("Share files were corrupted -- share file %r didn't have a complete metadata header at the front. Perhaps the file was truncated." % (inf.name,))
134 byte = struct.unpack(">B", ch)[0]
138 assert needed_padbits <= 0
139 extrabits = -needed_padbits
140 pad = val >> extrabits
141 val &= MASK(extrabits)
# Same scheme for the share number: pull more bytes only as needed.
143 needed_shbits = shbits - extrabits
144 if needed_shbits > 0:
147 raise CorruptedShareFilesError("Share files were corrupted -- share file %r didn't have a complete metadata header at the front. Perhaps the file was truncated." % (inf.name,))
148 byte = struct.unpack(">B", ch)[0]
152 assert needed_shbits <= 0
154 gotshbits = -needed_shbits
156 sh = val >> gotshbits
158 return (m, k, pad, sh,)
# Meta-format string: instantiated with two field widths (see
# encode_to_files) to yield a filename format like "%s.%02d_%02d%s",
# i.e. "<prefix>.<shnum>_<m><suffix>".  RE_FORMAT is the matching
# regex template for recognizing such share filenames.
160 FORMAT_FORMAT = "%%s.%%0%dd_%%0%dd%%s"
161 RE_FORMAT = "%s.[0-9]+_[0-9]+%s"
162 def encode_to_files(inf, fsize, dirname, prefix, k, m, suffix=".fec", overwrite=False, verbose=False):
164 Encode inf, writing the shares to specially named, newly created files.
166 @param fsize: calling read() on inf must yield fsize bytes of data and
167 then raise an EOFError
168 @param dirname: the name of the directory into which the sharefiles will
# Build the share filename format; mlen is presumably the decimal width
# of m (its computation is elided in this excerpt).
172 format = FORMAT_FORMAT % (mlen, mlen,)
# Padding needed to make fsize a multiple of k; recorded in each header.
174 padbytes = pad_size(fsize, k)
# Create one share file per share number, each starting with its header.
179 for shnum in range(m):
180 hdr = _build_header(m, k, padbytes, shnum)
182 fn = os.path.join(dirname, format % (prefix, shnum, m, suffix,))
184 print "Creating share file %r..." % (fn,)
# O_EXCL makes creation fail if the file already exists; O_BINARY is
# OR'ed in only on platforms that define it (Windows).
# NOTE(review): the overwrite=True path is elided in this excerpt --
# confirm how `overwrite` alters these flags in the full source.
188 flags = os.O_WRONLY|os.O_CREAT|os.O_EXCL | (hasattr(os, 'O_BINARY') and os.O_BINARY)
189 fd = os.open(fn, flags)
190 f = os.fdopen(fd, "wb")
# Callback invoked per encoded segment: report progress (per decile of
# fsize), sanity-check the running byte count, and write each block to
# its share file (the write lines are elided in this excerpt).
195 def cb(blocks, length):
196 assert len(blocks) == len(fs)
197 oldsumlen = sumlen[0]
200 if int((float(oldsumlen) / fsize) * 10) != int((float(sumlen[0]) / fsize) * 10):
201 print str(int((float(sumlen[0]) / fsize) * 10) * 10) + "% ...",
203 if sumlen[0] > fsize:
204 raise IOError("Wrong file size -- possibly the size of the file changed during encoding. Original size: %d, observed size at least: %s" % (fsize, sumlen[0],))
205 for i in range(len(blocks)):
210 encode_file_stringy_easyfec(inf, cb, k, m, chunksize=4096)
# Python 2 except syntax; on failure, remove any partially written
# share files so no corrupt output is left behind.
211 except EnvironmentError, le:
212 print "Cannot complete because of exception: "
214 print "Cleaning up..."
221 print "Cleaning up: trying to remove %r..." % (fn,)
222 fileutil.remove_if_possible(fn)
229 # Note: if you really prefer base-2 and you change this code, then please
230 # denote 2^20 as "MiB" instead of "MB" in order to avoid ambiguity. See:
231 # http://en.wikipedia.org/wiki/Megabyte
235 def decode_from_files(outf, infiles, verbose=False):
237 Decode from the first k files in infiles, writing the results to outf.
# Parse each share file's header, checking that all files agree on m, k,
# and the pad length, then stream the shares through easyfec.Decoder.
239 assert len(infiles) >= 2
# Each header yields this file's (m, k, padlen, shnum); every file must
# agree with the values seen so far (m/k/padlen start as None --
# their initialization is elided in this excerpt).
248 (nm, nk, npadlen, shnum,) = _parse_header(f)
249 if not (m is None or m == nm):
250 raise CorruptedShareFilesError("Share files were corrupted -- share file %r said that m was %s but another share file previously said that m was %s" % (f.name, nm, m,))
252 if not (k is None or k == nk):
253 raise CorruptedShareFilesError("Share files were corrupted -- share file %r said that k was %s but another share file previously said that k was %s" % (f.name, nk, k,))
255 raise InsufficientShareFilesError(k, len(infiles))
257 if not (padlen is None or padlen == npadlen):
258 raise CorruptedShareFilesError("Share files were corrupted -- share file %r said that pad length was %s but another share file previously said that pad length was %s" % (f.name, npadlen, padlen,))
267 dec = easyfec.Decoder(k, m)
# Read one chunk from each of the k selected share files per iteration.
270 chunks = [ inf.read(CHUNKSIZE) for inf in infs ]
271 if [ch for ch in chunks if len(ch) != len(chunks[-1])]:
272 raise CorruptedShareFilesError("Share files were corrupted -- all share files are required to be the same length, but they weren't.")
274 if len(chunks[-1]) == CHUNKSIZE:
275 # Then this was a full read, so we're still in the sharefiles.
276 resultdata = dec.decode(chunks, shnums, padlen=0)
277 outf.write(resultdata)
278 byteswritten += len(resultdata)
# Progress message every 10 MB when verbose (Python 2 print statement).
280 if ((byteswritten - len(resultdata)) / (10*MILLION_BYTES)) != (byteswritten / (10*MILLION_BYTES)):
281 print str(byteswritten / MILLION_BYTES) + " MB ...",
283 # Then this was a short read, so we've reached the end of the sharefiles.
# Only the final segment gets padlen stripped, since the padding was
# appended at the end of the original file.
284 resultdata = dec.decode(chunks, shnums, padlen)
285 outf.write(resultdata)
291 def encode_file(inf, cb, k, m, chunksize=4096):
293 Read in the contents of inf, encode, and call cb with the results.
295 First, k "input blocks" will be read from inf, each input block being of
296 size chunksize. Then these k blocks will be encoded into m "result
297 blocks". Then cb will be invoked, passing a list of the m result blocks
298 as its first argument, and the length of the encoded data as its second
299 argument. (The length of the encoded data is always equal to k*chunksize,
300 until the last iteration, when the end of the file has been reached and
301 less than k*chunksize bytes could be read from the file.) This procedure
302 is iterated until the end of the file is reached, in which case the space
303 of the input blocks that is unused is filled with zeroes before encoding.
305 Note that the sequence passed in calls to cb() contains mutable array
306 objects in its first k elements whose contents will be overwritten when
307 the next segment is read from the input file. Therefore the
308 implementation of cb() has to either be finished with those first k arrays
309 before returning, or if it wants to keep the contents of those arrays for
310 subsequent use after it has returned then it must make a copy of them to
313 @param inf the file object from which to read the data
314 @param cb the callback to be invoked with the results
315 @param k the number of shares required to reconstruct the file
316 @param m the total number of shares created
317 @param chunksize how much data to read from inf for each of the k input
320 enc = zfec.Encoder(k, m)
# k reusable byte buffers; 'c' typecode, fromfile, and fromstring below
# are Python 2-only array APIs.
321 l = tuple([ array.array('c') for i in range(k) ])
322 indatasize = k*chunksize # will be reset to shorter upon EOF
324 ZEROES=array.array('c', ['\x00'])*chunksize
326 # This loop body executes once per segment.
329 # This loop body executes once per chunk.
# fromfile raises EOFError on a short read, after appending what it got.
333 a.fromfile(inf, chunksize)
337 indatasize = i*chunksize + len(a)
# Zero-fill the unused tail of the final partial chunk before encoding.
340 a.fromstring("\x00" * (chunksize-len(a)))
351 from hashlib import sha1
353 # hashlib was added in Python 2.5.0.
356 def encode_file_not_really(inf, cb, k, m, chunksize=4096):
# Benchmarking variant of encode_file: performs the same reading and
# zero-padding but skips the actual zfec encode call (left commented
# out below), so it measures pure I/O overhead.
357 enc = zfec.Encoder(k, m)
358 l = tuple([ array.array('c') for i in range(k) ])
359 indatasize = k*chunksize # will be reset to shorter upon EOF
361 ZEROES=array.array('c', ['\x00'])*chunksize
363 # This loop body executes once per segment.
366 # This loop body executes once per chunk.
370 a.fromfile(inf, chunksize)
374 indatasize = i*chunksize + len(a)
377 a.fromstring("\x00" * (chunksize-len(a)))
384 # res = enc.encode(l)
387 def encode_file_not_really_and_hash(inf, cb, k, m, chunksize=4096):
# Benchmarking variant like encode_file_not_really, but additionally
# hashing the data (hashing lines are elided in this excerpt;
# presumably using the sha1 imported above) -- measures I/O + hash
# cost without the zfec encode.
389 enc = zfec.Encoder(k, m)
390 l = tuple([ array.array('c') for i in range(k) ])
391 indatasize = k*chunksize # will be reset to shorter upon EOF
393 ZEROES=array.array('c', ['\x00'])*chunksize
395 # This loop body executes once per segment.
398 # This loop body executes once per chunk.
402 a.fromfile(inf, chunksize)
406 indatasize = i*chunksize + len(a)
409 a.fromstring("\x00" * (chunksize-len(a)))
416 # res = enc.encode(l)
421 def encode_file_stringy(inf, cb, k, m, chunksize=4096):
423 Read in the contents of inf, encode, and call cb with the results.
425 First, k "input blocks" will be read from inf, each input block being of
426 size chunksize. Then these k blocks will be encoded into m "result
427 blocks". Then cb will be invoked, passing a list of the m result blocks
428 as its first argument, and the length of the encoded data as its second
429 argument. (The length of the encoded data is always equal to k*chunksize,
430 until the last iteration, when the end of the file has been reached and
431 less than k*chunksize bytes could be read from the file.) This procedure
432 is iterated until the end of the file is reached, in which case the part
433 of the input shares that is unused is filled with zeroes before encoding.
435 @param inf the file object from which to read the data
436 @param cb the callback to be invoked with the results
437 @param k the number of shares required to reconstruct the file
438 @param m the total number of shares created
439 @param chunksize how much data to read from inf for each of the k input
# String-based variant of encode_file: collects k chunks as plain
# strings instead of reusable arrays, so cb receives immutable data.
442 enc = zfec.Encoder(k, m)
443 indatasize = k*chunksize # will be reset to shorter upon EOF
444 while indatasize == k*chunksize:
445 # This loop body executes once per segment.
448 ZEROES = '\x00'*chunksize
450 # This loop body executes once per chunk.
452 l.append(inf.read(chunksize))
# A short read marks end-of-file: record the true data length and
# zero-pad the final chunk to full size.
453 if len(l[-1]) < chunksize:
454 indatasize = i*chunksize + len(l[-1])
457 l[-1] = l[-1] + "\x00" * (chunksize-len(l[-1]))
465 def encode_file_stringy_easyfec(inf, cb, k, m, chunksize=4096):
467 Read in the contents of inf, encode, and call cb with the results.
469 First, chunksize*k bytes will be read from inf, then encoded into m
470 "result blocks". Then cb will be invoked, passing a list of the m result
471 blocks as its first argument, and the length of the encoded data as its
472 second argument. (The length of the encoded data is always equal to
473 k*chunksize, until the last iteration, when the end of the file has been
474 reached and less than k*chunksize bytes could be read from the file.)
475 This procedure is iterated until the end of the file is reached, in which
476 case the space of the input that is unused is filled with zeroes before
479 @param inf the file object from which to read the data
480 @param cb the callback to be invoked with the results
481 @param k the number of shares required to reconstruct the file
482 @param m the total number of shares created
483 @param chunksize how much data to read from inf for each of the k input
# Simplest variant: easyfec.Encoder accepts one flat string per segment
# and handles splitting/padding internally.  This is the encoder that
# encode_to_files uses.
486 enc = easyfec.Encoder(k, m)
# Read one full segment (k*chunksize bytes) at a time; the loop header
# and the cb(res, len(indata)) call are elided in this excerpt.
488 readsize = k*chunksize
489 indata = inf.read(readsize)
491 res = enc.encode(indata)
493 indata = inf.read(readsize)
495 # zfec -- fast forward error correction library with Python interface
497 # Copyright (C) 2007 Allmydata, Inc.
498 # Author: Zooko Wilcox-O'Hearn
500 # This file is part of zfec.
502 # See README.txt for licensing information.