]> git.rkrishnan.org Git - tahoe-lafs/zfec.git/blob - zfec/filefec.py
filefec: Bug fix for the files with certain lengths (see the longer explanation)
[tahoe-lafs/zfec.git] / zfec / filefec.py
1 import easyfec, zfec
2 from pyutil import fileutil
3 from pyutil.mathutil import pad_size, log_ceil
4
5 import array, os, struct
6
7 CHUNKSIZE = 4096
8
9 from base64 import b32encode
10 def ab(x): # debuggery
11     if len(x) >= 3:
12         return "%s:%s" % (len(x), b32encode(x[-3:]),)
13     elif len(x) == 2:
14         return "%s:%s" % (len(x), b32encode(x[-2:]),)
15     elif len(x) == 1:
16         return "%s:%s" % (len(x), b32encode(x[-1:]),)
17     elif len(x) == 0:
18         return "%s:%s" % (len(x), "--empty--",)
19
20 class InsufficientShareFilesError(zfec.Error):
21     def __init__(self, k, kb, *args, **kwargs):
22         zfec.Error.__init__(self, *args, **kwargs)
23         self.k = k
24         self.kb = kb
25
26     def __repr__(self):
27         return "Insufficient share files -- %d share files are required to recover this file, but only %d were given" % (self.k, self.kb,)
28
29     def __str__(self):
30         return self.__repr__()
31
32 class CorruptedShareFilesError(zfec.Error):
33     pass
34
35 def _build_header(m, k, pad, sh):
36     """
37     @param m: the total number of shares; 1 <= m <= 256
38     @param k: the number of shares required to reconstruct; 1 <= k <= m
39     @param pad: the number of bytes of padding added to the file before encoding; 0 <= pad < k
40     @param sh: the shnum of this share; 0 <= k < m
41
42     @return: a compressed string encoding m, k, pad, and sh
43     """
44     assert m >= 1
45     assert m <= 2**8
46     assert k >= 1
47     assert k <= m
48     assert pad >= 0
49     assert pad < k
50
51     assert sh >= 0
52     assert sh < m
53
54     bitsused = 0
55     val = 0
56
57     val |= (m - 1)
58     bitsused += 8 # the first 8 bits always encode m
59
60     kbits = log_ceil(m, 2) # num bits needed to store all possible values of k
61     val <<= kbits
62     bitsused += kbits
63
64     val |= (k - 1)
65
66     padbits = log_ceil(k, 2) # num bits needed to store all possible values of pad
67     val <<= padbits
68     bitsused += padbits
69
70     val |= pad
71
72     shnumbits = log_ceil(m, 2) # num bits needed to store all possible values of shnum
73     val <<= shnumbits
74     bitsused += shnumbits
75
76     val |= sh
77
78     assert bitsused >= 8, bitsused
79     assert bitsused <= 32, bitsused
80
81     if bitsused <= 16:
82         val <<= (16-bitsused)
83         cs = struct.pack('>H', val)
84         assert cs[:-2] == '\x00' * (len(cs)-2)
85         return cs[-2:]
86     if bitsused <= 24:
87         val <<= (24-bitsused)
88         cs = struct.pack('>I', val)
89         assert cs[:-3] == '\x00' * (len(cs)-3)
90         return cs[-3:]
91     else:
92         val <<= (32-bitsused)
93         cs = struct.pack('>I', val)
94         assert cs[:-4] == '\x00' * (len(cs)-4)
95         return cs[-4:]
96
97 def MASK(bits):
98     return (1<<bits)-1
99
100 def _parse_header(inf):
101     """
102     @param inf: an object which I can call read(1) on to get another byte
103
104     @return: tuple of (m, k, pad, sh,); side-effect: the first one to four
105         bytes of inf will be read
106     """
107     # The first 8 bits always encode m.
108     ch = inf.read(1)
109     if not ch:
110         raise CorruptedShareFilesError("Share files were corrupted -- share file %r didn't have a complete metadata header at the front.  Perhaps the file was truncated." % (inf.name,))
111     byte = ord(ch)
112     m = byte + 1
113
114     # The next few bits encode k.
115     kbits = log_ceil(m, 2) # num bits needed to store all possible values of k
116     b2_bits_left = 8-kbits
117     kbitmask = MASK(kbits) << b2_bits_left
118     ch = inf.read(1)
119     if not ch:
120         raise CorruptedShareFilesError("Share files were corrupted -- share file %r didn't have a complete metadata header at the front.  Perhaps the file was truncated." % (inf.name,))
121     byte = ord(ch)
122     k = ((byte & kbitmask) >> b2_bits_left) + 1
123
124     shbits = log_ceil(m, 2) # num bits needed to store all possible values of shnum
125     padbits = log_ceil(k, 2) # num bits needed to store all possible values of pad
126
127     val = byte & (~kbitmask)
128
129     needed_padbits = padbits - b2_bits_left
130     if needed_padbits > 0:
131         ch = inf.read(1)
132         if not ch:
133             raise CorruptedShareFilesError("Share files were corrupted -- share file %r didn't have a complete metadata header at the front.  Perhaps the file was truncated." % (inf.name,))
134         byte = struct.unpack(">B", ch)[0]
135         val <<= 8
136         val |= byte
137         needed_padbits -= 8
138     assert needed_padbits <= 0
139     extrabits = -needed_padbits
140     pad = val >> extrabits
141     val &= MASK(extrabits)
142
143     needed_shbits = shbits - extrabits
144     if needed_shbits > 0:
145         ch = inf.read(1)
146         if not ch:
147             raise CorruptedShareFilesError("Share files were corrupted -- share file %r didn't have a complete metadata header at the front.  Perhaps the file was truncated." % (inf.name,))
148         byte = struct.unpack(">B", ch)[0]
149         val <<= 8
150         val |= byte
151         needed_shbits -= 8
152     assert needed_shbits <= 0
153
154     gotshbits = -needed_shbits
155
156     sh = val >> gotshbits
157
158     return (m, k, pad, sh,)
159
160 FORMAT_FORMAT = "%%s.%%0%dd_%%0%dd%%s"
161 RE_FORMAT = "%s.[0-9]+_[0-9]+%s"
162 def encode_to_files(inf, fsize, dirname, prefix, k, m, suffix=".fec", overwrite=False, verbose=False):
163     """
164     Encode inf, writing the shares to specially named, newly created files.
165
166     @param fsize: calling read() on inf must yield fsize bytes of data and
167         then raise an EOFError
168     @param dirname: the name of the directory into which the sharefiles will
169         be written
170     """
171     mlen = len(str(m))
172     format = FORMAT_FORMAT % (mlen, mlen,)
173
174     padbytes = pad_size(fsize, k)
175
176     fns = []
177     fs = []
178     try:
179         for shnum in range(m):
180             hdr = _build_header(m, k, padbytes, shnum)
181
182             fn = os.path.join(dirname, format % (prefix, shnum, m, suffix,))
183             if verbose:
184                 print "Creating share file %r..." % (fn,)
185             if overwrite:
186                 f = open(fn, "wb")
187             else:
188                 flags = os.O_WRONLY|os.O_CREAT|os.O_EXCL | (hasattr(os, 'O_BINARY') and os.O_BINARY)
189                 fd = os.open(fn, flags)
190                 f = os.fdopen(fd, "wb")
191             f.write(hdr)
192             fs.append(f)
193             fns.append(fn)
194         sumlen = [0]
195         def cb(blocks, length):
196             assert len(blocks) == len(fs)
197             oldsumlen = sumlen[0]
198             sumlen[0] += length
199             if verbose:
200                 if int((float(oldsumlen) / fsize) * 10) != int((float(sumlen[0]) / fsize) * 10):
201                     print str(int((float(sumlen[0]) / fsize) * 10) * 10) + "% ...",
202
203             if sumlen[0] > fsize:
204                 raise IOError("Wrong file size -- possibly the size of the file changed during encoding.  Original size: %d, observed size at least: %s" % (fsize, sumlen[0],))
205             for i in range(len(blocks)):
206                 data = blocks[i]
207                 fs[i].write(data)
208                 length -= len(data)
209
210         encode_file_stringy_easyfec(inf, cb, k, m, chunksize=4096)
211     except EnvironmentError, le:
212         print "Cannot complete because of exception: "
213         print le
214         print "Cleaning up..."
215         # clean up
216         while fs:
217             f = fs.pop()
218             f.close() ; del f
219             fn = fns.pop()
220             if verbose:
221                 print "Cleaning up: trying to remove %r..." % (fn,)
222             fileutil.remove_if_possible(fn)
223         return 1
224     if verbose:
225         print
226         print "Done!"
227     return 0
228
229 # Note: if you really prefer base-2 and you change this code, then please
230 # denote 2^20 as "MiB" instead of "MB" in order to avoid ambiguity.  See:
231 # http://en.wikipedia.org/wiki/Megabyte
232 # Thanks.
233 MILLION_BYTES=10**6
234
235 def decode_from_files(outf, infiles, verbose=False):
236     """
237     Decode from the first k files in infiles, writing the results to outf.
238     """
239     assert len(infiles) >= 2
240     infs = []
241     shnums = []
242     m = None
243     k = None
244     padlen = None
245
246     byteswritten = 0
247     for f in infiles:
248         (nm, nk, npadlen, shnum,) = _parse_header(f)
249         if not (m is None or m == nm):
250             raise CorruptedShareFilesError("Share files were corrupted -- share file %r said that m was %s but another share file previously said that m was %s" % (f.name, nm, m,))
251         m = nm
252         if not (k is None or k == nk):
253             raise CorruptedShareFilesError("Share files were corrupted -- share file %r said that k was %s but another share file previously said that k was %s" % (f.name, nk, k,))
254         if k > len(infiles):
255             raise InsufficientShareFilesError(k, len(infiles))
256         k = nk
257         if not (padlen is None or padlen == npadlen):
258             raise CorruptedShareFilesError("Share files were corrupted -- share file %r said that pad length was %s but another share file previously said that pad length was %s" % (f.name, npadlen, padlen,))
259         padlen = npadlen
260
261         infs.append(f)
262         shnums.append(shnum)
263
264         if len(infs) == k:
265             break
266
267     dec = easyfec.Decoder(k, m)
268
269     while True:
270         chunks = [ inf.read(CHUNKSIZE) for inf in infs ]
271         if [ch for ch in chunks if len(ch) != len(chunks[-1])]:
272             raise CorruptedShareFilesError("Share files were corrupted -- all share files are required to be the same length, but they weren't.")
273
274         if len(chunks[-1]) > 0:
275             resultdata = dec.decode(chunks, shnums, padlen=0)
276             outf.write(resultdata)
277             byteswritten += len(resultdata)
278             if verbose:
279                 if ((byteswritten - len(resultdata)) / (10*MILLION_BYTES)) != (byteswritten / (10*MILLION_BYTES)):
280                     print str(byteswritten / MILLION_BYTES) + " MB ...",
281         else:
282             if padlen > 0:
283                 outf.truncate(byteswritten - padlen)
284             return # Done.
285     if verbose:
286         print
287         print "Done!"
288
289 def encode_file(inf, cb, k, m, chunksize=4096):
290     """
291     Read in the contents of inf, encode, and call cb with the results.
292
293     First, k "input blocks" will be read from inf, each input block being of
294     size chunksize.  Then these k blocks will be encoded into m "result
295     blocks".  Then cb will be invoked, passing a list of the m result blocks
296     as its first argument, and the length of the encoded data as its second
297     argument.  (The length of the encoded data is always equal to k*chunksize,
298     until the last iteration, when the end of the file has been reached and
299     less than k*chunksize bytes could be read from the file.)  This procedure
300     is iterated until the end of the file is reached, in which case the space
301     of the input blocks that is unused is filled with zeroes before encoding.
302
303     Note that the sequence passed in calls to cb() contains mutable array
304     objects in its first k elements whose contents will be overwritten when
305     the next segment is read from the input file.  Therefore the
306     implementation of cb() has to either be finished with those first k arrays
307     before returning, or if it wants to keep the contents of those arrays for
308     subsequent use after it has returned then it must make a copy of them to
309     keep.
310
311     @param inf the file object from which to read the data
312     @param cb the callback to be invoked with the results
313     @param k the number of shares required to reconstruct the file
314     @param m the total number of shares created
315     @param chunksize how much data to read from inf for each of the k input
316         blocks
317     """
318     enc = zfec.Encoder(k, m)
319     l = tuple([ array.array('c') for i in range(k) ])
320     indatasize = k*chunksize # will be reset to shorter upon EOF
321     eof = False
322     ZEROES=array.array('c', ['\x00'])*chunksize
323     while not eof:
324         # This loop body executes once per segment.
325         i = 0
326         while (i<len(l)):
327             # This loop body executes once per chunk.
328             a = l[i]
329             del a[:]
330             try:
331                 a.fromfile(inf, chunksize)
332                 i += 1
333             except EOFError:
334                 eof = True
335                 indatasize = i*chunksize + len(a)
336
337                 # padding
338                 a.fromstring("\x00" * (chunksize-len(a)))
339                 i += 1
340                 while (i<len(l)):
341                     a = l[i]
342                     a[:] = ZEROES
343                     i += 1
344
345         res = enc.encode(l)
346         cb(res, indatasize)
347
348 try:
349     from hashlib import sha1
350     sha1 = sha1 # hush pyflakes
351 except ImportError:
352     # hashlib was added in Python 2.5.0.
353     import sha
354     sha1 = sha
355
356 def encode_file_not_really(inf, cb, k, m, chunksize=4096):
357     enc = zfec.Encoder(k, m)
358     l = tuple([ array.array('c') for i in range(k) ])
359     indatasize = k*chunksize # will be reset to shorter upon EOF
360     eof = False
361     ZEROES=array.array('c', ['\x00'])*chunksize
362     while not eof:
363         # This loop body executes once per segment.
364         i = 0
365         while (i<len(l)):
366             # This loop body executes once per chunk.
367             a = l[i]
368             del a[:]
369             try:
370                 a.fromfile(inf, chunksize)
371                 i += 1
372             except EOFError:
373                 eof = True
374                 indatasize = i*chunksize + len(a)
375
376                 # padding
377                 a.fromstring("\x00" * (chunksize-len(a)))
378                 i += 1
379                 while (i<len(l)):
380                     a = l[i]
381                     a[:] = ZEROES
382                     i += 1
383
384         # res = enc.encode(l)
385         cb(None, None)
386
387 def encode_file_not_really_and_hash(inf, cb, k, m, chunksize=4096):
388     hasher = sha1.new()
389     enc = zfec.Encoder(k, m)
390     l = tuple([ array.array('c') for i in range(k) ])
391     indatasize = k*chunksize # will be reset to shorter upon EOF
392     eof = False
393     ZEROES=array.array('c', ['\x00'])*chunksize
394     while not eof:
395         # This loop body executes once per segment.
396         i = 0
397         while (i<len(l)):
398             # This loop body executes once per chunk.
399             a = l[i]
400             del a[:]
401             try:
402                 a.fromfile(inf, chunksize)
403                 i += 1
404             except EOFError:
405                 eof = True
406                 indatasize = i*chunksize + len(a)
407
408                 # padding
409                 a.fromstring("\x00" * (chunksize-len(a)))
410                 i += 1
411                 while (i<len(l)):
412                     a = l[i]
413                     a[:] = ZEROES
414                     i += 1
415
416         # res = enc.encode(l)
417         for thing in l:
418             hasher.update(thing)
419         cb(None, None)
420
421 def encode_file_stringy(inf, cb, k, m, chunksize=4096):
422     """
423     Read in the contents of inf, encode, and call cb with the results.
424
425     First, k "input blocks" will be read from inf, each input block being of
426     size chunksize.  Then these k blocks will be encoded into m "result
427     blocks".  Then cb will be invoked, passing a list of the m result blocks
428     as its first argument, and the length of the encoded data as its second
429     argument.  (The length of the encoded data is always equal to k*chunksize,
430     until the last iteration, when the end of the file has been reached and
431     less than k*chunksize bytes could be read from the file.)  This procedure
432     is iterated until the end of the file is reached, in which case the part
433     of the input shares that is unused is filled with zeroes before encoding.
434
435     @param inf the file object from which to read the data
436     @param cb the callback to be invoked with the results
437     @param k the number of shares required to reconstruct the file
438     @param m the total number of shares created
439     @param chunksize how much data to read from inf for each of the k input
440         blocks
441     """
442     enc = zfec.Encoder(k, m)
443     indatasize = k*chunksize # will be reset to shorter upon EOF
444     while indatasize == k*chunksize:
445         # This loop body executes once per segment.
446         i = 0
447         l = []
448         ZEROES = '\x00'*chunksize
449         while i<k:
450             # This loop body executes once per chunk.
451             i += 1
452             l.append(inf.read(chunksize))
453             if len(l[-1]) < chunksize:
454                 indatasize = i*chunksize + len(l[-1])
455
456                 # padding
457                 l[-1] = l[-1] + "\x00" * (chunksize-len(l[-1]))
458                 while i<k:
459                     l.append(ZEROES)
460                     i += 1
461
462         res = enc.encode(l)
463         cb(res, indatasize)
464
465 def encode_file_stringy_easyfec(inf, cb, k, m, chunksize=4096):
466     """
467     Read in the contents of inf, encode, and call cb with the results.
468
469     First, chunksize*k bytes will be read from inf, then encoded into m
470     "result blocks".  Then cb will be invoked, passing a list of the m result
471     blocks as its first argument, and the length of the encoded data as its
472     second argument.  (The length of the encoded data is always equal to
473     k*chunksize, until the last iteration, when the end of the file has been
474     reached and less than k*chunksize bytes could be read from the file.)
475     This procedure is iterated until the end of the file is reached, in which
476     case the space of the input that is unused is filled with zeroes before
477     encoding.
478
479     @param inf the file object from which to read the data
480     @param cb the callback to be invoked with the results
481     @param k the number of shares required to reconstruct the file
482     @param m the total number of shares created
483     @param chunksize how much data to read from inf for each of the k input
484         blocks
485     """
486     enc = easyfec.Encoder(k, m)
487
488     readsize = k*chunksize
489     indata = inf.read(readsize)
490     while indata:
491         res = enc.encode(indata)
492         cb(res, len(indata))
493         indata = inf.read(readsize)
494
495 # zfec -- fast forward error correction library with Python interface
496 #
497 # Copyright (C) 2007-2010 Allmydata, Inc.
498 # Author: Zooko Wilcox-O'Hearn
499 #
500 # This file is part of zfec.
501 #
502 # See README.rst for licensing information.