]> git.rkrishnan.org Git - tahoe-lafs/zfec.git/blob - zfec/zfec/filefec.py
trivial: hush pyflakes
[tahoe-lafs/zfec.git] / zfec / zfec / filefec.py
1 import easyfec, zfec
2 from pyutil import fileutil
3 from pyutil.mathutil import pad_size, log_ceil
4
5 import array, os, struct
6
7 CHUNKSIZE = 4096
8
9 from base64 import b32encode
10 def ab(x): # debuggery
11     if len(x) >= 3:
12         return "%s:%s" % (len(x), b32encode(x[-3:]),)
13     elif len(x) == 2:
14         return "%s:%s" % (len(x), b32encode(x[-2:]),)
15     elif len(x) == 1:
16         return "%s:%s" % (len(x), b32encode(x[-1:]),)
17     elif len(x) == 0:
18         return "%s:%s" % (len(x), "--empty--",)
19
20 class InsufficientShareFilesError(zfec.Error):
21     def __init__(self, k, kb, *args, **kwargs):
22         zfec.Error.__init__(self, *args, **kwargs)
23         self.k = k
24         self.kb = kb
25
26     def __repr__(self):
27         return "Insufficient share files -- %d share files are required to recover this file, but only %d were given" % (self.k, self.kb,)
28
29     def __str__(self):
30         return self.__repr__()
31
32 class CorruptedShareFilesError(zfec.Error):
33     pass
34
35 def _build_header(m, k, pad, sh):
36     """
37     @param m: the total number of shares; 1 <= m <= 256
38     @param k: the number of shares required to reconstruct; 1 <= k <= m
39     @param pad: the number of bytes of padding added to the file before encoding; 0 <= pad < k
40     @param sh: the shnum of this share; 0 <= k < m
41
42     @return: a compressed string encoding m, k, pad, and sh
43     """
44     assert m >= 1
45     assert m <= 2**8
46     assert k >= 1
47     assert k <= m
48     assert pad >= 0
49     assert pad < k
50
51     assert sh >= 0
52     assert sh < m
53
54     bitsused = 0
55     val = 0
56
57     val |= (m - 1)
58     bitsused += 8 # the first 8 bits always encode m
59
60     kbits = log_ceil(m, 2) # num bits needed to store all possible values of k
61     val <<= kbits
62     bitsused += kbits
63
64     val |= (k - 1)
65
66     padbits = log_ceil(k, 2) # num bits needed to store all possible values of pad
67     val <<= padbits
68     bitsused += padbits
69
70     val |= pad
71
72     shnumbits = log_ceil(m, 2) # num bits needed to store all possible values of shnum
73     val <<= shnumbits
74     bitsused += shnumbits
75
76     val |= sh
77
78     assert bitsused >= 8, bitsused
79     assert bitsused <= 32, bitsused
80
81     if bitsused <= 16:
82         val <<= (16-bitsused)
83         cs = struct.pack('>H', val)
84         assert cs[:-2] == '\x00' * (len(cs)-2)
85         return cs[-2:]
86     if bitsused <= 24:
87         val <<= (24-bitsused)
88         cs = struct.pack('>I', val)
89         assert cs[:-3] == '\x00' * (len(cs)-3)
90         return cs[-3:]
91     else:
92         val <<= (32-bitsused)
93         cs = struct.pack('>I', val)
94         assert cs[:-4] == '\x00' * (len(cs)-4)
95         return cs[-4:]
96
97 def MASK(bits):
98     return (1<<bits)-1
99
100 def _parse_header(inf):
101     """
102     @param inf: an object which I can call read(1) on to get another byte
103
104     @return: tuple of (m, k, pad, sh,); side-effect: the first one to four
105         bytes of inf will be read
106     """
107     # The first 8 bits always encode m.
108     ch = inf.read(1)
109     if not ch:
110         raise CorruptedShareFilesError("Share files were corrupted -- share file %r didn't have a complete metadata header at the front.  Perhaps the file was truncated." % (inf.name,))
111     byte = ord(ch)
112     m = byte + 1
113
114     # The next few bits encode k.
115     kbits = log_ceil(m, 2) # num bits needed to store all possible values of k
116     b2_bits_left = 8-kbits
117     kbitmask = MASK(kbits) << b2_bits_left
118     ch = inf.read(1)
119     if not ch:
120         raise CorruptedShareFilesError("Share files were corrupted -- share file %r didn't have a complete metadata header at the front.  Perhaps the file was truncated." % (inf.name,))
121     byte = ord(ch)
122     k = ((byte & kbitmask) >> b2_bits_left) + 1
123
124     shbits = log_ceil(m, 2) # num bits needed to store all possible values of shnum
125     padbits = log_ceil(k, 2) # num bits needed to store all possible values of pad
126
127     val = byte & (~kbitmask)
128
129     needed_padbits = padbits - b2_bits_left
130     if needed_padbits > 0:
131         ch = inf.read(1)
132         if not ch:
133             raise CorruptedShareFilesError("Share files were corrupted -- share file %r didn't have a complete metadata header at the front.  Perhaps the file was truncated." % (inf.name,))
134         byte = struct.unpack(">B", ch)[0]
135         val <<= 8
136         val |= byte 
137         needed_padbits -= 8
138     assert needed_padbits <= 0
139     extrabits = -needed_padbits
140     pad = val >> extrabits
141     val &= MASK(extrabits)
142
143     needed_shbits = shbits - extrabits
144     if needed_shbits > 0:
145         ch = inf.read(1)
146         if not ch:
147             raise CorruptedShareFilesError("Share files were corrupted -- share file %r didn't have a complete metadata header at the front.  Perhaps the file was truncated." % (inf.name,))
148         byte = struct.unpack(">B", ch)[0]
149         val <<= 8
150         val |= byte 
151         needed_shbits -= 8
152     assert needed_shbits <= 0
153
154     gotshbits = -needed_shbits
155
156     sh = val >> gotshbits
157
158     return (m, k, pad, sh,)
159
160 FORMAT_FORMAT = "%%s.%%0%dd_%%0%dd%%s"
161 RE_FORMAT = "%s.[0-9]+_[0-9]+%s"
162 def encode_to_files(inf, fsize, dirname, prefix, k, m, suffix=".fec", overwrite=False, verbose=False):
163     """
164     Encode inf, writing the shares to specially named, newly created files.
165
166     @param fsize: calling read() on inf must yield fsize bytes of data and 
167         then raise an EOFError
168     @param dirname: the name of the directory into which the sharefiles will
169         be written
170     """
171     mlen = len(str(m))
172     format = FORMAT_FORMAT % (mlen, mlen,)
173
174     padbytes = pad_size(fsize, k)
175
176     fns = []
177     fs = []
178     try:
179         for shnum in range(m):
180             hdr = _build_header(m, k, padbytes, shnum)
181
182             fn = os.path.join(dirname, format % (prefix, shnum, m, suffix,))
183             if verbose:
184                 print "Creating share file %r..." % (fn,)
185             if overwrite:
186                 f = open(fn, "wb")
187             else:
188                 flags = os.O_WRONLY|os.O_CREAT|os.O_EXCL | (hasattr(os, 'O_BINARY') and os.O_BINARY)
189                 fd = os.open(fn, flags)
190                 f = os.fdopen(fd, "wb")
191             f.write(hdr)
192             fs.append(f)
193             fns.append(fn)
194         sumlen = [0]
195         def cb(blocks, length):
196             assert len(blocks) == len(fs)
197             oldsumlen = sumlen[0]
198             sumlen[0] += length
199             if verbose:
200                 if int((float(oldsumlen) / fsize) * 10) != int((float(sumlen[0]) / fsize) * 10):
201                     print str(int((float(sumlen[0]) / fsize) * 10) * 10) + "% ...",
202             
203             if sumlen[0] > fsize:
204                 raise IOError("Wrong file size -- possibly the size of the file changed during encoding.  Original size: %d, observed size at least: %s" % (fsize, sumlen[0],))
205             for i in range(len(blocks)):
206                 data = blocks[i]
207                 fs[i].write(data)
208                 length -= len(data)
209
210         encode_file_stringy_easyfec(inf, cb, k, m, chunksize=4096)
211     except EnvironmentError, le:
212         print "Cannot complete because of exception: "
213         print le
214         print "Cleaning up..."
215         # clean up
216         while fs:
217             f = fs.pop()
218             f.close() ; del f
219             fn = fns.pop()
220             if verbose:
221                 print "Cleaning up: trying to remove %r..." % (fn,)
222             fileutil.remove_if_possible(fn)
223         return 1
224     if verbose:
225         print 
226         print "Done!"
227     return 0
228
229 # Note: if you really prefer base-2 and you change this code, then please
230 # denote 2^20 as "MiB" instead of "MB" in order to avoid ambiguity.  See:
231 # http://en.wikipedia.org/wiki/Megabyte
232 # Thanks.
233 MILLION_BYTES=10**6
234
235 def decode_from_files(outf, infiles, verbose=False):
236     """
237     Decode from the first k files in infiles, writing the results to outf.
238     """
239     assert len(infiles) >= 2
240     infs = []
241     shnums = []
242     m = None
243     k = None
244     padlen = None
245
246     byteswritten = 0
247     for f in infiles:
248         (nm, nk, npadlen, shnum,) = _parse_header(f)
249         if not (m is None or m == nm):
250             raise CorruptedShareFilesError("Share files were corrupted -- share file %r said that m was %s but another share file previously said that m was %s" % (f.name, nm, m,))
251         m = nm
252         if not (k is None or k == nk):
253             raise CorruptedShareFilesError("Share files were corrupted -- share file %r said that k was %s but another share file previously said that k was %s" % (f.name, nk, k,))
254         if k > len(infiles):
255             raise InsufficientShareFilesError(k, len(infiles))
256         k = nk
257         if not (padlen is None or padlen == npadlen):
258             raise CorruptedShareFilesError("Share files were corrupted -- share file %r said that pad length was %s but another share file previously said that pad length was %s" % (f.name, npadlen, padlen,))
259         padlen = npadlen
260
261         infs.append(f)
262         shnums.append(shnum)
263
264         if len(infs) == k:
265             break
266
267     dec = easyfec.Decoder(k, m)
268
269     while True:
270         chunks = [ inf.read(CHUNKSIZE) for inf in infs ]
271         if [ch for ch in chunks if len(ch) != len(chunks[-1])]:
272             raise CorruptedShareFilesError("Share files were corrupted -- all share files are required to be the same length, but they weren't.")
273
274         if len(chunks[-1]) == CHUNKSIZE:
275             # Then this was a full read, so we're still in the sharefiles.
276             resultdata = dec.decode(chunks, shnums, padlen=0)
277             outf.write(resultdata)
278             byteswritten += len(resultdata)
279             if verbose:
280                 if ((byteswritten - len(resultdata)) / (10*MILLION_BYTES)) != (byteswritten / (10*MILLION_BYTES)):
281                     print str(byteswritten / MILLION_BYTES) + " MB ...",
282         else:
283             # Then this was a short read, so we've reached the end of the sharefiles.
284             resultdata = dec.decode(chunks, shnums, padlen)
285             outf.write(resultdata)
286             return # Done.
287     if verbose:
288         print
289         print "Done!"
290
291 def encode_file(inf, cb, k, m, chunksize=4096):
292     """
293     Read in the contents of inf, encode, and call cb with the results.
294
295     First, k "input blocks" will be read from inf, each input block being of 
296     size chunksize.  Then these k blocks will be encoded into m "result 
297     blocks".  Then cb will be invoked, passing a list of the m result blocks 
298     as its first argument, and the length of the encoded data as its second 
299     argument.  (The length of the encoded data is always equal to k*chunksize, 
300     until the last iteration, when the end of the file has been reached and 
301     less than k*chunksize bytes could be read from the file.)  This procedure 
302     is iterated until the end of the file is reached, in which case the space 
303     of the input blocks that is unused is filled with zeroes before encoding.
304
305     Note that the sequence passed in calls to cb() contains mutable array
306     objects in its first k elements whose contents will be overwritten when 
307     the next segment is read from the input file.  Therefore the 
308     implementation of cb() has to either be finished with those first k arrays 
309     before returning, or if it wants to keep the contents of those arrays for 
310     subsequent use after it has returned then it must make a copy of them to 
311     keep.
312
313     @param inf the file object from which to read the data
314     @param cb the callback to be invoked with the results
315     @param k the number of shares required to reconstruct the file
316     @param m the total number of shares created
317     @param chunksize how much data to read from inf for each of the k input 
318         blocks
319     """
320     enc = zfec.Encoder(k, m)
321     l = tuple([ array.array('c') for i in range(k) ])
322     indatasize = k*chunksize # will be reset to shorter upon EOF
323     eof = False
324     ZEROES=array.array('c', ['\x00'])*chunksize
325     while not eof:
326         # This loop body executes once per segment.
327         i = 0
328         while (i<len(l)):
329             # This loop body executes once per chunk.
330             a = l[i]
331             del a[:]
332             try:
333                 a.fromfile(inf, chunksize)
334                 i += 1
335             except EOFError:
336                 eof = True
337                 indatasize = i*chunksize + len(a)
338                 
339                 # padding
340                 a.fromstring("\x00" * (chunksize-len(a)))
341                 i += 1
342                 while (i<len(l)):
343                     a = l[i]
344                     a[:] = ZEROES
345                     i += 1
346
347         res = enc.encode(l)
348         cb(res, indatasize)
349
350 try:
351     from hashlib import sha1
352     sha1 = sha1 # hush pyflakes
353 except ImportError:
354     # hashlib was added in Python 2.5.0.
355     import sha
356     sha1 = sha
357
358 def encode_file_not_really(inf, cb, k, m, chunksize=4096):
359     enc = zfec.Encoder(k, m)
360     l = tuple([ array.array('c') for i in range(k) ])
361     indatasize = k*chunksize # will be reset to shorter upon EOF
362     eof = False
363     ZEROES=array.array('c', ['\x00'])*chunksize
364     while not eof:
365         # This loop body executes once per segment.
366         i = 0
367         while (i<len(l)):
368             # This loop body executes once per chunk.
369             a = l[i]
370             del a[:]
371             try:
372                 a.fromfile(inf, chunksize)
373                 i += 1
374             except EOFError:
375                 eof = True
376                 indatasize = i*chunksize + len(a)
377                 
378                 # padding
379                 a.fromstring("\x00" * (chunksize-len(a)))
380                 i += 1
381                 while (i<len(l)):
382                     a = l[i]
383                     a[:] = ZEROES
384                     i += 1
385
386         # res = enc.encode(l)
387         cb(None, None)
388
389 def encode_file_not_really_and_hash(inf, cb, k, m, chunksize=4096):
390     hasher = sha1.new()
391     enc = zfec.Encoder(k, m)
392     l = tuple([ array.array('c') for i in range(k) ])
393     indatasize = k*chunksize # will be reset to shorter upon EOF
394     eof = False
395     ZEROES=array.array('c', ['\x00'])*chunksize
396     while not eof:
397         # This loop body executes once per segment.
398         i = 0
399         while (i<len(l)):
400             # This loop body executes once per chunk.
401             a = l[i]
402             del a[:]
403             try:
404                 a.fromfile(inf, chunksize)
405                 i += 1
406             except EOFError:
407                 eof = True
408                 indatasize = i*chunksize + len(a)
409                 
410                 # padding
411                 a.fromstring("\x00" * (chunksize-len(a)))
412                 i += 1
413                 while (i<len(l)):
414                     a = l[i]
415                     a[:] = ZEROES
416                     i += 1
417
418         # res = enc.encode(l)
419         for thing in l:
420             hasher.update(thing)
421         cb(None, None)
422
423 def encode_file_stringy(inf, cb, k, m, chunksize=4096):
424     """
425     Read in the contents of inf, encode, and call cb with the results.
426
427     First, k "input blocks" will be read from inf, each input block being of 
428     size chunksize.  Then these k blocks will be encoded into m "result 
429     blocks".  Then cb will be invoked, passing a list of the m result blocks 
430     as its first argument, and the length of the encoded data as its second 
431     argument.  (The length of the encoded data is always equal to k*chunksize, 
432     until the last iteration, when the end of the file has been reached and 
433     less than k*chunksize bytes could be read from the file.)  This procedure 
434     is iterated until the end of the file is reached, in which case the part 
435     of the input shares that is unused is filled with zeroes before encoding.
436
437     @param inf the file object from which to read the data
438     @param cb the callback to be invoked with the results
439     @param k the number of shares required to reconstruct the file
440     @param m the total number of shares created
441     @param chunksize how much data to read from inf for each of the k input 
442         blocks
443     """
444     enc = zfec.Encoder(k, m)
445     indatasize = k*chunksize # will be reset to shorter upon EOF
446     while indatasize == k*chunksize:
447         # This loop body executes once per segment.
448         i = 0
449         l = []
450         ZEROES = '\x00'*chunksize
451         while i<k:
452             # This loop body executes once per chunk.
453             i += 1
454             l.append(inf.read(chunksize))
455             if len(l[-1]) < chunksize:
456                 indatasize = i*chunksize + len(l[-1])
457                 
458                 # padding
459                 l[-1] = l[-1] + "\x00" * (chunksize-len(l[-1]))
460                 while i<k:
461                     l.append(ZEROES)
462                     i += 1
463
464         res = enc.encode(l)
465         cb(res, indatasize)
466
467 def encode_file_stringy_easyfec(inf, cb, k, m, chunksize=4096):
468     """
469     Read in the contents of inf, encode, and call cb with the results.
470
471     First, chunksize*k bytes will be read from inf, then encoded into m
472     "result blocks".  Then cb will be invoked, passing a list of the m result
473     blocks as its first argument, and the length of the encoded data as its
474     second argument.  (The length of the encoded data is always equal to
475     k*chunksize, until the last iteration, when the end of the file has been
476     reached and less than k*chunksize bytes could be read from the file.)
477     This procedure is iterated until the end of the file is reached, in which
478     case the space of the input that is unused is filled with zeroes before
479     encoding.
480
481     @param inf the file object from which to read the data
482     @param cb the callback to be invoked with the results
483     @param k the number of shares required to reconstruct the file
484     @param m the total number of shares created
485     @param chunksize how much data to read from inf for each of the k input 
486         blocks
487     """
488     enc = easyfec.Encoder(k, m)
489
490     readsize = k*chunksize
491     indata = inf.read(readsize)
492     while indata:
493         res = enc.encode(indata)
494         cb(res, len(indata))
495         indata = inf.read(readsize)
496
497 # zfec -- fast forward error correction library with Python interface
498
499 # Copyright (C) 2007 Allmydata, Inc.
500 # Author: Zooko Wilcox-O'Hearn
501
502 # This file is part of zfec.
503 #
504 # See README.txt for licensing information.