From: Brian Warner Date: Fri, 5 Jan 2007 04:52:51 +0000 (-0700) Subject: establish IEncoder/IDecoder, create suitable interfaces for both the simple replicati... X-Git-Tag: tahoe_v0.1.0-0-UNSTABLE~402 X-Git-Url: https://git.rkrishnan.org/about.html?a=commitdiff_plain;h=f31fc06d89d7f4eef2ecd7b65a9eb69bdaf04267;p=tahoe-lafs%2Ftahoe-lafs.git establish IEncoder/IDecoder, create suitable interfaces for both the simple replicating encoder and the py_ecc one, add a (failing) unit test for it --- diff --git a/src/allmydata/encode.py b/src/allmydata/encode.py index ea54fbd2..28775ebc 100644 --- a/src/allmydata/encode.py +++ b/src/allmydata/encode.py @@ -1,10 +1,49 @@ +# -*- test-case-name: allmydata.test.test_encode_share -*- + +from zope.interface import implements from twisted.internet import defer import sha -from allmydata.util import idlib +from allmydata.util import idlib, mathutil +from allmydata.interfaces import IEncoder, IDecoder +from allmydata.py_ecc import rs_code def netstring(s): return "%d:%s," % (len(s), s) +class ReplicatingEncoder(object): + implements(IEncoder) + ENCODER_TYPE = 0 + + def set_params(self, data_size, required_shares, total_shares): + self.data_size = data_size + self.required_shares = required_shares + self.total_shares = total_shares + + def get_encoder_type(self): + return self.ENCODER_TYPE + + def get_serialized_params(self): + return "%d" % self.required_shares + + def get_share_size(self): + return self.data_size + + def encode(self, data): + shares = [(i,data) for i in range(self.total_shares)] + return defer.succeed(shares) + +class ReplicatingDecoder(object): + implements(IDecoder) + + def set_serialized_params(self, params): + self.required_shares = int(params) + + def decode(self, some_shares): + assert len(some_shares) >= self.required_shares + data = some_shares[0][1] + return defer.succeed(data) + + class Encoder(object): def __init__(self, infile, m): self.infile = infile @@ -48,3 +87,99 @@ class Decoder(object): if self._verifierid: assert self._verifierid == vid, "%s != %s" % (idlib.b2a(self._verifierid), idlib.b2a(vid)) + +class PyRSEncoder(object): + ENCODER_TYPE = 1 + + # we will break the data into vectors in which each element is a single + # byte (i.e. a single number from 0 to 255), and the length of the vector + # is equal to the number of required_shares. We use padding to make the + # last chunk of data long enough to match, and we record the data_size in + # the serialized parameters to strip this padding out on the receiving + # end. + + def set_params(self, data_size, required_shares, total_shares): + self.data_size = data_size + self.required_shares = required_shares + self.total_shares = total_shares + self.chunk_size = required_shares + self.num_chunks = mathutil.div_ceil(data_size, self.chunk_size) + self.last_chunk_padding = mathutil.pad_size(data_size, required_shares) + self.share_size = self.num_chunks + self.encoder = rs_code.RSCode(total_shares, required_shares) + + def get_encoder_type(self): + return self.ENCODER_TYPE + + def get_serialized_params(self): + return "%d:%d:%d" % (self.data_size, self.required_shares, + self.total_shares) + + def get_share_size(self): + return self.share_size + + def encode(self, data): + share_data = [ [] for i in range(self.total_shares)] + for i in range(self.num_chunks): + offset = i*self.chunk_size + chunk = data[offset:offset+self.chunk_size] + if i == self.num_chunks-1: + chunk = chunk + "\x00"*self.last_chunk_padding + assert len(chunk) == self.chunk_size + input_vector = [ord(x) for x in chunk] + output_vector = self.encoder.Encode(input_vector) + assert len(output_vector) == self.total_shares + for i2,out in enumerate(output_vector): + out_chars = [chr(x) for x in out] + out_string = "".join(out_chars) + share_data[i2].append(out_string) + + shares = [ (i, "".join(share_data[i])) + for i in range(self.total_shares) ] + return defer.succeed(shares) + +class PyRSDecoder(object): + + def set_serialized_params(self, params): + pieces = params.split(":") + self.data_size = int(pieces[0]) + self.required_shares = int(pieces[1]) + self.total_shares = int(pieces[2]) + + self.chunk_size = self.required_shares + self.num_chunks = mathutil.div_ceil(self.data_size, self.chunk_size) + self.last_chunk_padding = mathutil.pad_size(self.data_size, + self.required_shares) + self.share_size = self.num_chunks + self.encoder = rs_code.RSCode(self.total_shares, self.required_shares) + + def decode(self, some_shares): + chunk_size = self.chunk_size + assert len(some_shares) >= self.required_shares + chunks = [ [] for i in range(self.num_chunks) ] + have_shares = {} + for share_num, share_data in some_shares: + have_shares[share_num] = share_data + for i in range(self.num_chunks): + offset = i*chunk_size + received_vector = [] + for j in range(self.total_shares): + share = have_shares.get(j) + if share is not None: + v1 = [ord(x) for x in share[offset:offset+chunk_size]] + received_vector.append(v1) + else: + received_vector.append(None) + decoded_vector = self.encoder.DecodeImmediate(received_vector) + if i == self.num_chunks-1: + decoded_vector = decoded_vector[:-self.last_chunk_padding] + chunk = "".join([chr(x) for x in decoded_vector]) + chunks.append(chunk) + data = "".join(chunks) + return defer.succeed(data) + + +all_encoders = { + ReplicatingEncoder.ENCODER_TYPE: (ReplicatingEncoder, ReplicatingDecoder), + PyRSEncoder.ENCODER_TYPE: (PyRSEncoder, PyRSDecoder), + } diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py index 14d081e3..2bea869f 100644 --- a/src/allmydata/interfaces.py +++ b/src/allmydata/interfaces.py @@ -1,4 +1,5 @@ +from zope.interface import Interface from foolscap.schema import StringConstraint, ListOf, TupleOf, Any, Nothing from foolscap import RemoteInterface @@ -69,7 +70,78 @@ class RIMutableDirectoryNode(RemoteInterface): # need more to move directories -# TODO: figleaf gets confused when the last line of a file is a comment. I -# suspect an off-by-one error in the code that decides which lines are code -# and which are not. -pass + +class IEncoder(Interface): + def set_params(data_size, required_shares, total_shares): + """Set up the parameters of this encoder. + + See encode() for a description of how these parameters are used. + """ + + def get_encoder_type(): + """Return an integer that describes the type of this encoder. + + There must be a global table of encoder classes. This method returns + an index into this table; the value at this index is an encoder + class, and this encoder is an instance of that class. + """ + + def get_serialized_params(): # TODO: maybe, maybe not + """Return a string that describes the parameters of this encoder. + + This string can be passed to the decoder to prepare it for handling + the encoded shares we create. It might contain more information than + was presented to set_params(), if there is some flexibility of + parameter choice. + + This string is intended to be embedded in the URI, so there are + several restrictions on its contents. At the moment I'm thinking that + this means it may contain hex digits and colons, and nothing else. + The idea is that the URI contains '%d:%s.' % + (encoder.get_encoder_type(), encoder.get_serialized_params()), and + this is enough information to construct a compatible decoder. + """ + + def get_share_size(): + """Return the length of the shares that encode() will produce. + """ + + def encode(data): + """Encode a chunk of data. This may be called multiple times. Each + call is independent. + + The data must be a string with a length that exactly matches the + data_size promised by set_params(). + + For each call, encode() will return a Deferred that fires with a list + of 'total_shares' tuples. Each tuple is of the form (sharenum, + share), where sharenum is an int (from 0 total_shares-1), and share + is a string. The get_share_size() method can be used to determine the + length of the 'share' strings returned by encode(). + + The memory usage of this function is expected to be on the order of + total_shares * get_share_size(). + """ + +class IDecoder(Interface): + def set_serialized_params(params): + """Set up the parameters of this encoder, from a string returned by + encoder.get_serialized_params().""" + + def decode(some_shares): + """Decode a partial list of shares into data. + + 'some_shares' must be a list of (sharenum, share) tuples, a subset of + the shares returned by IEncoder.encode(). Each share must be of the + same length. The share tuples may appear in any order, but of course + each tuple must have a sharenum that correctly matches the associated + share data string. + + This returns a Deferred which fires with a string. This string will + always have a length equal to the 'data_size' value passed into the + original IEncoder.set_params() call. + + The length of 'some_shares' must be equal or greater than the value + of 'required_shares' passed into the original IEncoder.set_params() + call. + """ diff --git a/src/allmydata/test/test_encode_share.py b/src/allmydata/test/test_encode_share.py new file mode 100644 index 00000000..a8516f62 --- /dev/null +++ b/src/allmydata/test/test_encode_share.py @@ -0,0 +1,92 @@ + +import os +from twisted.trial import unittest +from twisted.internet import defer +from allmydata.encode import PyRSEncoder, PyRSDecoder, ReplicatingEncoder, ReplicatingDecoder +import random + +class Tester: + #enc_class = PyRSEncoder + #dec_class = PyRSDecoder + + def do_test(self, size, required_shares, total_shares): + data0 = os.urandom(size) + enc = self.enc_class() + enc.set_params(size, required_shares, total_shares) + serialized_params = enc.get_serialized_params() + d = enc.encode(data0) + def _done(shares): + self.failUnlessEqual(len(shares), total_shares) + self.shares = shares + d.addCallback(_done) + + def _decode(shares): + dec = self.dec_class() + dec.set_serialized_params(serialized_params) + d1 = dec.decode(shares) + return d1 + + def _check_data(data1): + self.failUnlessEqual(len(data1), len(data0)) + self.failUnless(data1 == data0) + + def _decode_all_ordered(res): + # can we decode using all of the shares? + return _decode(self.shares) + d.addCallback(_decode_all_ordered) + d.addCallback(_check_data) + + def _decode_all_shuffled(res): + # can we decode, using all the shares, but in random order? + shuffled_shares = self.shares[:] + random.shuffle(shuffled_shares) + return _decode(shuffled_shares) + d.addCallback(_decode_all_shuffled) + d.addCallback(_check_data) + + def _decode_some(res): + # decode with a minimal subset of the shares + some_shares = self.shares[:required_shares] + return _decode(some_shares) + d.addCallback(_decode_some) + d.addCallback(_check_data) + + def _decode_some_random(res): + # use a randomly-selected minimal subset + some_shares = random.sample(self.shares, required_shares) + return _decode(some_shares) + d.addCallback(_decode_some_random) + d.addCallback(_check_data) + + def _decode_multiple(res): + # make sure we can re-use the decoder object + shares1 = random.sample(self.shares, required_shares) + shares2 = random.sample(self.shares, required_shares) + dec = self.dec_class() + dec.set_serialized_params(serialized_params) + d1 = dec.decode(shares1) + d1.addCallback(_check_data) + d1.addCallback(lambda res: dec.decode(shares2)) + d1.addCallback(_check_data) + return d1 + d.addCallback(_decode_multiple) + + return d + + def test_encode(self): + return self.do_test(1000, 25, 100) + + def test_sizes(self): + d = defer.succeed(None) + for i in range(1, 100): + d.addCallback(lambda res,size: self.do_test(size, 4, 10), i) + return d + +class PyRS(unittest.TestCase, Tester): + enc_class = PyRSEncoder + dec_class = PyRSDecoder + + +class Replicating(unittest.TestCase, Tester): + enc_class = ReplicatingEncoder + dec_class = ReplicatingDecoder