From: Brian Warner Date: Sat, 3 Nov 2007 03:51:39 +0000 (-0700) Subject: mutable: implement filenode share-packing, still pretty rough X-Git-Tag: allmydata-tahoe-0.7.0~295 X-Git-Url: https://git.rkrishnan.org/Site?a=commitdiff_plain;h=78c45c82d14827f486a88af668a3d31e6b7ac364;p=tahoe-lafs%2Ftahoe-lafs.git mutable: implement filenode share-packing, still pretty rough --- diff --git a/src/allmydata/mutable.py b/src/allmydata/mutable.py index 1d877740..18159ab6 100644 --- a/src/allmydata/mutable.py +++ b/src/allmydata/mutable.py @@ -1,10 +1,12 @@ -import struct +import os, struct from zope.interface import implements from twisted.internet import defer from allmydata.interfaces import IMutableFileNode, IMutableFileURI -from allmydata.util import hashutil +from allmydata.util import hashutil, mathutil from allmydata.uri import WriteableSSKFileURI +from allmydata.Crypto.Cipher import AES +from allmydata import hashtree, codec class MutableFileNode: implements(IMutableFileNode) @@ -15,7 +17,15 @@ class MutableFileNode: self._privkey = None # filled in if we're mutable self._sharemap = {} # known shares, shnum-to-nodeid + self._current_data = None # SDMF: we're allowed to cache the contents + self._current_roothash = None # ditto + self._current_seqnum = None # ditto + def init_from_uri(self, myuri): + # we have the URI, but we have not yet retrieved the public + # verification key, nor things like 'k' or 'N'. If and when someone + # wants to get our contents, we'll pull from shares and fill those + # in. self._uri = IMutableFileURI(myuri) return self @@ -89,7 +99,198 @@ class MutableFileNode: share_data = data[offsets['share_data']:offsets['share_data']+datalen] enc_privkey = data[offsets['enc_privkey']:] - def pack_data(self): + +# use client.create_mutable_file() to make one of these + +class Publish: + """I represent a single act of publishing the mutable file to the grid.""" + + def __init__(self, filenode): + self._node = filenode + + def publish(self, newdata): + """Publish the filenode's current contents. Returns a Deferred that + fires (with None) when the publish has done as much work as it's ever + going to do, or errbacks with ConsistencyError if it detects a + simultaneous write.""" + + # 1: generate shares (SDMF: files are small, so we can do it in RAM) + # 2: perform peer selection, get candidate servers + # 3: pre-allocate some shares to some servers, based upon any existing + # self._node._sharemap + # 4: send allocate/testv_and_writev messages + # 5: as responses return, update share-dispatch table + # 5a: may need to run recovery algorithm + # 6: when enough responses are back, we're done + + old_roothash = self._node._current_roothash + old_seqnum = self._node._current_seqnum + + readkey = self._node.readkey + required_shares = self._node.required_shares + total_shares = self._node.total_shares + privkey = self._node.privkey + pubkey = self._node.pubkey + + d = defer.succeed(newdata) + d.addCallback(self._encrypt_and_encode, readkey, + required_shares, total_shares) + d.addCallback(self._generate_shares, old_seqnum+1, + privkey, self._encprivkey, pubkey) + + d.addCallback(self._get_peers) + d.addCallback(self._map_shares) + d.addCallback(self._send_shares) + d.addCallback(self._wait_for_responses) + d.addCallback(lambda res: None) + return d + + def _encrypt_and_encode(self, newdata, readkey, + required_shares, total_shares): + IV = os.urandom(16) + key = hashutil.ssk_readkey_data_hash(IV, readkey) + enc = AES.new(key=key, mode=AES.MODE_CTR, counterstart="\x00"*16) + crypttext = enc.encrypt(newdata) + + # now apply FEC + self.MAX_SEGMENT_SIZE = 1024*1024 + segment_size = min(self.MAX_SEGMENT_SIZE, len(crypttext)) + # this must be a multiple of self.required_shares + segment_size = mathutil.next_multiple(segment_size, + required_shares) + self.num_segments = mathutil.div_ceil(len(crypttext), segment_size) + assert self.num_segments == 1 # SDMF restrictions + fec = codec.CRSEncoder() + fec.set_params(segment_size, required_shares, total_shares) + piece_size = fec.get_block_size() + crypttext_pieces = [] + for offset in range(0, len(crypttext), piece_size): + piece = crypttext[offset:offset+piece_size] + if len(piece) < piece_size: + pad_size = piece_size - len(piece) + piece = piece + "\x00"*pad_size + crypttext_pieces.append(piece) + assert len(piece) == piece_size + + d = fec.encode(crypttext_pieces) + d.addCallback(lambda shares: + (shares, required_shares, total_shares, + segment_size, len(crypttext), IV) ) + return d + + def _generate_shares(self, (shares_and_shareids, + required_shares, total_shares, + segment_size, data_length, IV), + seqnum, privkey, encprivkey, pubkey): + + (shares, share_ids) = shares_and_shareids + + assert len(shares) == len(share_ids) + assert len(shares) == total_shares + all_shares = {} + block_hash_trees = {} + share_hash_leaves = [None] * len(shares) + for i in range(len(shares)): + share_data = shares[i] + shnum = share_ids[i] + all_shares[shnum] = share_data + + # build the block hash tree. SDMF has only one leaf. + leaves = [hashutil.block_hash(share_data)] + t = hashtree.HashTree(leaves) + block_hash_trees[shnum] = block_hash_tree = list(t) + share_hash_leaves[shnum] = t[0] + for leaf in share_hash_leaves: + assert leaf is not None + share_hash_tree = hashtree.HashTree(share_hash_leaves) + share_hash_chain = {} + for shnum in range(total_shares): + needed_hashes = share_hash_tree.needed_hashes(shnum) + share_hash_chain[shnum] = dict( [ (i, share_hash_tree[i]) + for i in needed_hashes ] ) + root_hash = share_hash_tree[0] + assert len(root_hash) == 32 + + prefix = self._pack_prefix(seqnum, root_hash, + required_shares, total_shares, + segment_size, data_length) + + # now pack the beginning of the share. All shares are the same up + # to the signature, then they have divergent share hash chains, + # then completely different block hash trees + IV + share data, + # then they all share the same encprivkey at the end. The sizes + # of everything are the same for all shares. + + signature = privkey.sign(prefix) + + verification_key = pubkey.serialize() + + final_shares = {} + for shnum in range(total_shares): + shc = share_hash_chain[shnum] + share_hash_chain_s = "".join([struct.pack(">H32s", i, shc[i]) + for i in sorted(shc.keys())]) + bht = block_hash_trees[shnum] + for h in bht: + assert len(h) == 32 + block_hash_tree_s = "".join(bht) + share_data = all_shares[shnum] + offsets = self._pack_offsets(len(verification_key), + len(signature), + len(share_hash_chain), + len(block_hash_tree), + len(IV), + len(share_data)) + + final_shares[shnum] = "".join([prefix, + offsets, + verification_key, + signature, + share_hash_chain_s, + block_hash_tree_s, + IV, + share_data, + encprivkey]) + return (seqnum, root_hash, final_shares) + + + def _pack_prefix(self, seqnum, root_hash, + required_shares, total_shares, + segment_size, data_length): + prefix = struct.pack(">BQ32s" + "BBQQ", + 0, # version, + seqnum, + root_hash, + + required_shares, + total_shares, + segment_size, + data_length, + ) + return prefix + + def _pack_offsets(self, verification_key_length, signature_length, + share_hash_chain_length, block_hash_tree_length, + IV_length, share_data_length): + post_offset = struct.calcsize(">BQ32s" + "BBQQ" + "LLLLLQ") + offsets = {} + o1 = offsets['signature'] = post_offset + verification_key_length + o2 = offsets['share_hash_chain'] = o1 + signature_length + o3 = offsets['block_hash_tree'] = o2 + share_hash_chain_length + assert IV_length == 16 + o4 = offsets['IV'] = o3 + block_hash_tree_length + o5 = offsets['share_data'] = o4 + IV_length + o6 = offsets['enc_privkey'] = o5 + share_data_length + + return struct.pack(">LLLLLQ", + offsets['signature'], + offsets['share_hash_chain'], + offsets['block_hash_tree'], + offsets['IV'], + offsets['share_data'], + offsets['enc_privkey']) + + def OFF_pack_data(self): # dummy values to satisfy pyflakes until we wire this all up seqnum, root_hash, k, N, segsize, datalen = 0,0,0,0,0,0 (verification_key, signature, share_hash_chain, block_hash_tree, @@ -126,5 +327,3 @@ class MutableFileNode: enc_privkey]) return "".join(newbuf) - -# use client.create_mutable_file() to make one of these diff --git a/src/allmydata/test/test_mutable.py b/src/allmydata/test/test_mutable.py index 186112af..bf121763 100644 --- a/src/allmydata/test/test_mutable.py +++ b/src/allmydata/test/test_mutable.py @@ -117,6 +117,71 @@ class Filenode(unittest.TestCase): d.addCallback(_created) return d +class Publish(unittest.TestCase): + def test_encrypt(self): + c = MyClient() + fn = FakeFilenode(c) + # .create usually returns a Deferred, but we happen to know it's + # synchronous + CONTENTS = "some initial contents" + fn.create(CONTENTS) + p = mutable.Publish(fn) + d = defer.maybeDeferred(p._encrypt_and_encode, + CONTENTS, "READKEY", 3, 10) + def _done( ((shares, share_ids), + required_shares, total_shares, + segsize, data_length, IV) ): + self.failUnlessEqual(len(shares), 10) + for sh in shares: + self.failUnless(isinstance(sh, str)) + self.failUnlessEqual(len(sh), 7) + self.failUnlessEqual(len(share_ids), 10) + self.failUnlessEqual(required_shares, 3) + self.failUnlessEqual(total_shares, 10) + self.failUnlessEqual(segsize, 21) + self.failUnlessEqual(data_length, len(CONTENTS)) + self.failUnlessEqual(len(IV), 16) + d.addCallback(_done) + return d + + def test_generate(self): + c = MyClient() + fn = FakeFilenode(c) + # .create usually returns a Deferred, but we happen to know it's + # synchronous + CONTENTS = "some initial contents" + fn.create(CONTENTS) + p = mutable.Publish(fn) + # make some fake shares + shares_and_ids = ( ["%07d" % i for i in range(10)], range(10) ) + d = defer.maybeDeferred(p._generate_shares, + (shares_and_ids, + 3, 10, + 21, # segsize + len(CONTENTS), + "IV"*8), + 3, # seqnum + FakePrivKey(), "encprivkey", FakePubKey(), + ) + def _done( (seqnum, root_hash, final_shares) ): + self.failUnlessEqual(seqnum, 3) + self.failUnlessEqual(len(root_hash), 32) + self.failUnless(isinstance(final_shares, dict)) + self.failUnlessEqual(len(final_shares), 10) + self.failUnlessEqual(sorted(final_shares.keys()), range(10)) + for i,sh in final_shares.items(): + self.failUnless(isinstance(sh, str)) + self.failUnlessEqual(len(sh), 359) + d.addCallback(_done) + return d + +class FakePubKey: + def serialize(self): + return "PUBKEY" +class FakePrivKey: + def sign(self, data): + return "SIGN(%s)" % data + class Dirnode(unittest.TestCase): def setUp(self): self.client = MyClient() diff --git a/src/allmydata/util/hashutil.py b/src/allmydata/util/hashutil.py index 114a68fb..bf2747a1 100644 --- a/src/allmydata/util/hashutil.py +++ b/src/allmydata/util/hashutil.py @@ -125,5 +125,7 @@ def ssk_pubkey_fingerprint_hash(pubkey): def ssk_readkey_hash(writekey): return tagged_hash("allmydata_mutable_readkey_v1", writekey) +def ssk_readkey_data_hash(IV, readkey): + return tagged_pair_hash("allmydata_mutable_readkey_data_v1", IV, readkey) def ssk_storage_index_hash(readkey): return tagged_hash("allmydata_mutable_storage_index_v1", readkey)