]> git.rkrishnan.org Git - tahoe-lafs/zfec.git/blob - zfec/zfec/filefec.py
zfec: add warning about changed share header in zfec v1.1
[tahoe-lafs/zfec.git] / zfec / zfec / filefec.py
1 import easyfec, zfec
2 from util import fileutil
3 from util.mathutil import log_ceil
4
5 import array, os, re, struct, traceback
6
7 CHUNKSIZE = 4096
8
9 class InsufficientShareFilesError(zfec.Error):
10     def __init__(self, k, kb, *args, **kwargs):
11         zfec.Error.__init__(self, *args, **kwargs)
12         self.k = k
13         self.kb = kb
14
15     def __repr__(self):
16         return "Insufficient share files -- %d share files are required to recover this file, but only %d were given" % (self.k, self.kb,)
17
18     def __str__(self):
19         return self.__repr__()
20
21 class CorruptedShareFilesError(zfec.Error):
22     pass
23
24 def _build_header(m, k, pad, sh):
25     """
26     @param m: the total number of shares; 1 <= m <= 256
27     @param k: the number of shares required to reconstruct; 1 <= k <= m
28     @param pad: the number of bytes of padding added to the file before encoding; 0 <= pad < k
29     @param sh: the shnum of this share; 0 <= k < m
30
31     @return: a compressed string encoding m, k, pad, and sh
32     """
33     assert m >= 1
34     assert m <= 2**8
35     assert k >= 1
36     assert k <= m
37     assert pad >= 0
38     assert pad < k
39
40     assert sh >= 0
41     assert sh < m
42
43     bitsused = 0
44     val = 0
45
46     val |= (m - 1)
47     bitsused += 8 # the first 8 bits always encode m
48
49     kbits = log_ceil(m, 2) # num bits needed to store all possible values of k
50     val <<= kbits
51     bitsused += kbits
52
53     val |= (k - 1)
54
55     padbits = log_ceil(k, 2) # num bits needed to store all possible values of pad
56     val <<= padbits
57     bitsused += padbits
58
59     val |= pad
60
61     shnumbits = log_ceil(m, 2) # num bits needed to store all possible values of shnum
62     val <<= shnumbits
63     bitsused += shnumbits
64
65     val |= sh
66
67     assert bitsused >= 8, bitsused
68     assert bitsused <= 32, bitsused
69
70     if bitsused <= 16:
71         val <<= (16-bitsused)
72         cs = struct.pack('>H', val)
73         assert cs[:-2] == '\x00' * (len(cs)-2)
74         return cs[-2:]
75     if bitsused <= 24:
76         val <<= (24-bitsused)
77         cs = struct.pack('>I', val)
78         assert cs[:-3] == '\x00' * (len(cs)-3)
79         return cs[-3:]
80     else:
81         val <<= (32-bitsused)
82         cs = struct.pack('>I', val)
83         assert cs[:-4] == '\x00' * (len(cs)-4)
84         return cs[-4:]
85
86 def MASK(bits):
87     return (1<<bits)-1
88
89 def _parse_header(inf):
90     """
91     @param inf: an object which I can call read(1) on to get another byte
92
93     @return: tuple of (m, k, pad, sh,); side-effect: the first one to four
94         bytes of inf will be read
95     """
96     # The first 8 bits always encode m.
97     ch = inf.read(1)
98     if not ch:
99         raise CorruptedShareFilesError("Share files were corrupted -- share file %r didn't have a complete metadata header at the front.  Perhaps the file was truncated." % (inf.name,))
100     byte = ord(ch)
101     m = byte + 1
102
103     # The next few bits encode k.
104     kbits = log_ceil(m, 2) # num bits needed to store all possible values of k
105     b2_bits_left = 8-kbits
106     kbitmask = MASK(kbits) << b2_bits_left
107     ch = inf.read(1)
108     if not ch:
109         raise CorruptedShareFilesError("Share files were corrupted -- share file %r didn't have a complete metadata header at the front.  Perhaps the file was truncated." % (inf.name,))
110     byte = ord(ch)
111     k = ((byte & kbitmask) >> b2_bits_left) + 1
112
113     shbits = log_ceil(m, 2) # num bits needed to store all possible values of shnum
114     padbits = log_ceil(k, 2) # num bits needed to store all possible values of pad
115
116     val = byte & (~kbitmask)
117
118     needed_padbits = padbits - b2_bits_left
119     if needed_padbits > 0:
120         ch = inf.read(1)
121         if not ch:
122             raise CorruptedShareFilesError("Share files were corrupted -- share file %r didn't have a complete metadata header at the front.  Perhaps the file was truncated." % (inf.name,))
123         byte = struct.unpack(">B", ch)[0]
124         val <<= 8
125         val |= byte 
126         needed_padbits -= 8
127     assert needed_padbits <= 0
128     extrabits = -needed_padbits
129     pad = val >> extrabits
130     val &= MASK(extrabits)
131
132     needed_shbits = shbits - extrabits
133     if needed_shbits > 0:
134         ch = inf.read(1)
135         if not ch:
136             raise CorruptedShareFilesError("Share files were corrupted -- share file %r didn't have a complete metadata header at the front.  Perhaps the file was truncated." % (inf.name,))
137         byte = struct.unpack(">B", ch)[0]
138         val <<= 8
139         val |= byte 
140         needed_shbits -= 8
141     assert needed_shbits <= 0
142
143     gotshbits = -needed_shbits
144
145     sh = val >> gotshbits
146
147     return (m, k, pad, sh,)
148
149 FORMAT_FORMAT = "%%s.%%0%dd_%%0%dd%%s"
150 RE_FORMAT = "%s.[0-9]+_[0-9]+%s"
151 def encode_to_files(inf, fsize, dirname, prefix, k, m, suffix=".fec", overwrite=False, verbose=False):
152     """
153     Encode inf, writing the shares to specially named, newly created files.
154
155     @param fsize: calling read() on inf must yield fsize bytes of data and 
156         then raise an EOFError
157     @param dirname: the name of the directory into which the sharefiles will
158         be written
159     """
160     mlen = len(str(m))
161     format = FORMAT_FORMAT % (mlen, mlen,)
162
163     padbytes = zfec.util.mathutil.pad_size(fsize, k)
164
165     fns = []
166     fs = []
167     try:
168         for shnum in range(m):
169             hdr = _build_header(m, k, padbytes, shnum)
170
171             fn = os.path.join(dirname, format % (prefix, shnum, m, suffix,))
172             if verbose:
173                 print "Creating share file %r..." % (fn,)
174             if overwrite:
175                 f = open(fn, "wb")
176             else:
177                 flags = os.O_WRONLY|os.O_CREAT|os.O_EXCL | (hasattr(os, 'O_BINARY') and os.O_BINARY)
178                 fd = os.open(fn, flags)
179                 f = os.fdopen(fd, "wb")
180             f.write(hdr)
181             fs.append(f)
182             fns.append(fn)
183         sumlen = [0]
184         def cb(blocks, length):
185             assert len(blocks) == len(fs)
186             oldsumlen = sumlen[0]
187             sumlen[0] += length
188             if verbose:
189                 if int((float(oldsumlen) / fsize) * 10) != int((float(sumlen[0]) / fsize) * 10):
190                     print str(int((float(sumlen[0]) / fsize) * 10) * 10) + "% ...",
191             
192             if sumlen[0] > fsize:
193                 raise IOError("Wrong file size -- possibly the size of the file changed during encoding.  Original size: %d, observed size at least: %s" % (fsize, sumlen[0],))
194             for i in range(len(blocks)):
195                 data = blocks[i]
196                 fs[i].write(data)
197                 length -= len(data)
198
199         encode_file_stringy_easyfec(inf, cb, k, m, chunksize=4096)
200     except EnvironmentError, le:
201         print "Cannot complete because of exception: "
202         print le
203         print "Cleaning up..."
204         # clean up
205         while fs:
206             f = fs.pop()
207             f.close() ; del f
208             fn = fns.pop()
209             if verbose:
210                 print "Cleaning up: trying to remove %r..." % (fn,)
211             fileutil.remove_if_possible(fn)
212         return 1
213     if verbose:
214         print 
215         print "Done!"
216     return 0
217
218 # Note: if you really prefer base-2 and you change this code, then please
219 # denote 2^20 as "MiB" instead of "MB" in order to avoid ambiguity.
220 # Thanks.
221 # http://en.wikipedia.org/wiki/Megabyte
222 MILLION_BYTES=10**6
223
224 def decode_from_files(outf, infiles, verbose=False):
225     """
226     Decode from the first k files in infiles, writing the results to outf.
227     """
228     assert len(infiles) >= 2
229     infs = []
230     shnums = []
231     m = None
232     k = None
233     padlen = None
234
235     byteswritten = 0
236     for f in infiles:
237         (nm, nk, npadlen, shnum,) = _parse_header(f)
238         if not (m is None or m == nm):
239             raise CorruptedShareFilesError("Share files were corrupted -- share file %r said that m was %s but another share file previously said that m was %s" % (f.name, nm, m,))
240         m = nm
241         if not (k is None or k == nk):
242             raise CorruptedShareFilesError("Share files were corrupted -- share file %r said that k was %s but another share file previously said that k was %s" % (f.name, nk, k,))
243         if k > len(infiles):
244             raise InsufficientShareFilesError(k, len(infiles))
245         k = nk
246         if not (padlen is None or padlen == npadlen):
247             raise CorruptedShareFilesError("Share files were corrupted -- share file %r said that pad length was %s but another share file previously said that pad length was %s" % (f.name, npadlen, padlen,))
248         padlen = npadlen
249
250         infs.append(f)
251         shnums.append(shnum)
252
253         if len(infs) == k:
254             break
255
256     dec = easyfec.Decoder(k, m)
257
258     while True:
259         chunks = [ inf.read(CHUNKSIZE) for inf in infs ]
260         if [ch for ch in chunks if len(ch) != len(chunks[-1])]:
261             raise CorruptedShareFilesError("Share files were corrupted -- all share files are required to be the same length, but they weren't.")
262
263         if len(chunks[-1]) == CHUNKSIZE:
264             # Then this was a full read, so we're still in the sharefiles.
265             resultdata = dec.decode(chunks, shnums, padlen=0)
266             outf.write(resultdata)
267             byteswritten += len(resultdata)
268             if verbose:
269                 if ((byteswritten - len(resultdata)) / (10*MILLION_BYTES)) != (byteswritten / (10*MILLION_BYTES)):
270                     print str(byteswritten / MILLION_BYTES) + " MB ...",
271         else:
272             # Then this was a short read, so we've reached the end of the sharefiles.
273             resultdata = dec.decode(chunks, shnums, padlen)
274             outf.write(resultdata)
275             return # Done.
276     if verbose:
277         print
278         print "Done!"
279
280 def encode_file(inf, cb, k, m, chunksize=4096):
281     """
282     Read in the contents of inf, encode, and call cb with the results.
283
284     First, k "input blocks" will be read from inf, each input block being of 
285     size chunksize.  Then these k blocks will be encoded into m "result 
286     blocks".  Then cb will be invoked, passing a list of the m result blocks 
287     as its first argument, and the length of the encoded data as its second 
288     argument.  (The length of the encoded data is always equal to k*chunksize, 
289     until the last iteration, when the end of the file has been reached and 
290     less than k*chunksize bytes could be read from the file.)  This procedure 
291     is iterated until the end of the file is reached, in which case the space 
292     of the input blocks that is unused is filled with zeroes before encoding.
293
294     Note that the sequence passed in calls to cb() contains mutable array
295     objects in its first k elements whose contents will be overwritten when 
296     the next segment is read from the input file.  Therefore the 
297     implementation of cb() has to either be finished with those first k arrays 
298     before returning, or if it wants to keep the contents of those arrays for 
299     subsequent use after it has returned then it must make a copy of them to 
300     keep.
301
302     @param inf the file object from which to read the data
303     @param cb the callback to be invoked with the results
304     @param k the number of shares required to reconstruct the file
305     @param m the total number of shares created
306     @param chunksize how much data to read from inf for each of the k input 
307         blocks
308     """
309     enc = zfec.Encoder(k, m)
310     l = tuple([ array.array('c') for i in range(k) ])
311     indatasize = k*chunksize # will be reset to shorter upon EOF
312     eof = False
313     ZEROES=array.array('c', ['\x00'])*chunksize
314     while not eof:
315         # This loop body executes once per segment.
316         i = 0
317         while (i<len(l)):
318             # This loop body executes once per chunk.
319             a = l[i]
320             del a[:]
321             try:
322                 a.fromfile(inf, chunksize)
323                 i += 1
324             except EOFError:
325                 eof = True
326                 indatasize = i*chunksize + len(a)
327                 
328                 # padding
329                 a.fromstring("\x00" * (chunksize-len(a)))
330                 i += 1
331                 while (i<len(l)):
332                     a = l[i]
333                     a[:] = ZEROES
334                     i += 1
335
336         res = enc.encode(l)
337         cb(res, indatasize)
338
339 import sha
340 def encode_file_not_really(inf, cb, k, m, chunksize=4096):
341     enc = zfec.Encoder(k, m)
342     l = tuple([ array.array('c') for i in range(k) ])
343     indatasize = k*chunksize # will be reset to shorter upon EOF
344     eof = False
345     ZEROES=array.array('c', ['\x00'])*chunksize
346     while not eof:
347         # This loop body executes once per segment.
348         i = 0
349         while (i<len(l)):
350             # This loop body executes once per chunk.
351             a = l[i]
352             del a[:]
353             try:
354                 a.fromfile(inf, chunksize)
355                 i += 1
356             except EOFError:
357                 eof = True
358                 indatasize = i*chunksize + len(a)
359                 
360                 # padding
361                 a.fromstring("\x00" * (chunksize-len(a)))
362                 i += 1
363                 while (i<len(l)):
364                     a = l[i]
365                     a[:] = ZEROES
366                     i += 1
367
368         # res = enc.encode(l)
369         cb(None, None)
370
371 def encode_file_not_really_and_hash(inf, cb, k, m, chunksize=4096):
372     hasher = sha.new()
373     enc = zfec.Encoder(k, m)
374     l = tuple([ array.array('c') for i in range(k) ])
375     indatasize = k*chunksize # will be reset to shorter upon EOF
376     eof = False
377     ZEROES=array.array('c', ['\x00'])*chunksize
378     while not eof:
379         # This loop body executes once per segment.
380         i = 0
381         while (i<len(l)):
382             # This loop body executes once per chunk.
383             a = l[i]
384             del a[:]
385             try:
386                 a.fromfile(inf, chunksize)
387                 i += 1
388             except EOFError:
389                 eof = True
390                 indatasize = i*chunksize + len(a)
391                 
392                 # padding
393                 a.fromstring("\x00" * (chunksize-len(a)))
394                 i += 1
395                 while (i<len(l)):
396                     a = l[i]
397                     a[:] = ZEROES
398                     i += 1
399
400         # res = enc.encode(l)
401         for thing in l:
402             hasher.update(thing)
403         cb(None, None)
404
405 def encode_file_stringy(inf, cb, k, m, chunksize=4096):
406     """
407     Read in the contents of inf, encode, and call cb with the results.
408
409     First, k "input blocks" will be read from inf, each input block being of 
410     size chunksize.  Then these k blocks will be encoded into m "result 
411     blocks".  Then cb will be invoked, passing a list of the m result blocks 
412     as its first argument, and the length of the encoded data as its second 
413     argument.  (The length of the encoded data is always equal to k*chunksize, 
414     until the last iteration, when the end of the file has been reached and 
415     less than k*chunksize bytes could be read from the file.)  This procedure 
416     is iterated until the end of the file is reached, in which case the part 
417     of the input shares that is unused is filled with zeroes before encoding.
418
419     @param inf the file object from which to read the data
420     @param cb the callback to be invoked with the results
421     @param k the number of shares required to reconstruct the file
422     @param m the total number of shares created
423     @param chunksize how much data to read from inf for each of the k input 
424         blocks
425     """
426     enc = zfec.Encoder(k, m)
427     indatasize = k*chunksize # will be reset to shorter upon EOF
428     while indatasize == k*chunksize:
429         # This loop body executes once per segment.
430         i = 0
431         l = []
432         ZEROES = '\x00'*chunksize
433         while i<k:
434             # This loop body executes once per chunk.
435             i += 1
436             l.append(inf.read(chunksize))
437             if len(l[-1]) < chunksize:
438                 indatasize = i*chunksize + len(l[-1])
439                 
440                 # padding
441                 l[-1] = l[-1] + "\x00" * (chunksize-len(l[-1]))
442                 while i<k:
443                     l.append(ZEROES)
444                     i += 1
445
446         res = enc.encode(l)
447         cb(res, indatasize)
448
449 def encode_file_stringy_easyfec(inf, cb, k, m, chunksize=4096):
450     """
451     Read in the contents of inf, encode, and call cb with the results.
452
453     First, chunksize*k bytes will be read from inf, then encoded into m
454     "result blocks".  Then cb will be invoked, passing a list of the m result
455     blocks as its first argument, and the length of the encoded data as its
456     second argument.  (The length of the encoded data is always equal to
457     k*chunksize, until the last iteration, when the end of the file has been
458     reached and less than k*chunksize bytes could be read from the file.)
459     This procedure is iterated until the end of the file is reached, in which
460     case the space of the input that is unused is filled with zeroes before
461     encoding.
462
463     @param inf the file object from which to read the data
464     @param cb the callback to be invoked with the results
465     @param k the number of shares required to reconstruct the file
466     @param m the total number of shares created
467     @param chunksize how much data to read from inf for each of the k input 
468         blocks
469     """
470     enc = easyfec.Encoder(k, m)
471
472     readsize = k*chunksize
473     indata = inf.read(readsize)
474     while indata:
475         res = enc.encode(indata)
476         cb(res, len(indata))
477         indata = inf.read(readsize)
478
479 # zfec -- fast forward error correction library with Python interface
480
481 # Copyright (C) 2007 Allmydata, Inc.
482 # Author: Zooko Wilcox-O'Hearn
483
484 # This file is part of zfec.
485
486 # This program is free software; you can redistribute it and/or modify it
487 # under the terms of the GNU General Public License as published by the Free
488 # Software Foundation; either version 2 of the License, or (at your option)
489 # any later version, with the added permission that, if you become obligated
490 # to release a derived work under this licence (as per section 2.b), you may
491 # delay the fulfillment of this obligation for up to 12 months.  See the file
492 # COPYING for details.
493 #
494 # If you would like to inquire about a commercial relationship with Allmydata,
495 # Inc., please contact partnerships@allmydata.com and visit
496 # http://allmydata.com/.