]> git.rkrishnan.org Git - tahoe-lafs/zfec.git/blob - zfec/zfec/filefec.py
99de3bdc87b9b58515486b7e938611bea2f473af
[tahoe-lafs/zfec.git] / zfec / zfec / filefec.py
1 import easyfec, zfec
2 from pyutil import fileutil
3 from pyutil.mathutil import pad_size, log_ceil
4
5 import array, os, struct
6
7 CHUNKSIZE = 4096
8
9 from base64 import b32encode
10 def ab(x): # debuggery
11     if len(x) >= 3:
12         return "%s:%s" % (len(x), b32encode(x[-3:]),)
13     elif len(x) == 2:
14         return "%s:%s" % (len(x), b32encode(x[-2:]),)
15     elif len(x) == 1:
16         return "%s:%s" % (len(x), b32encode(x[-1:]),)
17     elif len(x) == 0:
18         return "%s:%s" % (len(x), "--empty--",)
19
20 class InsufficientShareFilesError(zfec.Error):
21     def __init__(self, k, kb, *args, **kwargs):
22         zfec.Error.__init__(self, *args, **kwargs)
23         self.k = k
24         self.kb = kb
25
26     def __repr__(self):
27         return "Insufficient share files -- %d share files are required to recover this file, but only %d were given" % (self.k, self.kb,)
28
29     def __str__(self):
30         return self.__repr__()
31
32 class CorruptedShareFilesError(zfec.Error):
33     pass
34
35 def _build_header(m, k, pad, sh):
36     """
37     @param m: the total number of shares; 1 <= m <= 256
38     @param k: the number of shares required to reconstruct; 1 <= k <= m
39     @param pad: the number of bytes of padding added to the file before encoding; 0 <= pad < k
40     @param sh: the shnum of this share; 0 <= k < m
41
42     @return: a compressed string encoding m, k, pad, and sh
43     """
44     assert m >= 1
45     assert m <= 2**8
46     assert k >= 1
47     assert k <= m
48     assert pad >= 0
49     assert pad < k
50
51     assert sh >= 0
52     assert sh < m
53
54     bitsused = 0
55     val = 0
56
57     val |= (m - 1)
58     bitsused += 8 # the first 8 bits always encode m
59
60     kbits = log_ceil(m, 2) # num bits needed to store all possible values of k
61     val <<= kbits
62     bitsused += kbits
63
64     val |= (k - 1)
65
66     padbits = log_ceil(k, 2) # num bits needed to store all possible values of pad
67     val <<= padbits
68     bitsused += padbits
69
70     val |= pad
71
72     shnumbits = log_ceil(m, 2) # num bits needed to store all possible values of shnum
73     val <<= shnumbits
74     bitsused += shnumbits
75
76     val |= sh
77
78     assert bitsused >= 8, bitsused
79     assert bitsused <= 32, bitsused
80
81     if bitsused <= 16:
82         val <<= (16-bitsused)
83         cs = struct.pack('>H', val)
84         assert cs[:-2] == '\x00' * (len(cs)-2)
85         return cs[-2:]
86     if bitsused <= 24:
87         val <<= (24-bitsused)
88         cs = struct.pack('>I', val)
89         assert cs[:-3] == '\x00' * (len(cs)-3)
90         return cs[-3:]
91     else:
92         val <<= (32-bitsused)
93         cs = struct.pack('>I', val)
94         assert cs[:-4] == '\x00' * (len(cs)-4)
95         return cs[-4:]
96
97 def MASK(bits):
98     return (1<<bits)-1
99
100 def _parse_header(inf):
101     """
102     @param inf: an object which I can call read(1) on to get another byte
103
104     @return: tuple of (m, k, pad, sh,); side-effect: the first one to four
105         bytes of inf will be read
106     """
107     # The first 8 bits always encode m.
108     ch = inf.read(1)
109     if not ch:
110         raise CorruptedShareFilesError("Share files were corrupted -- share file %r didn't have a complete metadata header at the front.  Perhaps the file was truncated." % (inf.name,))
111     byte = ord(ch)
112     m = byte + 1
113
114     # The next few bits encode k.
115     kbits = log_ceil(m, 2) # num bits needed to store all possible values of k
116     b2_bits_left = 8-kbits
117     kbitmask = MASK(kbits) << b2_bits_left
118     ch = inf.read(1)
119     if not ch:
120         raise CorruptedShareFilesError("Share files were corrupted -- share file %r didn't have a complete metadata header at the front.  Perhaps the file was truncated." % (inf.name,))
121     byte = ord(ch)
122     k = ((byte & kbitmask) >> b2_bits_left) + 1
123
124     shbits = log_ceil(m, 2) # num bits needed to store all possible values of shnum
125     padbits = log_ceil(k, 2) # num bits needed to store all possible values of pad
126
127     val = byte & (~kbitmask)
128
129     needed_padbits = padbits - b2_bits_left
130     if needed_padbits > 0:
131         ch = inf.read(1)
132         if not ch:
133             raise CorruptedShareFilesError("Share files were corrupted -- share file %r didn't have a complete metadata header at the front.  Perhaps the file was truncated." % (inf.name,))
134         byte = struct.unpack(">B", ch)[0]
135         val <<= 8
136         val |= byte 
137         needed_padbits -= 8
138     assert needed_padbits <= 0
139     extrabits = -needed_padbits
140     pad = val >> extrabits
141     val &= MASK(extrabits)
142
143     needed_shbits = shbits - extrabits
144     if needed_shbits > 0:
145         ch = inf.read(1)
146         if not ch:
147             raise CorruptedShareFilesError("Share files were corrupted -- share file %r didn't have a complete metadata header at the front.  Perhaps the file was truncated." % (inf.name,))
148         byte = struct.unpack(">B", ch)[0]
149         val <<= 8
150         val |= byte 
151         needed_shbits -= 8
152     assert needed_shbits <= 0
153
154     gotshbits = -needed_shbits
155
156     sh = val >> gotshbits
157
158     return (m, k, pad, sh,)
159
160 FORMAT_FORMAT = "%%s.%%0%dd_%%0%dd%%s"
161 RE_FORMAT = "%s.[0-9]+_[0-9]+%s"
162 def encode_to_files(inf, fsize, dirname, prefix, k, m, suffix=".fec", overwrite=False, verbose=False):
163     """
164     Encode inf, writing the shares to specially named, newly created files.
165
166     @param fsize: calling read() on inf must yield fsize bytes of data and 
167         then raise an EOFError
168     @param dirname: the name of the directory into which the sharefiles will
169         be written
170     """
171     mlen = len(str(m))
172     format = FORMAT_FORMAT % (mlen, mlen,)
173
174     padbytes = pad_size(fsize, k)
175
176     fns = []
177     fs = []
178     try:
179         for shnum in range(m):
180             hdr = _build_header(m, k, padbytes, shnum)
181
182             fn = os.path.join(dirname, format % (prefix, shnum, m, suffix,))
183             if verbose:
184                 print "Creating share file %r..." % (fn,)
185             if overwrite:
186                 f = open(fn, "wb")
187             else:
188                 flags = os.O_WRONLY|os.O_CREAT|os.O_EXCL | (hasattr(os, 'O_BINARY') and os.O_BINARY)
189                 fd = os.open(fn, flags)
190                 f = os.fdopen(fd, "wb")
191             f.write(hdr)
192             fs.append(f)
193             fns.append(fn)
194         sumlen = [0]
195         def cb(blocks, length):
196             assert len(blocks) == len(fs)
197             oldsumlen = sumlen[0]
198             sumlen[0] += length
199             if verbose:
200                 if int((float(oldsumlen) / fsize) * 10) != int((float(sumlen[0]) / fsize) * 10):
201                     print str(int((float(sumlen[0]) / fsize) * 10) * 10) + "% ...",
202             
203             if sumlen[0] > fsize:
204                 raise IOError("Wrong file size -- possibly the size of the file changed during encoding.  Original size: %d, observed size at least: %s" % (fsize, sumlen[0],))
205             for i in range(len(blocks)):
206                 data = blocks[i]
207                 fs[i].write(data)
208                 length -= len(data)
209
210         encode_file_stringy_easyfec(inf, cb, k, m, chunksize=4096)
211     except EnvironmentError, le:
212         print "Cannot complete because of exception: "
213         print le
214         print "Cleaning up..."
215         # clean up
216         while fs:
217             f = fs.pop()
218             f.close() ; del f
219             fn = fns.pop()
220             if verbose:
221                 print "Cleaning up: trying to remove %r..." % (fn,)
222             fileutil.remove_if_possible(fn)
223         return 1
224     if verbose:
225         print 
226         print "Done!"
227     return 0
228
229 # Note: if you really prefer base-2 and you change this code, then please
230 # denote 2^20 as "MiB" instead of "MB" in order to avoid ambiguity.  See:
231 # http://en.wikipedia.org/wiki/Megabyte
232 # Thanks.
233 MILLION_BYTES=10**6
234
235 def decode_from_files(outf, infiles, verbose=False):
236     """
237     Decode from the first k files in infiles, writing the results to outf.
238     """
239     assert len(infiles) >= 2
240     infs = []
241     shnums = []
242     m = None
243     k = None
244     padlen = None
245
246     byteswritten = 0
247     for f in infiles:
248         (nm, nk, npadlen, shnum,) = _parse_header(f)
249         if not (m is None or m == nm):
250             raise CorruptedShareFilesError("Share files were corrupted -- share file %r said that m was %s but another share file previously said that m was %s" % (f.name, nm, m,))
251         m = nm
252         if not (k is None or k == nk):
253             raise CorruptedShareFilesError("Share files were corrupted -- share file %r said that k was %s but another share file previously said that k was %s" % (f.name, nk, k,))
254         if k > len(infiles):
255             raise InsufficientShareFilesError(k, len(infiles))
256         k = nk
257         if not (padlen is None or padlen == npadlen):
258             raise CorruptedShareFilesError("Share files were corrupted -- share file %r said that pad length was %s but another share file previously said that pad length was %s" % (f.name, npadlen, padlen,))
259         padlen = npadlen
260
261         infs.append(f)
262         shnums.append(shnum)
263
264         if len(infs) == k:
265             break
266
267     dec = easyfec.Decoder(k, m)
268
269     while True:
270         chunks = [ inf.read(CHUNKSIZE) for inf in infs ]
271         if [ch for ch in chunks if len(ch) != len(chunks[-1])]:
272             raise CorruptedShareFilesError("Share files were corrupted -- all share files are required to be the same length, but they weren't.")
273
274         if len(chunks[-1]) == CHUNKSIZE:
275             # Then this was a full read, so we're still in the sharefiles.
276             resultdata = dec.decode(chunks, shnums, padlen=0)
277             outf.write(resultdata)
278             byteswritten += len(resultdata)
279             if verbose:
280                 if ((byteswritten - len(resultdata)) / (10*MILLION_BYTES)) != (byteswritten / (10*MILLION_BYTES)):
281                     print str(byteswritten / MILLION_BYTES) + " MB ...",
282         else:
283             # Then this was a short read, so we've reached the end of the sharefiles.
284             resultdata = dec.decode(chunks, shnums, padlen)
285             outf.write(resultdata)
286             return # Done.
287     if verbose:
288         print
289         print "Done!"
290
291 def encode_file(inf, cb, k, m, chunksize=4096):
292     """
293     Read in the contents of inf, encode, and call cb with the results.
294
295     First, k "input blocks" will be read from inf, each input block being of 
296     size chunksize.  Then these k blocks will be encoded into m "result 
297     blocks".  Then cb will be invoked, passing a list of the m result blocks 
298     as its first argument, and the length of the encoded data as its second 
299     argument.  (The length of the encoded data is always equal to k*chunksize, 
300     until the last iteration, when the end of the file has been reached and 
301     less than k*chunksize bytes could be read from the file.)  This procedure 
302     is iterated until the end of the file is reached, in which case the space 
303     of the input blocks that is unused is filled with zeroes before encoding.
304
305     Note that the sequence passed in calls to cb() contains mutable array
306     objects in its first k elements whose contents will be overwritten when 
307     the next segment is read from the input file.  Therefore the 
308     implementation of cb() has to either be finished with those first k arrays 
309     before returning, or if it wants to keep the contents of those arrays for 
310     subsequent use after it has returned then it must make a copy of them to 
311     keep.
312
313     @param inf the file object from which to read the data
314     @param cb the callback to be invoked with the results
315     @param k the number of shares required to reconstruct the file
316     @param m the total number of shares created
317     @param chunksize how much data to read from inf for each of the k input 
318         blocks
319     """
320     enc = zfec.Encoder(k, m)
321     l = tuple([ array.array('c') for i in range(k) ])
322     indatasize = k*chunksize # will be reset to shorter upon EOF
323     eof = False
324     ZEROES=array.array('c', ['\x00'])*chunksize
325     while not eof:
326         # This loop body executes once per segment.
327         i = 0
328         while (i<len(l)):
329             # This loop body executes once per chunk.
330             a = l[i]
331             del a[:]
332             try:
333                 a.fromfile(inf, chunksize)
334                 i += 1
335             except EOFError:
336                 eof = True
337                 indatasize = i*chunksize + len(a)
338                 
339                 # padding
340                 a.fromstring("\x00" * (chunksize-len(a)))
341                 i += 1
342                 while (i<len(l)):
343                     a = l[i]
344                     a[:] = ZEROES
345                     i += 1
346
347         res = enc.encode(l)
348         cb(res, indatasize)
349
350 try:
351     from hashlib import sha1
352 except ImportError:
353     # hashlib was added in Python 2.5.0.
354     import sha as sha1
355
356 def encode_file_not_really(inf, cb, k, m, chunksize=4096):
357     enc = zfec.Encoder(k, m)
358     l = tuple([ array.array('c') for i in range(k) ])
359     indatasize = k*chunksize # will be reset to shorter upon EOF
360     eof = False
361     ZEROES=array.array('c', ['\x00'])*chunksize
362     while not eof:
363         # This loop body executes once per segment.
364         i = 0
365         while (i<len(l)):
366             # This loop body executes once per chunk.
367             a = l[i]
368             del a[:]
369             try:
370                 a.fromfile(inf, chunksize)
371                 i += 1
372             except EOFError:
373                 eof = True
374                 indatasize = i*chunksize + len(a)
375                 
376                 # padding
377                 a.fromstring("\x00" * (chunksize-len(a)))
378                 i += 1
379                 while (i<len(l)):
380                     a = l[i]
381                     a[:] = ZEROES
382                     i += 1
383
384         # res = enc.encode(l)
385         cb(None, None)
386
387 def encode_file_not_really_and_hash(inf, cb, k, m, chunksize=4096):
388     hasher = sha1.new()
389     enc = zfec.Encoder(k, m)
390     l = tuple([ array.array('c') for i in range(k) ])
391     indatasize = k*chunksize # will be reset to shorter upon EOF
392     eof = False
393     ZEROES=array.array('c', ['\x00'])*chunksize
394     while not eof:
395         # This loop body executes once per segment.
396         i = 0
397         while (i<len(l)):
398             # This loop body executes once per chunk.
399             a = l[i]
400             del a[:]
401             try:
402                 a.fromfile(inf, chunksize)
403                 i += 1
404             except EOFError:
405                 eof = True
406                 indatasize = i*chunksize + len(a)
407                 
408                 # padding
409                 a.fromstring("\x00" * (chunksize-len(a)))
410                 i += 1
411                 while (i<len(l)):
412                     a = l[i]
413                     a[:] = ZEROES
414                     i += 1
415
416         # res = enc.encode(l)
417         for thing in l:
418             hasher.update(thing)
419         cb(None, None)
420
421 def encode_file_stringy(inf, cb, k, m, chunksize=4096):
422     """
423     Read in the contents of inf, encode, and call cb with the results.
424
425     First, k "input blocks" will be read from inf, each input block being of 
426     size chunksize.  Then these k blocks will be encoded into m "result 
427     blocks".  Then cb will be invoked, passing a list of the m result blocks 
428     as its first argument, and the length of the encoded data as its second 
429     argument.  (The length of the encoded data is always equal to k*chunksize, 
430     until the last iteration, when the end of the file has been reached and 
431     less than k*chunksize bytes could be read from the file.)  This procedure 
432     is iterated until the end of the file is reached, in which case the part 
433     of the input shares that is unused is filled with zeroes before encoding.
434
435     @param inf the file object from which to read the data
436     @param cb the callback to be invoked with the results
437     @param k the number of shares required to reconstruct the file
438     @param m the total number of shares created
439     @param chunksize how much data to read from inf for each of the k input 
440         blocks
441     """
442     enc = zfec.Encoder(k, m)
443     indatasize = k*chunksize # will be reset to shorter upon EOF
444     while indatasize == k*chunksize:
445         # This loop body executes once per segment.
446         i = 0
447         l = []
448         ZEROES = '\x00'*chunksize
449         while i<k:
450             # This loop body executes once per chunk.
451             i += 1
452             l.append(inf.read(chunksize))
453             if len(l[-1]) < chunksize:
454                 indatasize = i*chunksize + len(l[-1])
455                 
456                 # padding
457                 l[-1] = l[-1] + "\x00" * (chunksize-len(l[-1]))
458                 while i<k:
459                     l.append(ZEROES)
460                     i += 1
461
462         res = enc.encode(l)
463         cb(res, indatasize)
464
465 def encode_file_stringy_easyfec(inf, cb, k, m, chunksize=4096):
466     """
467     Read in the contents of inf, encode, and call cb with the results.
468
469     First, chunksize*k bytes will be read from inf, then encoded into m
470     "result blocks".  Then cb will be invoked, passing a list of the m result
471     blocks as its first argument, and the length of the encoded data as its
472     second argument.  (The length of the encoded data is always equal to
473     k*chunksize, until the last iteration, when the end of the file has been
474     reached and less than k*chunksize bytes could be read from the file.)
475     This procedure is iterated until the end of the file is reached, in which
476     case the space of the input that is unused is filled with zeroes before
477     encoding.
478
479     @param inf the file object from which to read the data
480     @param cb the callback to be invoked with the results
481     @param k the number of shares required to reconstruct the file
482     @param m the total number of shares created
483     @param chunksize how much data to read from inf for each of the k input 
484         blocks
485     """
486     enc = easyfec.Encoder(k, m)
487
488     readsize = k*chunksize
489     indata = inf.read(readsize)
490     while indata:
491         res = enc.encode(indata)
492         cb(res, len(indata))
493         indata = inf.read(readsize)
494
495 # zfec -- fast forward error correction library with Python interface
496
497 # Copyright (C) 2007 Allmydata, Inc.
498 # Author: Zooko Wilcox-O'Hearn
499
500 # This file is part of zfec.
501 #
502 # See README.txt for licensing information.