+# -*- test-case-name: allmydata.test.test_encode_share -*-
+
+from zope.interface import implements
from twisted.internet import defer
import sha
-from allmydata.util import idlib
+from allmydata.util import idlib, mathutil
+from allmydata.interfaces import IEncoder, IDecoder
+from allmydata.py_ecc import rs_code
def netstring(s):
return "%d:%s," % (len(s), s)
+class ReplicatingEncoder(object):
+ implements(IEncoder)
+ ENCODER_TYPE = 0
+
+ def set_params(self, data_size, required_shares, total_shares):
+ self.data_size = data_size
+ self.required_shares = required_shares
+ self.total_shares = total_shares
+
+ def get_encoder_type(self):
+ return self.ENCODER_TYPE
+
+ def get_serialized_params(self):
+ return "%d" % self.required_shares
+
+ def get_share_size(self):
+ return self.data_size
+
+ def encode(self, data):
+ shares = [(i,data) for i in range(self.total_shares)]
+ return defer.succeed(shares)
+
+class ReplicatingDecoder(object):
+ implements(IDecoder)
+
+ def set_serialized_params(self, params):
+ self.required_shares = int(params)
+
+ def decode(self, some_shares):
+ assert len(some_shares) >= self.required_shares
+ data = some_shares[0][1]
+ return defer.succeed(data)
+
+
class Encoder(object):
def __init__(self, infile, m):
self.infile = infile
if self._verifierid:
assert self._verifierid == vid, "%s != %s" % (idlib.b2a(self._verifierid), idlib.b2a(vid))
+
+class PyRSEncoder(object):
+ ENCODER_TYPE = 1
+
+ # we will break the data into vectors in which each element is a single
+ # byte (i.e. a single number from 0 to 255), and the length of the vector
+ # is equal to the number of required_shares. We use padding to make the
+ # last chunk of data long enough to match, and we record the data_size in
+ # the serialized parameters to strip this padding out on the receiving
+ # end.
+
+ def set_params(self, data_size, required_shares, total_shares):
+ self.data_size = data_size
+ self.required_shares = required_shares
+ self.total_shares = total_shares
+ self.chunk_size = required_shares
+ self.num_chunks = mathutil.div_ceil(data_size, self.chunk_size)
+ self.last_chunk_padding = mathutil.pad_size(data_size, required_shares)
+ self.share_size = self.num_chunks
+ self.encoder = rs_code.RSCode(total_shares, required_shares)
+
+ def get_encoder_type(self):
+ return self.ENCODER_TYPE
+
+ def get_serialized_params(self):
+ return "%d:%d:%d" % (self.data_size, self.required_shares,
+ self.total_shares)
+
+ def get_share_size(self):
+ return self.share_size
+
+ def encode(self, data):
+ share_data = [ [] for i in range(self.total_shares)]
+ for i in range(self.num_chunks):
+ offset = i*self.chunk_size
+ chunk = data[offset:offset+self.chunk_size]
+ if i == self.num_chunks-1:
+ chunk = chunk + "\x00"*self.last_chunk_padding
+ assert len(chunk) == self.chunk_size
+ input_vector = [ord(x) for x in chunk]
+ output_vector = self.encoder.Encode(input_vector)
+ assert len(output_vector) == self.total_shares
+ for i2,out in enumerate(output_vector):
+ out_chars = [chr(x) for x in out]
+ out_string = "".join(out_chars)
+ share_data[i2].append(out_string)
+
+ shares = [ (i, "".join(share_data[i]))
+ for i in range(self.total_shares) ]
+ return defer.succeed(shares)
+
+class PyRSDecoder(object):
+
+ def set_serialized_params(self, params):
+ pieces = params.split(":")
+ self.data_size = int(pieces[0])
+ self.required_shares = int(pieces[1])
+ self.total_shares = int(pieces[2])
+
+ self.chunk_size = self.required_shares
+ self.num_chunks = mathutil.div_ceil(self.data_size, self.chunk_size)
+ self.last_chunk_padding = mathutil.pad_size(self.data_size,
+ self.required_shares)
+ self.share_size = self.num_chunks
+ self.encoder = rs_code.RSCode(self.total_shares, self.required_shares)
+
+ def decode(self, some_shares):
+ chunk_size = self.chunk_size
+ assert len(some_shares) >= self.required_shares
+ chunks = [ [] for i in range(self.num_chunks) ]
+ have_shares = {}
+ for share_num, share_data in some_shares:
+ have_shares[share_num] = share_data
+ for i in range(self.num_chunks):
+ offset = i*chunk_size
+ received_vector = []
+ for j in range(self.total_shares):
+ share = have_shares.get(j)
+ if share is not None:
+ v1 = [ord(x) for x in share[offset:offset+chunk_size]]
+ received_vector.append(v1)
+ else:
+ received_vector.append(None)
+ decoded_vector = self.encoder.DecodeImmediate(received_vector)
+ if i == self.num_chunks-1:
+ decoded_vector = decoded_vector[:-self.last_chunk_padding]
+ chunk = "".join([chr(x) for x in decoded_vector])
+ chunks.append(chunk)
+ data = "".join(chunks)
+ return defer.succeed(data)
+
+
+all_encoders = {
+ ReplicatingEncoder.ENCODER_TYPE: (ReplicatingEncoder, ReplicatingDecoder),
+ PyRSEncoder.ENCODER_TYPE: (PyRSEncoder, PyRSDecoder),
+ }
+from zope.interface import Interface
from foolscap.schema import StringConstraint, ListOf, TupleOf, Any, Nothing
from foolscap import RemoteInterface
# need more to move directories
-# TODO: figleaf gets confused when the last line of a file is a comment. I
-# suspect an off-by-one error in the code that decides which lines are code
-# and which are not.
-pass
+
+class IEncoder(Interface):
+ def set_params(data_size, required_shares, total_shares):
+ """Set up the parameters of this encoder.
+
+ See encode() for a description of how these parameters are used.
+ """
+
+ def get_encoder_type():
+ """Return an integer that describes the type of this encoder.
+
+ There must be a global table of encoder classes. This method returns
+ an index into this table; the value at this index is an encoder
+ class, and this encoder is an instance of that class.
+ """
+
+ def get_serialized_params(): # TODO: maybe, maybe not
+ """Return a string that describes the parameters of this encoder.
+
+ This string can be passed to the decoder to prepare it for handling
+ the encoded shares we create. It might contain more information than
+ was presented to set_params(), if there is some flexibility of
+ parameter choice.
+
+ This string is intended to be embedded in the URI, so there are
+ several restrictions on its contents. At the moment I'm thinking that
+ this means it may contain hex digits and colons, and nothing else.
+ The idea is that the URI contains '%d:%s.' %
+ (encoder.get_encoder_type(), encoder.get_serialized_params()), and
+ this is enough information to construct a compatible decoder.
+ """
+
+ def get_share_size():
+ """Return the length of the shares that encode() will produce.
+ """
+
+ def encode(data):
+ """Encode a chunk of data. This may be called multiple times. Each
+ call is independent.
+
+ The data must be a string with a length that exactly matches the
+ data_size promised by set_params().
+
+ For each call, encode() will return a Deferred that fires with a list
+ of 'total_shares' tuples. Each tuple is of the form (sharenum,
+ share), where sharenum is an int (from 0 total_shares-1), and share
+ is a string. The get_share_size() method can be used to determine the
+ length of the 'share' strings returned by encode().
+
+ The memory usage of this function is expected to be on the order of
+ total_shares * get_share_size().
+ """
+
+class IDecoder(Interface):
+ def set_serialized_params(params):
+ """Set up the parameters of this encoder, from a string returned by
+ encoder.get_serialized_params()."""
+
+ def decode(some_shares):
+ """Decode a partial list of shares into data.
+
+ 'some_shares' must be a list of (sharenum, share) tuples, a subset of
+ the shares returned by IEncoder.encode(). Each share must be of the
+ same length. The share tuples may appear in any order, but of course
+ each tuple must have a sharenum that correctly matches the associated
+ share data string.
+
+ This returns a Deferred which fires with a string. This string will
+ always have a length equal to the 'data_size' value passed into the
+ original IEncoder.set_params() call.
+
+ The length of 'some_shares' must be equal or greater than the value
+ of 'required_shares' passed into the original IEncoder.set_params()
+ call.
+ """
--- /dev/null
+
+import os
+from twisted.trial import unittest
+from twisted.internet import defer
+from allmydata.encode import PyRSEncoder, PyRSDecoder, ReplicatingEncoder, ReplicatingDecoder
+import random
+
+class Tester:
+ #enc_class = PyRSEncoder
+ #dec_class = PyRSDecoder
+
+ def do_test(self, size, required_shares, total_shares):
+ data0 = os.urandom(size)
+ enc = self.enc_class()
+ enc.set_params(size, required_shares, total_shares)
+ serialized_params = enc.get_serialized_params()
+ d = enc.encode(data0)
+ def _done(shares):
+ self.failUnlessEqual(len(shares), total_shares)
+ self.shares = shares
+ d.addCallback(_done)
+
+ def _decode(shares):
+ dec = self.dec_class()
+ dec.set_serialized_params(serialized_params)
+ d1 = dec.decode(shares)
+ return d1
+
+ def _check_data(data1):
+ self.failUnlessEqual(len(data1), len(data0))
+ self.failUnless(data1 == data0)
+
+ def _decode_all_ordered(res):
+ # can we decode using all of the shares?
+ return _decode(self.shares)
+ d.addCallback(_decode_all_ordered)
+ d.addCallback(_check_data)
+
+ def _decode_all_shuffled(res):
+ # can we decode, using all the shares, but in random order?
+ shuffled_shares = self.shares[:]
+ random.shuffle(shuffled_shares)
+ return _decode(shuffled_shares)
+ d.addCallback(_decode_all_shuffled)
+ d.addCallback(_check_data)
+
+ def _decode_some(res):
+ # decode with a minimal subset of the shares
+ some_shares = self.shares[:required_shares]
+ return _decode(some_shares)
+ d.addCallback(_decode_some)
+ d.addCallback(_check_data)
+
+ def _decode_some_random(res):
+ # use a randomly-selected minimal subset
+ some_shares = random.sample(self.shares, required_shares)
+ return _decode(some_shares)
+ d.addCallback(_decode_some_random)
+ d.addCallback(_check_data)
+
+ def _decode_multiple(res):
+ # make sure we can re-use the decoder object
+ shares1 = random.sample(self.shares, required_shares)
+ shares2 = random.sample(self.shares, required_shares)
+ dec = self.dec_class()
+ dec.set_serialized_params(serialized_params)
+ d1 = dec.decode(shares1)
+ d1.addCallback(_check_data)
+ d1.addCallback(lambda res: dec.decode(shares2))
+ d1.addCallback(_check_data)
+ return d1
+ d.addCallback(_decode_multiple)
+
+ return d
+
+ def test_encode(self):
+ return self.do_test(1000, 25, 100)
+
+ def test_sizes(self):
+ d = defer.succeed(None)
+ for i in range(1, 100):
+ d.addCallback(lambda res,size: self.do_test(size, 4, 10), i)
+ return d
+
+class PyRS(unittest.TestCase, Tester):
+ enc_class = PyRSEncoder
+ dec_class = PyRSDecoder
+
+
+class Replicating(unittest.TestCase, Tester):
+ enc_class = ReplicatingEncoder
+ dec_class = ReplicatingDecoder