mutable: implement filenode share-packing, still pretty rough
author    Brian Warner <warner@allmydata.com>
          Sat, 3 Nov 2007 03:51:39 +0000 (20:51 -0700)
committer Brian Warner <warner@allmydata.com>
          Sat, 3 Nov 2007 03:51:39 +0000 (20:51 -0700)
src/allmydata/mutable.py
src/allmydata/test/test_mutable.py
src/allmydata/util/hashutil.py

src/allmydata/mutable.py
index 1d87774078f105e012d6c791eb6f9d11cae9da81..18159ab65899e1617288a16edd115f312da32d17 100644
@@ -1,10 +1,12 @@
 
-import struct
+import os, struct
 from zope.interface import implements
 from twisted.internet import defer
 from allmydata.interfaces import IMutableFileNode, IMutableFileURI
-from allmydata.util import hashutil
+from allmydata.util import hashutil, mathutil
 from allmydata.uri import WriteableSSKFileURI
+from allmydata.Crypto.Cipher import AES
+from allmydata import hashtree, codec
 
 class MutableFileNode:
     implements(IMutableFileNode)
@@ -15,7 +17,15 @@ class MutableFileNode:
         self._privkey = None # filled in if we're mutable
         self._sharemap = {} # known shares, shnum-to-nodeid
 
+        self._current_data = None # SDMF: we're allowed to cache the contents
+        self._current_roothash = None # ditto
+        self._current_seqnum = None # ditto
+
     def init_from_uri(self, myuri):
+        # we have the URI, but we have not yet retrieved the public
+        # verification key, nor things like 'k' or 'N'. If and when someone
+        # wants to get our contents, we'll pull from shares and fill those
+        # in.
         self._uri = IMutableFileURI(myuri)
         return self
 
@@ -89,7 +99,198 @@ class MutableFileNode:
         share_data = data[offsets['share_data']:offsets['share_data']+datalen]
         enc_privkey = data[offsets['enc_privkey']:]
 
-    def pack_data(self):
+
+# use client.create_mutable_file() to make one of these
+
+class Publish:
+    """I represent a single act of publishing the mutable file to the grid."""
+
+    def __init__(self, filenode):
+        self._node = filenode
+
+    def publish(self, newdata):
+        """Publish the filenode's current contents. Returns a Deferred that
+        fires (with None) when the publish has done as much work as it's ever
+        going to do, or errbacks with ConsistencyError if it detects a
+        simultaneous write."""
+
+        # 1: generate shares (SDMF: files are small, so we can do it in RAM)
+        # 2: perform peer selection, get candidate servers
+        # 3: pre-allocate some shares to some servers, based upon any existing
+        #    self._node._sharemap
+        # 4: send allocate/testv_and_writev messages
+        # 5: as responses return, update share-dispatch table
+        # 5a: may need to run recovery algorithm
+        # 6: when enough responses are back, we're done
+
+        old_roothash = self._node._current_roothash
+        old_seqnum = self._node._current_seqnum
+
+        readkey = self._node.readkey
+        required_shares = self._node.required_shares
+        total_shares = self._node.total_shares
+        privkey = self._node.privkey
+        pubkey = self._node.pubkey
+        encprivkey = self._node.encprivkey
+
+        d = defer.succeed(newdata)
+        d.addCallback(self._encrypt_and_encode, readkey,
+                      required_shares, total_shares)
+        d.addCallback(self._generate_shares, old_seqnum+1,
+                      privkey, encprivkey, pubkey)
+
+        d.addCallback(self._get_peers)
+        d.addCallback(self._map_shares)
+        d.addCallback(self._send_shares)
+        d.addCallback(self._wait_for_responses)
+        d.addCallback(lambda res: None)
+        return d
+
+    def _encrypt_and_encode(self, newdata, readkey,
+                            required_shares, total_shares):
+        IV = os.urandom(16)
+        key = hashutil.ssk_readkey_data_hash(IV, readkey)
+        enc = AES.new(key=key, mode=AES.MODE_CTR, counterstart="\x00"*16)
+        crypttext = enc.encrypt(newdata)
+
+        # now apply FEC
+        self.MAX_SEGMENT_SIZE = 1024*1024
+        segment_size = min(self.MAX_SEGMENT_SIZE, len(crypttext))
+        # this must be a multiple of required_shares
+        segment_size = mathutil.next_multiple(segment_size,
+                                              required_shares)
+        self.num_segments = mathutil.div_ceil(len(crypttext), segment_size)
+        assert self.num_segments == 1 # SDMF restrictions
+        fec = codec.CRSEncoder()
+        fec.set_params(segment_size, required_shares, total_shares)
+        piece_size = fec.get_block_size()
+        crypttext_pieces = []
+        for offset in range(0, len(crypttext), piece_size):
+            piece = crypttext[offset:offset+piece_size]
+            if len(piece) < piece_size:
+                pad_size = piece_size - len(piece)
+                piece = piece + "\x00"*pad_size
+            crypttext_pieces.append(piece)
+            assert len(piece) == piece_size
+
+        d = fec.encode(crypttext_pieces)
+        d.addCallback(lambda shares:
+                      (shares, required_shares, total_shares,
+                       segment_size, len(crypttext), IV) )
+        return d
+
+    def _generate_shares(self, (shares_and_shareids,
+                                required_shares, total_shares,
+                                segment_size, data_length, IV),
+                         seqnum, privkey, encprivkey, pubkey):
+
+        (shares, share_ids) = shares_and_shareids
+
+        assert len(shares) == len(share_ids)
+        assert len(shares) == total_shares
+        all_shares = {}
+        block_hash_trees = {}
+        share_hash_leaves = [None] * len(shares)
+        for i in range(len(shares)):
+            share_data = shares[i]
+            shnum = share_ids[i]
+            all_shares[shnum] = share_data
+
+            # build the block hash tree. SDMF has only one leaf.
+            leaves = [hashutil.block_hash(share_data)]
+            t = hashtree.HashTree(leaves)
+            block_hash_trees[shnum] = list(t)
+            share_hash_leaves[shnum] = t[0]
+        for leaf in share_hash_leaves:
+            assert leaf is not None
+        share_hash_tree = hashtree.HashTree(share_hash_leaves)
+        share_hash_chain = {}
+        for shnum in range(total_shares):
+            needed_hashes = share_hash_tree.needed_hashes(shnum)
+            share_hash_chain[shnum] = dict( [ (i, share_hash_tree[i])
+                                              for i in needed_hashes ] )
+        root_hash = share_hash_tree[0]
+        assert len(root_hash) == 32
+
+        prefix = self._pack_prefix(seqnum, root_hash,
+                                   required_shares, total_shares,
+                                   segment_size, data_length)
+
+        # now pack the shares. All shares are identical up through the
+        # signature, then diverge: each has its own share hash chain and
+        # block hash tree, followed by the (identical) IV and that share's
+        # block data, and they all end with the same encprivkey. The size
+        # of each field is the same across shares.
+
+        signature = privkey.sign(prefix)
+
+        verification_key = pubkey.serialize()
+
+        final_shares = {}
+        for shnum in range(total_shares):
+            shc = share_hash_chain[shnum]
+            share_hash_chain_s = "".join([struct.pack(">H32s", i, shc[i])
+                                          for i in sorted(shc.keys())])
+            bht = block_hash_trees[shnum]
+            for h in bht:
+                assert len(h) == 32
+            block_hash_tree_s = "".join(bht)
+            share_data = all_shares[shnum]
+            offsets = self._pack_offsets(len(verification_key),
+                                         len(signature),
+                                         len(share_hash_chain_s),
+                                         len(block_hash_tree_s),
+                                         len(IV),
+                                         len(share_data))
+
+            final_shares[shnum] = "".join([prefix,
+                                           offsets,
+                                           verification_key,
+                                           signature,
+                                           share_hash_chain_s,
+                                           block_hash_tree_s,
+                                           IV,
+                                           share_data,
+                                           encprivkey])
+        return (seqnum, root_hash, final_shares)
+
+
+    def _pack_prefix(self, seqnum, root_hash,
+                     required_shares, total_shares,
+                     segment_size, data_length):
+        prefix = struct.pack(">BQ32s" + "BBQQ",
+                             0, # version
+                             seqnum,
+                             root_hash,
+
+                             required_shares,
+                             total_shares,
+                             segment_size,
+                             data_length,
+                             )
+        return prefix
+
+    def _pack_offsets(self, verification_key_length, signature_length,
+                      share_hash_chain_length, block_hash_tree_length,
+                      IV_length, share_data_length):
+        post_offset = struct.calcsize(">BQ32s" + "BBQQ" + "LLLLLQ")
+        offsets = {}
+        o1 = offsets['signature'] = post_offset + verification_key_length
+        o2 = offsets['share_hash_chain'] = o1 + signature_length
+        o3 = offsets['block_hash_tree'] = o2 + share_hash_chain_length
+        assert IV_length == 16
+        o4 = offsets['IV'] = o3 + block_hash_tree_length
+        o5 = offsets['share_data'] = o4 + IV_length
+        o6 = offsets['enc_privkey'] = o5 + share_data_length
+
+        return struct.pack(">LLLLLQ",
+                           offsets['signature'],
+                           offsets['share_hash_chain'],
+                           offsets['block_hash_tree'],
+                           offsets['IV'],
+                           offsets['share_data'],
+                           offsets['enc_privkey'])
+
+    def OFF_pack_data(self):
         # dummy values to satisfy pyflakes until we wire this all up
         seqnum, root_hash, k, N, segsize, datalen = 0,0,0,0,0,0
         (verification_key, signature, share_hash_chain, block_hash_tree,
@@ -126,5 +327,3 @@ class MutableFileNode:
                        enc_privkey])
         return "".join(newbuf)
 
-
-# use client.create_mutable_file() to make one of these
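
Every field in a packed share sits at a position derivable from the fixed-size
prefix and the offset table. As a quick sanity check of that layout arithmetic,
the sketch below reproduces the 359-byte share size asserted by the new
test_generate (below); the key, signature, and encprivkey sizes are the fake
values from that test, not real field sizes:

    import struct

    prefix  = struct.calcsize(">BQ32s" + "BBQQ")  # version/seqnum/roothash + k/N/segsize/datalen = 59
    offsets = struct.calcsize(">LLLLLQ")          # the six-entry offset table = 28

    verification_key = len("PUBKEY")                 # 6  (FakePubKey)
    signature        = len("SIGN(") + prefix + 1     # 65 (FakePrivKey echoes the 59-byte prefix)
    share_hash_chain = 4 * struct.calcsize(">H32s")  # 10 leaves pad to a 16-leaf tree: 4 uncle hashes
    block_hash_tree  = 1 * 32                        # SDMF: one block per share, so a single-node tree
    IV               = 16
    share_data       = 7                             # one block of the 21-byte, k=3 test file
    enc_privkey      = len("encprivkey")             # 10

    total = (prefix + offsets + verification_key + signature +
             share_hash_chain + block_hash_tree + IV + share_data + enc_privkey)
    assert total == 359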
src/allmydata/test/test_mutable.py
index 186112af3f64e90eba5a3e844bd012f13439fd59..bf121763a1dde3fae4d5eac476176f9a7f2724b5 100644
@@ -117,6 +117,71 @@ class Filenode(unittest.TestCase):
         d.addCallback(_created)
         return d
 
+class Publish(unittest.TestCase):
+    def test_encrypt(self):
+        c = MyClient()
+        fn = FakeFilenode(c)
+        # .create usually returns a Deferred, but we happen to know it's
+        # synchronous
+        CONTENTS = "some initial contents"
+        fn.create(CONTENTS)
+        p = mutable.Publish(fn)
+        d = defer.maybeDeferred(p._encrypt_and_encode,
+                                CONTENTS, "READKEY", 3, 10)
+        def _done( ((shares, share_ids),
+                    required_shares, total_shares,
+                    segsize, data_length, IV) ):
+            self.failUnlessEqual(len(shares), 10)
+            for sh in shares:
+                self.failUnless(isinstance(sh, str))
+                self.failUnlessEqual(len(sh), 7)
+            self.failUnlessEqual(len(share_ids), 10)
+            self.failUnlessEqual(required_shares, 3)
+            self.failUnlessEqual(total_shares, 10)
+            self.failUnlessEqual(segsize, 21)
+            self.failUnlessEqual(data_length, len(CONTENTS))
+            self.failUnlessEqual(len(IV), 16)
+        d.addCallback(_done)
+        return d
+
+    def test_generate(self):
+        c = MyClient()
+        fn = FakeFilenode(c)
+        # .create usually returns a Deferred, but we happen to know it's
+        # synchronous
+        CONTENTS = "some initial contents"
+        fn.create(CONTENTS)
+        p = mutable.Publish(fn)
+        # make some fake shares
+        shares_and_ids = ( ["%07d" % i for i in range(10)], range(10) )
+        d = defer.maybeDeferred(p._generate_shares,
+                                (shares_and_ids,
+                                 3, 10,
+                                 21, # segsize
+                                 len(CONTENTS),
+                                 "IV"*8),
+                                3, # seqnum
+                                FakePrivKey(), "encprivkey", FakePubKey(),
+                                )
+        def _done( (seqnum, root_hash, final_shares) ):
+            self.failUnlessEqual(seqnum, 3)
+            self.failUnlessEqual(len(root_hash), 32)
+            self.failUnless(isinstance(final_shares, dict))
+            self.failUnlessEqual(len(final_shares), 10)
+            self.failUnlessEqual(sorted(final_shares.keys()), range(10))
+            for i,sh in final_shares.items():
+                self.failUnless(isinstance(sh, str))
+                self.failUnlessEqual(len(sh), 359)
+        d.addCallback(_done)
+        return d
+
+class FakePubKey:
+    def serialize(self):
+        return "PUBKEY"
+class FakePrivKey:
+    def sign(self, data):
+        return "SIGN(%s)" % data
+
 class Dirnode(unittest.TestCase):
     def setUp(self):
         self.client = MyClient()
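
The sizes asserted in test_encrypt are not magic numbers; they fall directly
out of the segmenting logic in _encrypt_and_encode. A short sketch of that
arithmetic, using the same mathutil helpers the code imports:

    from allmydata.util import mathutil

    data_length = len("some initial contents")  # 21
    required_shares, total_shares = 3, 10

    # the whole file fits in one segment, rounded up to a multiple of k
    segment_size = min(1024*1024, data_length)                            # 21
    segment_size = mathutil.next_multiple(segment_size, required_shares)  # still 21
    num_segments = mathutil.div_ceil(data_length, segment_size)           # 1, as SDMF requires

    # CRS hands each of the 10 shares one block per segment
    block_size = mathutil.div_ceil(segment_size, required_shares)         # the 7 bytes the test expects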
src/allmydata/util/hashutil.py
index 114a68fb6c622536c729eda2d60dafa482cc94a0..bf2747a1f6e804424c41ce9578dcbc655b087119 100644
@@ -125,5 +125,7 @@ def ssk_pubkey_fingerprint_hash(pubkey):
 
 def ssk_readkey_hash(writekey):
     return tagged_hash("allmydata_mutable_readkey_v1", writekey)
+def ssk_readkey_data_hash(IV, readkey):
+    return tagged_pair_hash("allmydata_mutable_readkey_data_v1", IV, readkey)
 def ssk_storage_index_hash(readkey):
     return tagged_hash("allmydata_mutable_storage_index_v1", readkey)