2 from util import fileutil
3 from util.mathutil import log_ceil
5 import array, os, re, struct, traceback
9 class InsufficientShareFilesError(zfec.Error):
10 def __init__(self, k, kb, *args, **kwargs):
11 zfec.Error.__init__(self, *args, **kwargs)
16 return "Insufficient share files -- %d share files are required to recover this file, but only %d were given" % (self.k, self.kb,)
19 return self.__repr__()
21 class CorruptedShareFilesError(zfec.Error):
24 def _build_header(m, k, pad, sh):
26 @param m: the total number of shares; 3 <= m <= 256
27 @param k: the number of shares required to reconstruct; 2 <= k < m
28 @param pad: the number of bytes of padding added to the file before encoding; 0 <= pad < k
29 @param sh: the shnum of this share; 0 <= k < m
31 @return: a string (which is hopefully short) encoding m, k, sh, and pad
47 bitsused += 8 # the first 8 bits always encode m
49 kbits = log_ceil(m-2, 2) # num bits needed to store all possible values of k
55 padbits = log_ceil(k, 2) # num bits needed to store all possible values of pad
61 shnumbits = log_ceil(m, 2) # num bits needed to store all possible values of shnum
72 cs = struct.pack('>H', val)
73 assert cs[:-2] == '\x00' * (len(cs)-2)
77 cs = struct.pack('>I', val)
78 assert cs[:-3] == '\x00' * (len(cs)-3)
82 cs = struct.pack('>I', val)
83 assert cs[:-4] == '\x00' * (len(cs)-4)
89 def _parse_header(inf):
91 @param inf: an object which I can call read(1) on to get another byte
93 @return: tuple of (m, k, pad, sh,); side-effect: the first one to four
94 bytes of inf will be read
96 # The first 8 bits always encode m.
99 raise CorruptedShareFilesError("Share files were corrupted -- share file %r didn't have a complete metadata header at the front. Perhaps the file was truncated." % (inf.name,))
103 # The next few bits encode k.
104 kbits = log_ceil(m-2, 2) # num bits needed to store all possible values of k
105 b2_bits_left = 8-kbits
106 kbitmask = MASK(kbits) << b2_bits_left
109 raise CorruptedShareFilesError("Share files were corrupted -- share file %r didn't have a complete metadata header at the front. Perhaps the file was truncated." % (inf.name,))
111 k = ((byte & kbitmask) >> b2_bits_left) + 2
113 shbits = log_ceil(m, 2) # num bits needed to store all possible values of shnum
114 padbits = log_ceil(k, 2) # num bits needed to store all possible values of pad
116 val = byte & (~kbitmask)
118 needed_padbits = padbits - b2_bits_left
119 if needed_padbits > 0:
122 raise CorruptedShareFilesError("Share files were corrupted -- share file %r didn't have a complete metadata header at the front. Perhaps the file was truncated." % (inf.name,))
123 byte = struct.unpack(">B", ch)[0]
127 assert needed_padbits <= 0
128 extrabits = -needed_padbits
129 pad = val >> extrabits
130 val &= MASK(extrabits)
132 needed_shbits = shbits - extrabits
133 if needed_shbits > 0:
136 raise CorruptedShareFilesError("Share files were corrupted -- share file %r didn't have a complete metadata header at the front. Perhaps the file was truncated." % (inf.name,))
137 byte = struct.unpack(">B", ch)[0]
141 assert needed_shbits <= 0
143 gotshbits = -needed_shbits
145 sh = val >> gotshbits
147 return (m, k, pad, sh,)
149 FORMAT_FORMAT = "%%s.%%0%dd_%%0%dd%%s"
150 RE_FORMAT = "%s.[0-9]+_[0-9]+%s"
151 def encode_to_files(inf, fsize, dirname, prefix, k, m, suffix=".fec", overwrite=False, verbose=False):
153 Encode inf, writing the shares to specially named, newly created files.
155 @param fsize: calling read() on inf must yield fsize bytes of data and
156 then raise an EOFError
157 @param dirname: the name of the directory into which the sharefiles will
161 format = FORMAT_FORMAT % (mlen, mlen,)
163 padbytes = zfec.util.mathutil.pad_size(fsize, k)
168 for shnum in range(m):
169 hdr = _build_header(m, k, padbytes, shnum)
171 fn = os.path.join(dirname, format % (prefix, shnum, m, suffix,))
173 print "Creating share file %r..." % (fn,)
177 fd = os.open(fn, os.O_WRONLY|os.O_CREAT|os.O_EXCL|os.O_BINARY)
178 f = os.fdopen(fd, "wb")
183 def cb(blocks, length):
184 assert len(blocks) == len(fs)
185 oldsumlen = sumlen[0]
188 if int((float(oldsumlen) / fsize) * 10) != int((float(sumlen[0]) / fsize) * 10):
189 print str(int((float(sumlen[0]) / fsize) * 10) * 10) + "% ...",
191 if sumlen[0] > fsize:
192 raise IOError("Wrong file size -- possibly the size of the file changed during encoding. Original size: %d, observed size at least: %s" % (fsize, sumlen[0],))
193 for i in range(len(blocks)):
198 encode_file_stringy_easyfec(inf, cb, k, m, chunksize=4096)
199 except EnvironmentError, le:
200 print "Cannot complete because of exception: "
202 print "Cleaning up..."
209 print "Cleaning up: trying to remove %r..." % (fn,)
210 fileutil.remove_if_possible(fn)
217 # Note: if you really prefer base-2 and you change this code, then please
218 # denote 2^20 as "MiB" instead of "MB" in order to avoid ambiguity.
220 # http://en.wikipedia.org/wiki/Megabyte
223 def decode_from_files(outf, infiles, verbose=False):
225 Decode from the first k files in infiles, writing the results to outf.
227 assert len(infiles) >= 2
236 (nm, nk, npadlen, shnum,) = _parse_header(f)
237 if not (m is None or m == nm):
238 raise CorruptedShareFilesError("Share files were corrupted -- share file %r said that m was %s but another share file previously said that m was %s" % (f.name, nm, m,))
240 if not (k is None or k == nk):
241 raise CorruptedShareFilesError("Share files were corrupted -- share file %r said that k was %s but another share file previously said that k was %s" % (f.name, nk, k,))
243 raise InsufficientShareFilesError(k, len(infiles))
245 if not (padlen is None or padlen == npadlen):
246 raise CorruptedShareFilesError("Share files were corrupted -- share file %r said that pad length was %s but another share file previously said that pad length was %s" % (f.name, npadlen, padlen,))
255 dec = easyfec.Decoder(k, m)
258 chunks = [ inf.read(CHUNKSIZE) for inf in infs ]
259 if [ch for ch in chunks if len(ch) != len(chunks[-1])]:
260 raise CorruptedShareFilesError("Share files were corrupted -- all share files are required to be the same length, but they weren't.")
262 if len(chunks[-1]) == CHUNKSIZE:
263 # Then this was a full read, so we're still in the sharefiles.
264 resultdata = dec.decode(chunks, shnums, padlen=0)
265 outf.write(resultdata)
266 byteswritten += len(resultdata)
268 if ((byteswritten - len(resultdata)) / (10*MILLION_BYTES)) != (byteswritten / (10*MILLION_BYTES)):
269 print str(byteswritten / MILLION_BYTES) + " MB ...",
271 # Then this was a short read, so we've reached the end of the sharefiles.
272 resultdata = dec.decode(chunks, shnums, padlen)
273 outf.write(resultdata)
279 def encode_file(inf, cb, k, m, chunksize=4096):
281 Read in the contents of inf, encode, and call cb with the results.
283 First, k "input blocks" will be read from inf, each input block being of
284 size chunksize. Then these k blocks will be encoded into m "result
285 blocks". Then cb will be invoked, passing a list of the m result blocks
286 as its first argument, and the length of the encoded data as its second
287 argument. (The length of the encoded data is always equal to k*chunksize,
288 until the last iteration, when the end of the file has been reached and
289 less than k*chunksize bytes could be read from the file.) This procedure
290 is iterated until the end of the file is reached, in which case the space
291 of the input blocks that is unused is filled with zeroes before encoding.
293 Note that the sequence passed in calls to cb() contains mutable array
294 objects in its first k elements whose contents will be overwritten when
295 the next segment is read from the input file. Therefore the
296 implementation of cb() has to either be finished with those first k arrays
297 before returning, or if it wants to keep the contents of those arrays for
298 subsequent use after it has returned then it must make a copy of them to
301 @param inf the file object from which to read the data
302 @param cb the callback to be invoked with the results
303 @param k the number of shares required to reconstruct the file
304 @param m the total number of shares created
305 @param chunksize how much data to read from inf for each of the k input
308 enc = zfec.Encoder(k, m)
309 l = tuple([ array.array('c') for i in range(k) ])
310 indatasize = k*chunksize # will be reset to shorter upon EOF
312 ZEROES=array.array('c', ['\x00'])*chunksize
314 # This loop body executes once per segment.
317 # This loop body executes once per chunk.
321 a.fromfile(inf, chunksize)
325 indatasize = i*chunksize + len(a)
328 a.fromstring("\x00" * (chunksize-len(a)))
338 def encode_file_stringy(inf, cb, k, m, chunksize=4096):
340 Read in the contents of inf, encode, and call cb with the results.
342 First, k "input blocks" will be read from inf, each input block being of
343 size chunksize. Then these k blocks will be encoded into m "result
344 blocks". Then cb will be invoked, passing a list of the m result blocks
345 as its first argument, and the length of the encoded data as its second
346 argument. (The length of the encoded data is always equal to k*chunksize,
347 until the last iteration, when the end of the file has been reached and
348 less than k*chunksize bytes could be read from the file.) This procedure
349 is iterated until the end of the file is reached, in which case the part
350 of the input shares that is unused is filled with zeroes before encoding.
352 @param inf the file object from which to read the data
353 @param cb the callback to be invoked with the results
354 @param k the number of shares required to reconstruct the file
355 @param m the total number of shares created
356 @param chunksize how much data to read from inf for each of the k input
359 enc = zfec.Encoder(k, m)
360 indatasize = k*chunksize # will be reset to shorter upon EOF
361 while indatasize == k*chunksize:
362 # This loop body executes once per segment.
365 ZEROES = '\x00'*chunksize
367 # This loop body executes once per chunk.
369 l.append(inf.read(chunksize))
370 if len(l[-1]) < chunksize:
371 indatasize = i*chunksize + len(l[-1])
374 l[-1] = l[-1] + "\x00" * (chunksize-len(l[-1]))
382 def encode_file_stringy_easyfec(inf, cb, k, m, chunksize=4096):
384 Read in the contents of inf, encode, and call cb with the results.
386 First, chunksize*k bytes will be read from inf, then encoded into m
387 "result blocks". Then cb will be invoked, passing a list of the m result
388 blocks as its first argument, and the length of the encoded data as its
389 second argument. (The length of the encoded data is always equal to
390 k*chunksize, until the last iteration, when the end of the file has been
391 reached and less than k*chunksize bytes could be read from the file.)
392 This procedure is iterated until the end of the file is reached, in which
393 case the space of the input that is unused is filled with zeroes before
396 @param inf the file object from which to read the data
397 @param cb the callback to be invoked with the results
398 @param k the number of shares required to reconstruct the file
399 @param m the total number of shares created
400 @param chunksize how much data to read from inf for each of the k input
403 enc = easyfec.Encoder(k, m)
405 readsize = k*chunksize
406 indata = inf.read(readsize)
408 res = enc.encode(indata)
410 indata = inf.read(readsize)
412 # zfec -- fast forward error correction library with Python interface
414 # Copyright (C) 2007 Allmydata, Inc.
415 # Author: Zooko Wilcox-O'Hearn
416 # mailto:zooko@zooko.com
418 # This file is part of zfec.
420 # This program is free software; you can redistribute it and/or modify it
421 # under the terms of the GNU General Public License as published by the Free
422 # Software Foundation; either version 2 of the License, or (at your option)
423 # any later version. This program also comes with the added permission that,
424 # in the case that you are obligated to release a derived work under this
425 # licence (as per section 2.b of the GPL), you may delay the fulfillment of
426 # this obligation for up to 12 months.
428 # This program is distributed in the hope that it will be useful,
429 # but WITHOUT ANY WARRANTY; without even the implied warranty of
430 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
431 # GNU General Public License for more details.
433 # You should have received a copy of the GNU General Public License
434 # along with this program; if not, write to the Free Software
435 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.