establish IEncoder/IDecoder, create suitable interfaces for both the simple replicati...
authorBrian Warner <warner@allmydata.com>
Fri, 5 Jan 2007 04:52:51 +0000 (21:52 -0700)
committerBrian Warner <warner@allmydata.com>
Fri, 5 Jan 2007 04:52:51 +0000 (21:52 -0700)
src/allmydata/encode.py
src/allmydata/interfaces.py
src/allmydata/test/test_encode_share.py [new file with mode: 0644]

index ea54fbd2bb9d79384a56e99c5d4cce9e6aabc322..28775ebc50c58b9882011a36a029bde7c1fe8492 100644 (file)
@@ -1,10 +1,49 @@
+# -*- test-case-name: allmydata.test.test_encode_share -*-
+
+from zope.interface import implements
 from twisted.internet import defer
 import sha
-from allmydata.util import idlib
+from allmydata.util import idlib, mathutil
+from allmydata.interfaces import IEncoder, IDecoder
+from allmydata.py_ecc import rs_code
 
 def netstring(s):
     return "%d:%s," % (len(s), s)
 
+class ReplicatingEncoder(object):
+    implements(IEncoder)
+    ENCODER_TYPE = 0
+
+    def set_params(self, data_size, required_shares, total_shares):
+        self.data_size = data_size
+        self.required_shares = required_shares
+        self.total_shares = total_shares
+
+    def get_encoder_type(self):
+        return self.ENCODER_TYPE
+
+    def get_serialized_params(self):
+        return "%d" % self.required_shares
+
+    def get_share_size(self):
+        return self.data_size
+
+    def encode(self, data):
+        shares = [(i,data) for i in range(self.total_shares)]
+        return defer.succeed(shares)
+
+class ReplicatingDecoder(object):
+    implements(IDecoder)
+
+    def set_serialized_params(self, params):
+        self.required_shares = int(params)
+
+    def decode(self, some_shares):
+        assert len(some_shares) >= self.required_shares
+        data = some_shares[0][1]
+        return defer.succeed(data)
+
+
 class Encoder(object):
     def __init__(self, infile, m):
         self.infile = infile
@@ -48,3 +87,99 @@ class Decoder(object):
         if self._verifierid:
             assert self._verifierid == vid, "%s != %s" % (idlib.b2a(self._verifierid), idlib.b2a(vid))
 
+
+class PyRSEncoder(object):
+    ENCODER_TYPE = 1
+
+    # we will break the data into vectors in which each element is a single
+    # byte (i.e. a single number from 0 to 255), and the length of the vector
+    # is equal to the number of required_shares. We use padding to make the
+    # last chunk of data long enough to match, and we record the data_size in
+    # the serialized parameters to strip this padding out on the receiving
+    # end.
+
+    def set_params(self, data_size, required_shares, total_shares):
+        self.data_size = data_size
+        self.required_shares = required_shares
+        self.total_shares = total_shares
+        self.chunk_size = required_shares
+        self.num_chunks = mathutil.div_ceil(data_size, self.chunk_size)
+        self.last_chunk_padding = mathutil.pad_size(data_size, required_shares)
+        self.share_size = self.num_chunks
+        self.encoder = rs_code.RSCode(total_shares, required_shares)
+
+    def get_encoder_type(self):
+        return self.ENCODER_TYPE
+
+    def get_serialized_params(self):
+        return "%d:%d:%d" % (self.data_size, self.required_shares,
+                             self.total_shares)
+
+    def get_share_size(self):
+        return self.share_size
+
+    def encode(self, data):
+        share_data = [ [] for i in range(self.total_shares)]
+        for i in range(self.num_chunks):
+            offset = i*self.chunk_size
+            chunk = data[offset:offset+self.chunk_size]
+            if i == self.num_chunks-1:
+                chunk = chunk + "\x00"*self.last_chunk_padding
+            assert len(chunk) == self.chunk_size
+            input_vector = [ord(x) for x in chunk]
+            output_vector = self.encoder.Encode(input_vector)
+            assert len(output_vector) == self.total_shares
+            for i2,out in enumerate(output_vector):
+                out_chars = [chr(x) for x in out]
+                out_string = "".join(out_chars)
+                share_data[i2].append(out_string)
+
+        shares = [ (i, "".join(share_data[i]))
+                   for i in range(self.total_shares) ]
+        return defer.succeed(shares)
+
+class PyRSDecoder(object):
+
+    def set_serialized_params(self, params):
+        pieces = params.split(":")
+        self.data_size = int(pieces[0])
+        self.required_shares = int(pieces[1])
+        self.total_shares = int(pieces[2])
+
+        self.chunk_size = self.required_shares
+        self.num_chunks = mathutil.div_ceil(self.data_size, self.chunk_size)
+        self.last_chunk_padding = mathutil.pad_size(self.data_size,
+                                                    self.required_shares)
+        self.share_size = self.num_chunks
+        self.encoder = rs_code.RSCode(self.total_shares, self.required_shares)
+
+    def decode(self, some_shares):
+        chunk_size = self.chunk_size
+        assert len(some_shares) >= self.required_shares
+        chunks = [ [] for i in range(self.num_chunks) ]
+        have_shares = {}
+        for share_num, share_data in some_shares:
+            have_shares[share_num] = share_data
+        for i in range(self.num_chunks):
+            offset = i*chunk_size
+            received_vector = []
+            for j in range(self.total_shares):
+                share = have_shares.get(j)
+                if share is not None:
+                    v1 = [ord(x) for x in share[offset:offset+chunk_size]]
+                    received_vector.append(v1)
+                else:
+                    received_vector.append(None)
+            decoded_vector = self.encoder.DecodeImmediate(received_vector)
+            if i == self.num_chunks-1:
+                decoded_vector = decoded_vector[:-self.last_chunk_padding]
+            chunk = "".join([chr(x) for x in decoded_vector])
+            chunks.append(chunk)
+        data = "".join(chunks)
+        return defer.succeed(data)
+
+
+all_encoders = {
+    ReplicatingEncoder.ENCODER_TYPE: (ReplicatingEncoder, ReplicatingDecoder),
+    PyRSEncoder.ENCODER_TYPE: (PyRSEncoder, PyRSDecoder),
+    }
index 14d081e398542d1d314a9466f69cc71a862b39cf..2bea869f31e578654ea665add485d2250d3df41c 100644 (file)
@@ -1,4 +1,5 @@
 
+from zope.interface import Interface
 from foolscap.schema import StringConstraint, ListOf, TupleOf, Any, Nothing
 from foolscap import RemoteInterface
 
@@ -69,7 +70,78 @@ class RIMutableDirectoryNode(RemoteInterface):
 
     # need more to move directories
 
-# TODO: figleaf gets confused when the last line of a file is a comment. I
-# suspect an off-by-one error in the code that decides which lines are code
-# and which are not.
-pass
+
+class IEncoder(Interface):
+    def set_params(data_size, required_shares, total_shares):
+        """Set up the parameters of this encoder.
+
+        See encode() for a description of how these parameters are used.
+        """
+
+    def get_encoder_type():
+        """Return an integer that describes the type of this encoder.
+
+        There must be a global table of encoder classes. This method returns
+        an index into this table; the value at this index is an encoder
+        class, and this encoder is an instance of that class.
+        """
+
+    def get_serialized_params(): # TODO: maybe, maybe not
+        """Return a string that describes the parameters of this encoder.
+
+        This string can be passed to the decoder to prepare it for handling
+        the encoded shares we create. It might contain more information than
+        was presented to set_params(), if there is some flexibility of
+        parameter choice.
+
+        This string is intended to be embedded in the URI, so there are
+        several restrictions on its contents. At the moment I'm thinking that
+        this means it may contain hex digits and colons, and nothing else.
+        The idea is that the URI contains '%d:%s.' %
+        (encoder.get_encoder_type(), encoder.get_serialized_params()), and
+        this is enough information to construct a compatible decoder.
+        """
+
+    def get_share_size():
+        """Return the length of the shares that encode() will produce.
+        """
+
+    def encode(data):
+        """Encode a chunk of data. This may be called multiple times. Each
+        call is independent.
+
+        The data must be a string with a length that exactly matches the
+        data_size promised by set_params().
+
+        For each call, encode() will return a Deferred that fires with a list
+        of 'total_shares' tuples. Each tuple is of the form (sharenum,
+        share), where sharenum is an int (from 0 total_shares-1), and share
+        is a string. The get_share_size() method can be used to determine the
+        length of the 'share' strings returned by encode().
+
+        The memory usage of this function is expected to be on the order of
+        total_shares * get_share_size().
+        """
+
+class IDecoder(Interface):
+    def set_serialized_params(params):
+        """Set up the parameters of this encoder, from a string returned by
+        encoder.get_serialized_params()."""
+
+    def decode(some_shares):
+        """Decode a partial list of shares into data.
+
+        'some_shares' must be a list of (sharenum, share) tuples, a subset of
+        the shares returned by IEncoder.encode(). Each share must be of the
+        same length. The share tuples may appear in any order, but of course
+        each tuple must have a sharenum that correctly matches the associated
+        share data string.
+
+        This returns a Deferred which fires with a string. This string will
+        always have a length equal to the 'data_size' value passed into the
+        original IEncoder.set_params() call.
+
+        The length of 'some_shares' must be equal or greater than the value
+        of 'required_shares' passed into the original IEncoder.set_params()
+        call.
+        """
diff --git a/src/allmydata/test/test_encode_share.py b/src/allmydata/test/test_encode_share.py
new file mode 100644 (file)
index 0000000..a8516f6
--- /dev/null
@@ -0,0 +1,92 @@
+
+import os
+from twisted.trial import unittest
+from twisted.internet import defer
+from allmydata.encode import PyRSEncoder, PyRSDecoder, ReplicatingEncoder, ReplicatingDecoder
+import random
+
+class Tester:
+    #enc_class = PyRSEncoder
+    #dec_class = PyRSDecoder
+
+    def do_test(self, size, required_shares, total_shares):
+        data0 = os.urandom(size)
+        enc = self.enc_class()
+        enc.set_params(size, required_shares, total_shares)
+        serialized_params = enc.get_serialized_params()
+        d = enc.encode(data0)
+        def _done(shares):
+            self.failUnlessEqual(len(shares), total_shares)
+            self.shares = shares
+        d.addCallback(_done)
+
+        def _decode(shares):
+            dec = self.dec_class()
+            dec.set_serialized_params(serialized_params)
+            d1 = dec.decode(shares)
+            return d1
+
+        def _check_data(data1):
+            self.failUnlessEqual(len(data1), len(data0))
+            self.failUnless(data1 == data0)
+
+        def _decode_all_ordered(res):
+            # can we decode using all of the shares?
+            return _decode(self.shares)
+        d.addCallback(_decode_all_ordered)
+        d.addCallback(_check_data)
+
+        def _decode_all_shuffled(res):
+            # can we decode, using all the shares, but in random order?
+            shuffled_shares = self.shares[:]
+            random.shuffle(shuffled_shares)
+            return _decode(shuffled_shares)
+        d.addCallback(_decode_all_shuffled)
+        d.addCallback(_check_data)
+        
+        def _decode_some(res):
+            # decode with a minimal subset of the shares
+            some_shares = self.shares[:required_shares]
+            return _decode(some_shares)
+        d.addCallback(_decode_some)
+        d.addCallback(_check_data)
+
+        def _decode_some_random(res):
+            # use a randomly-selected minimal subset
+            some_shares = random.sample(self.shares, required_shares)
+            return _decode(some_shares)
+        d.addCallback(_decode_some_random)
+        d.addCallback(_check_data)
+
+        def _decode_multiple(res):
+            # make sure we can re-use the decoder object
+            shares1 = random.sample(self.shares, required_shares)
+            shares2 = random.sample(self.shares, required_shares)
+            dec = self.dec_class()
+            dec.set_serialized_params(serialized_params)
+            d1 = dec.decode(shares1)
+            d1.addCallback(_check_data)
+            d1.addCallback(lambda res: dec.decode(shares2))
+            d1.addCallback(_check_data)
+            return d1
+        d.addCallback(_decode_multiple)
+
+        return d
+
+    def test_encode(self):
+        return self.do_test(1000, 25, 100)
+
+    def test_sizes(self):
+        d = defer.succeed(None)
+        for i in range(1, 100):
+            d.addCallback(lambda res,size: self.do_test(size, 4, 10), i)
+        return d
+
+class PyRS(unittest.TestCase, Tester):
+    enc_class = PyRSEncoder
+    dec_class = PyRSDecoder
+
+
+class Replicating(unittest.TestCase, Tester):
+    enc_class = ReplicatingEncoder
+    dec_class = ReplicatingDecoder