]> git.rkrishnan.org Git - tahoe-lafs/zfec.git/blob - zfec/zfec/filefec.py
zfec: use setuptools to construct executables, fix bug on Windows -- forgot "BINARY...
[tahoe-lafs/zfec.git] / zfec / zfec / filefec.py
1 import easyfec, zfec
2 from util import fileutil
3 from util.mathutil import log_ceil
4
5 import array, os, re, struct, traceback
6
7 CHUNKSIZE = 4096
8
9 class InsufficientShareFilesError(zfec.Error):
10     def __init__(self, k, kb, *args, **kwargs):
11         zfec.Error.__init__(self, *args, **kwargs)
12         self.k = k
13         self.kb = kb
14
15     def __repr__(self):
16         return "Insufficient share files -- %d share files are required to recover this file, but only %d were given" % (self.k, self.kb,)
17
18     def __str__(self):
19         return self.__repr__()
20
21 class CorruptedShareFilesError(zfec.Error):
22     pass
23
24 def _build_header(m, k, pad, sh):
25     """
26     @param m: the total number of shares; 3 <= m <= 256
27     @param k: the number of shares required to reconstruct; 2 <= k < m
28     @param pad: the number of bytes of padding added to the file before encoding; 0 <= pad < k
29     @param sh: the shnum of this share; 0 <= k < m
30
31     @return: a string (which is hopefully short) encoding m, k, sh, and pad
32     """
33     assert m >= 3
34     assert m <= 2**8
35     assert k >= 2
36     assert k < m
37     assert pad >= 0
38     assert pad < k
39
40     assert sh >= 0
41     assert sh < m
42
43     bitsused = 0
44     val = 0
45
46     val |= (m - 3)
47     bitsused += 8 # the first 8 bits always encode m
48
49     kbits = log_ceil(m-2, 2) # num bits needed to store all possible values of k
50     val <<= kbits
51     bitsused += kbits
52
53     val |= (k - 2)
54
55     padbits = log_ceil(k, 2) # num bits needed to store all possible values of pad
56     val <<= padbits
57     bitsused += padbits
58
59     val |= pad
60
61     shnumbits = log_ceil(m, 2) # num bits needed to store all possible values of shnum
62     val <<= shnumbits
63     bitsused += shnumbits
64
65     val |= sh
66
67     assert bitsused >= 11
68     assert bitsused <= 32
69
70     if bitsused <= 16:
71         val <<= (16-bitsused)
72         cs = struct.pack('>H', val)
73         assert cs[:-2] == '\x00' * (len(cs)-2)
74         return cs[-2:]
75     if bitsused <= 24:
76         val <<= (24-bitsused)
77         cs = struct.pack('>I', val)
78         assert cs[:-3] == '\x00' * (len(cs)-3)
79         return cs[-3:]
80     else:
81         val <<= (32-bitsused)
82         cs = struct.pack('>I', val)
83         assert cs[:-4] == '\x00' * (len(cs)-4)
84         return cs[-4:]
85
86 def MASK(bits):
87     return (1<<bits)-1
88
89 def _parse_header(inf):
90     """
91     @param inf: an object which I can call read(1) on to get another byte
92
93     @return: tuple of (m, k, pad, sh,); side-effect: the first one to four
94         bytes of inf will be read
95     """
96     # The first 8 bits always encode m.
97     ch = inf.read(1)
98     if not ch:
99         raise CorruptedShareFilesError("Share files were corrupted -- share file %r didn't have a complete metadata header at the front.  Perhaps the file was truncated." % (inf.name,))
100     byte = ord(ch)
101     m = byte + 3
102
103     # The next few bits encode k.
104     kbits = log_ceil(m-2, 2) # num bits needed to store all possible values of k
105     b2_bits_left = 8-kbits
106     kbitmask = MASK(kbits) << b2_bits_left
107     ch = inf.read(1)
108     if not ch:
109         raise CorruptedShareFilesError("Share files were corrupted -- share file %r didn't have a complete metadata header at the front.  Perhaps the file was truncated." % (inf.name,))
110     byte = ord(ch)
111     k = ((byte & kbitmask) >> b2_bits_left) + 2
112
113     shbits = log_ceil(m, 2) # num bits needed to store all possible values of shnum
114     padbits = log_ceil(k, 2) # num bits needed to store all possible values of pad
115
116     val = byte & (~kbitmask)
117
118     needed_padbits = padbits - b2_bits_left
119     if needed_padbits > 0:
120         ch = inf.read(1)
121         if not ch:
122             raise CorruptedShareFilesError("Share files were corrupted -- share file %r didn't have a complete metadata header at the front.  Perhaps the file was truncated." % (inf.name,))
123         byte = struct.unpack(">B", ch)[0]
124         val <<= 8
125         val |= byte 
126         needed_padbits -= 8
127     assert needed_padbits <= 0
128     extrabits = -needed_padbits
129     pad = val >> extrabits
130     val &= MASK(extrabits)
131
132     needed_shbits = shbits - extrabits
133     if needed_shbits > 0:
134         ch = inf.read(1)
135         if not ch:
136             raise CorruptedShareFilesError("Share files were corrupted -- share file %r didn't have a complete metadata header at the front.  Perhaps the file was truncated." % (inf.name,))
137         byte = struct.unpack(">B", ch)[0]
138         val <<= 8
139         val |= byte 
140         needed_shbits -= 8
141     assert needed_shbits <= 0
142
143     gotshbits = -needed_shbits
144
145     sh = val >> gotshbits
146
147     return (m, k, pad, sh,)
148
149 FORMAT_FORMAT = "%%s.%%0%dd_%%0%dd%%s"
150 RE_FORMAT = "%s.[0-9]+_[0-9]+%s"
151 def encode_to_files(inf, fsize, dirname, prefix, k, m, suffix=".fec", overwrite=False, verbose=False):
152     """
153     Encode inf, writing the shares to specially named, newly created files.
154
155     @param fsize: calling read() on inf must yield fsize bytes of data and 
156         then raise an EOFError
157     @param dirname: the name of the directory into which the sharefiles will
158         be written
159     """
160     mlen = len(str(m))
161     format = FORMAT_FORMAT % (mlen, mlen,)
162
163     padbytes = zfec.util.mathutil.pad_size(fsize, k)
164
165     fns = []
166     fs = []
167     try:
168         for shnum in range(m):
169             hdr = _build_header(m, k, padbytes, shnum)
170
171             fn = os.path.join(dirname, format % (prefix, shnum, m, suffix,))
172             if verbose:
173                 print "Creating share file %r..." % (fn,)
174             if overwrite:
175                 f = open(fn, "wb")
176             else:
177                 fd = os.open(fn, os.O_WRONLY|os.O_CREAT|os.O_EXCL|os.O_BINARY)
178                 f = os.fdopen(fd, "wb")
179             f.write(hdr)
180             fs.append(f)
181             fns.append(fn)
182         sumlen = [0]
183         def cb(blocks, length):
184             assert len(blocks) == len(fs)
185             oldsumlen = sumlen[0]
186             sumlen[0] += length
187             if verbose:
188                 if int((float(oldsumlen) / fsize) * 10) != int((float(sumlen[0]) / fsize) * 10):
189                     print str(int((float(sumlen[0]) / fsize) * 10) * 10) + "% ...",
190             
191             if sumlen[0] > fsize:
192                 raise IOError("Wrong file size -- possibly the size of the file changed during encoding.  Original size: %d, observed size at least: %s" % (fsize, sumlen[0],))
193             for i in range(len(blocks)):
194                 data = blocks[i]
195                 fs[i].write(data)
196                 length -= len(data)
197
198         encode_file_stringy_easyfec(inf, cb, k, m, chunksize=4096)
199     except EnvironmentError, le:
200         print "Cannot complete because of exception: "
201         print le
202         print "Cleaning up..."
203         # clean up
204         while fs:
205             f = fs.pop()
206             f.close() ; del f
207             fn = fns.pop()
208             if verbose:
209                 print "Cleaning up: trying to remove %r..." % (fn,)
210             fileutil.remove_if_possible(fn)
211         return 1
212     if verbose:
213         print 
214         print "Done!"
215     return 0
216
217 # Note: if you really prefer base-2 and you change this code, then please
218 # denote 2^20 as "MiB" instead of "MB" in order to avoid ambiguity.
219 # Thanks.
220 # http://en.wikipedia.org/wiki/Megabyte
221 MILLION_BYTES=10**6
222
223 def decode_from_files(outf, infiles, verbose=False):
224     """
225     Decode from the first k files in infiles, writing the results to outf.
226     """
227     assert len(infiles) >= 2
228     infs = []
229     shnums = []
230     m = None
231     k = None
232     padlen = None
233
234     byteswritten = 0
235     for f in infiles:
236         (nm, nk, npadlen, shnum,) = _parse_header(f)
237         if not (m is None or m == nm):
238             raise CorruptedShareFilesError("Share files were corrupted -- share file %r said that m was %s but another share file previously said that m was %s" % (f.name, nm, m,))
239         m = nm
240         if not (k is None or k == nk):
241             raise CorruptedShareFilesError("Share files were corrupted -- share file %r said that k was %s but another share file previously said that k was %s" % (f.name, nk, k,))
242         if k > len(infiles):
243             raise InsufficientShareFilesError(k, len(infiles))
244         k = nk
245         if not (padlen is None or padlen == npadlen):
246             raise CorruptedShareFilesError("Share files were corrupted -- share file %r said that pad length was %s but another share file previously said that pad length was %s" % (f.name, npadlen, padlen,))
247         padlen = npadlen
248
249         infs.append(f)
250         shnums.append(shnum)
251
252         if len(infs) == k:
253             break
254
255     dec = easyfec.Decoder(k, m)
256
257     while True:
258         chunks = [ inf.read(CHUNKSIZE) for inf in infs ]
259         if [ch for ch in chunks if len(ch) != len(chunks[-1])]:
260             raise CorruptedShareFilesError("Share files were corrupted -- all share files are required to be the same length, but they weren't.")
261
262         if len(chunks[-1]) == CHUNKSIZE:
263             # Then this was a full read, so we're still in the sharefiles.
264             resultdata = dec.decode(chunks, shnums, padlen=0)
265             outf.write(resultdata)
266             byteswritten += len(resultdata)
267             if verbose:
268                 if ((byteswritten - len(resultdata)) / (10*MILLION_BYTES)) != (byteswritten / (10*MILLION_BYTES)):
269                     print str(byteswritten / MILLION_BYTES) + " MB ...",
270         else:
271             # Then this was a short read, so we've reached the end of the sharefiles.
272             resultdata = dec.decode(chunks, shnums, padlen)
273             outf.write(resultdata)
274             return # Done.
275     if verbose:
276         print
277         print "Done!"
278
279 def encode_file(inf, cb, k, m, chunksize=4096):
280     """
281     Read in the contents of inf, encode, and call cb with the results.
282
283     First, k "input blocks" will be read from inf, each input block being of 
284     size chunksize.  Then these k blocks will be encoded into m "result 
285     blocks".  Then cb will be invoked, passing a list of the m result blocks 
286     as its first argument, and the length of the encoded data as its second 
287     argument.  (The length of the encoded data is always equal to k*chunksize, 
288     until the last iteration, when the end of the file has been reached and 
289     less than k*chunksize bytes could be read from the file.)  This procedure 
290     is iterated until the end of the file is reached, in which case the space 
291     of the input blocks that is unused is filled with zeroes before encoding.
292
293     Note that the sequence passed in calls to cb() contains mutable array
294     objects in its first k elements whose contents will be overwritten when 
295     the next segment is read from the input file.  Therefore the 
296     implementation of cb() has to either be finished with those first k arrays 
297     before returning, or if it wants to keep the contents of those arrays for 
298     subsequent use after it has returned then it must make a copy of them to 
299     keep.
300
301     @param inf the file object from which to read the data
302     @param cb the callback to be invoked with the results
303     @param k the number of shares required to reconstruct the file
304     @param m the total number of shares created
305     @param chunksize how much data to read from inf for each of the k input 
306         blocks
307     """
308     enc = zfec.Encoder(k, m)
309     l = tuple([ array.array('c') for i in range(k) ])
310     indatasize = k*chunksize # will be reset to shorter upon EOF
311     eof = False
312     ZEROES=array.array('c', ['\x00'])*chunksize
313     while not eof:
314         # This loop body executes once per segment.
315         i = 0
316         while (i<len(l)):
317             # This loop body executes once per chunk.
318             a = l[i]
319             del a[:]
320             try:
321                 a.fromfile(inf, chunksize)
322                 i += 1
323             except EOFError:
324                 eof = True
325                 indatasize = i*chunksize + len(a)
326                 
327                 # padding
328                 a.fromstring("\x00" * (chunksize-len(a)))
329                 i += 1
330                 while (i<len(l)):
331                     a = l[i]
332                     a[:] = ZEROES
333                     i += 1
334
335         res = enc.encode(l)
336         cb(res, indatasize)
337
338 def encode_file_stringy(inf, cb, k, m, chunksize=4096):
339     """
340     Read in the contents of inf, encode, and call cb with the results.
341
342     First, k "input blocks" will be read from inf, each input block being of 
343     size chunksize.  Then these k blocks will be encoded into m "result 
344     blocks".  Then cb will be invoked, passing a list of the m result blocks 
345     as its first argument, and the length of the encoded data as its second 
346     argument.  (The length of the encoded data is always equal to k*chunksize, 
347     until the last iteration, when the end of the file has been reached and 
348     less than k*chunksize bytes could be read from the file.)  This procedure 
349     is iterated until the end of the file is reached, in which case the part 
350     of the input shares that is unused is filled with zeroes before encoding.
351
352     @param inf the file object from which to read the data
353     @param cb the callback to be invoked with the results
354     @param k the number of shares required to reconstruct the file
355     @param m the total number of shares created
356     @param chunksize how much data to read from inf for each of the k input 
357         blocks
358     """
359     enc = zfec.Encoder(k, m)
360     indatasize = k*chunksize # will be reset to shorter upon EOF
361     while indatasize == k*chunksize:
362         # This loop body executes once per segment.
363         i = 0
364         l = []
365         ZEROES = '\x00'*chunksize
366         while i<k:
367             # This loop body executes once per chunk.
368             i += 1
369             l.append(inf.read(chunksize))
370             if len(l[-1]) < chunksize:
371                 indatasize = i*chunksize + len(l[-1])
372                 
373                 # padding
374                 l[-1] = l[-1] + "\x00" * (chunksize-len(l[-1]))
375                 while i<k:
376                     l.append(ZEROES)
377                     i += 1
378
379         res = enc.encode(l)
380         cb(res, indatasize)
381
382 def encode_file_stringy_easyfec(inf, cb, k, m, chunksize=4096):
383     """
384     Read in the contents of inf, encode, and call cb with the results.
385
386     First, chunksize*k bytes will be read from inf, then encoded into m
387     "result blocks".  Then cb will be invoked, passing a list of the m result
388     blocks as its first argument, and the length of the encoded data as its
389     second argument.  (The length of the encoded data is always equal to
390     k*chunksize, until the last iteration, when the end of the file has been
391     reached and less than k*chunksize bytes could be read from the file.)
392     This procedure is iterated until the end of the file is reached, in which
393     case the space of the input that is unused is filled with zeroes before
394     encoding.
395
396     @param inf the file object from which to read the data
397     @param cb the callback to be invoked with the results
398     @param k the number of shares required to reconstruct the file
399     @param m the total number of shares created
400     @param chunksize how much data to read from inf for each of the k input 
401         blocks
402     """
403     enc = easyfec.Encoder(k, m)
404
405     readsize = k*chunksize
406     indata = inf.read(readsize)
407     while indata:
408         res = enc.encode(indata)
409         cb(res, len(indata))
410         indata = inf.read(readsize)
411
412 # zfec -- fast forward error correction library with Python interface
413
414 # Copyright (C) 2007 Allmydata, Inc.
415 # Author: Zooko Wilcox-O'Hearn
416 # mailto:zooko@zooko.com
417
418 # This file is part of zfec.
419
420 # This program is free software; you can redistribute it and/or modify it
421 # under the terms of the GNU General Public License as published by the Free
422 # Software Foundation; either version 2 of the License, or (at your option)
423 # any later version.  This program also comes with the added permission that,
424 # in the case that you are obligated to release a derived work under this
425 # licence (as per section 2.b of the GPL), you may delay the fulfillment of
426 # this obligation for up to 12 months.
427
428 # This program is distributed in the hope that it will be useful,
429 # but WITHOUT ANY WARRANTY; without even the implied warranty of
430 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
431 # GNU General Public License for more details.
432
433 # You should have received a copy of the GNU General Public License
434 # along with this program; if not, write to the Free Software
435 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
436