-import struct
+import os, struct
from zope.interface import implements
from twisted.internet import defer
from allmydata.interfaces import IMutableFileNode, IMutableFileURI
-from allmydata.util import hashutil
+from allmydata.util import hashutil, mathutil
from allmydata.uri import WriteableSSKFileURI
+from allmydata.Crypto.Cipher import AES
+from allmydata import hashtree, codec
class MutableFileNode:
implements(IMutableFileNode)
self._privkey = None # filled in if we're mutable
self._sharemap = {} # known shares, shnum-to-nodeid
+ self._current_data = None # SDMF: we're allowed to cache the contents
+ self._current_roothash = None # ditto
+ self._current_seqnum = None # ditto
+
def init_from_uri(self, myuri):
+ # we have the URI, but we have not yet retrieved the public
+ # verification key, nor things like 'k' or 'N'. If and when someone
+ # wants to get our contents, we'll pull from shares and fill those
+ # in.
self._uri = IMutableFileURI(myuri)
return self
share_data = data[offsets['share_data']:offsets['share_data']+datalen]
enc_privkey = data[offsets['enc_privkey']:]
- def pack_data(self):
+
+# use client.create_mutable_file() to make one of these
+
+class Publish:
+ """I represent a single act of publishing the mutable file to the grid."""
+
+ def __init__(self, filenode):
+ self._node = filenode
+
+ def publish(self, newdata):
+ """Publish the filenode's current contents. Returns a Deferred that
+ fires (with None) when the publish has done as much work as it's ever
+ going to do, or errbacks with ConsistencyError if it detects a
+ simultaneous write."""
+
+ # 1: generate shares (SDMF: files are small, so we can do it in RAM)
+ # 2: perform peer selection, get candidate servers
+ # 3: pre-allocate some shares to some servers, based upon any existing
+ # self._node._sharemap
+ # 4: send allocate/testv_and_writev messages
+ # 5: as responses return, update share-dispatch table
+ # 5a: may need to run recovery algorithm
+ # 6: when enough responses are back, we're done
+
+ old_roothash = self._node._current_roothash
+ old_seqnum = self._node._current_seqnum
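+        # these will go into the test vector for the testv_and_writev
+        # call in step 4, so we can detect (and errback on) a
+        # simultaneous write by somebody else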
+
+        readkey = self._node.readkey
+        required_shares = self._node.required_shares
+        total_shares = self._node.total_shares
+        privkey = self._node.privkey
+        encprivkey = self._node.encprivkey
+        pubkey = self._node.pubkey
+
+ d = defer.succeed(newdata)
+ d.addCallback(self._encrypt_and_encode, readkey,
+ required_shares, total_shares)
+        d.addCallback(self._generate_shares, old_seqnum+1,
+                      privkey, encprivkey, pubkey)
+
+ d.addCallback(self._get_peers)
+ d.addCallback(self._map_shares)
+ d.addCallback(self._send_shares)
+ d.addCallback(self._wait_for_responses)
+ d.addCallback(lambda res: None)
+ return d
+
+ def _encrypt_and_encode(self, newdata, readkey,
+ required_shares, total_shares):
+ IV = os.urandom(16)
+ key = hashutil.ssk_readkey_data_hash(IV, readkey)
+ enc = AES.new(key=key, mode=AES.MODE_CTR, counterstart="\x00"*16)
+ crypttext = enc.encrypt(newdata)
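+        # starting the CTR counter at zero should be safe here: the key
+        # is derived from the freshly-random IV, so each publish uses a
+        # new key and the keystream is not reused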
+
+ # now apply FEC
+ self.MAX_SEGMENT_SIZE = 1024*1024
+ segment_size = min(self.MAX_SEGMENT_SIZE, len(crypttext))
+        # this must be a multiple of required_shares, so the CRS encoder
+        # can split each segment into equal-sized blocks
+        segment_size = mathutil.next_multiple(segment_size,
+                                              required_shares)
+ self.num_segments = mathutil.div_ceil(len(crypttext), segment_size)
+ assert self.num_segments == 1 # SDMF restrictions
+ fec = codec.CRSEncoder()
+ fec.set_params(segment_size, required_shares, total_shares)
+ piece_size = fec.get_block_size()
+ crypttext_pieces = []
+ for offset in range(0, len(crypttext), piece_size):
+ piece = crypttext[offset:offset+piece_size]
+ if len(piece) < piece_size:
+ pad_size = piece_size - len(piece)
+ piece = piece + "\x00"*pad_size
+ crypttext_pieces.append(piece)
+ assert len(piece) == piece_size
+
+ d = fec.encode(crypttext_pieces)
+ d.addCallback(lambda shares:
+ (shares, required_shares, total_shares,
+ segment_size, len(crypttext), IV) )
+ return d
+
+ def _generate_shares(self, (shares_and_shareids,
+ required_shares, total_shares,
+ segment_size, data_length, IV),
+ seqnum, privkey, encprivkey, pubkey):
+
+ (shares, share_ids) = shares_and_shareids
+
+ assert len(shares) == len(share_ids)
+ assert len(shares) == total_shares
+ all_shares = {}
+ block_hash_trees = {}
+ share_hash_leaves = [None] * len(shares)
+ for i in range(len(shares)):
+ share_data = shares[i]
+ shnum = share_ids[i]
+ all_shares[shnum] = share_data
+
+ # build the block hash tree. SDMF has only one leaf.
+ leaves = [hashutil.block_hash(share_data)]
+ t = hashtree.HashTree(leaves)
+            block_hash_trees[shnum] = list(t)
+ share_hash_leaves[shnum] = t[0]
+ for leaf in share_hash_leaves:
+ assert leaf is not None
+ share_hash_tree = hashtree.HashTree(share_hash_leaves)
+ share_hash_chain = {}
+ for shnum in range(total_shares):
+ needed_hashes = share_hash_tree.needed_hashes(shnum)
+ share_hash_chain[shnum] = dict( [ (i, share_hash_tree[i])
+ for i in needed_hashes ] )
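+        # each share's hash chain is the merkle "uncle chain" that lets a
+        # reader validate that share's leaf against root_hash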
+ root_hash = share_hash_tree[0]
+ assert len(root_hash) == 32
+
+ prefix = self._pack_prefix(seqnum, root_hash,
+ required_shares, total_shares,
+ segment_size, data_length)
+
+        # now pack the beginning of the share. All shares are the same up
+        # to the signature, then they have divergent share hash chains,
+        # then different block hash trees and share data (the IV that
+        # sits between those two is the same for every share), and they
+        # all end with the same encprivkey. The sizes of everything are
+        # the same for all shares.
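+        #
+        # the resulting packed share is laid out as:
+        #   prefix | offsets | verification key | signature
+        #   | share hash chain | block hash tree | IV | share data
+        #   | encrypted private key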
+
+ signature = privkey.sign(prefix)
+
+ verification_key = pubkey.serialize()
+
+ final_shares = {}
+ for shnum in range(total_shares):
+ shc = share_hash_chain[shnum]
+ share_hash_chain_s = "".join([struct.pack(">H32s", i, shc[i])
+ for i in sorted(shc.keys())])
+ bht = block_hash_trees[shnum]
+ for h in bht:
+ assert len(h) == 32
+ block_hash_tree_s = "".join(bht)
+ share_data = all_shares[shnum]
+            offsets = self._pack_offsets(len(verification_key),
+                                         len(signature),
+                                         len(share_hash_chain_s),
+                                         len(block_hash_tree_s),
+                                         len(IV),
+                                         len(share_data))
+
+ final_shares[shnum] = "".join([prefix,
+ offsets,
+ verification_key,
+ signature,
+ share_hash_chain_s,
+ block_hash_tree_s,
+ IV,
+ share_data,
+ encprivkey])
+ return (seqnum, root_hash, final_shares)
+
+
+ def _pack_prefix(self, seqnum, root_hash,
+ required_shares, total_shares,
+ segment_size, data_length):
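+        # SDMF v0 prefix layout, 59 bytes total:
+        #   offset  size  field
+        #        0     1  version byte (0)
+        #        1     8  sequence number
+        #        9    32  share hash tree root
+        #       41     1  k (required_shares)
+        #       42     1  N (total_shares)
+        #       43     8  segment size
+        #       51     8  data length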
+ prefix = struct.pack(">BQ32s" + "BBQQ",
+                             0, # version
+ seqnum,
+ root_hash,
+
+ required_shares,
+ total_shares,
+ segment_size,
+ data_length,
+ )
+ return prefix
+
+ def _pack_offsets(self, verification_key_length, signature_length,
+ share_hash_chain_length, block_hash_tree_length,
+ IV_length, share_data_length):
+ post_offset = struct.calcsize(">BQ32s" + "BBQQ" + "LLLLLQ")
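+        # post_offset is 87: the 59-byte prefix plus this 28-byte offset
+        # table. all offsets are measured from the beginning of the share.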
+ offsets = {}
+ o1 = offsets['signature'] = post_offset + verification_key_length
+ o2 = offsets['share_hash_chain'] = o1 + signature_length
+ o3 = offsets['block_hash_tree'] = o2 + share_hash_chain_length
+ assert IV_length == 16
+ o4 = offsets['IV'] = o3 + block_hash_tree_length
+ o5 = offsets['share_data'] = o4 + IV_length
+ o6 = offsets['enc_privkey'] = o5 + share_data_length
+
+ return struct.pack(">LLLLLQ",
+ offsets['signature'],
+ offsets['share_hash_chain'],
+ offsets['block_hash_tree'],
+ offsets['IV'],
+ offsets['share_data'],
+ offsets['enc_privkey'])
+
+ def OFF_pack_data(self):
# dummy values to satisfy pyflakes until we wire this all up
seqnum, root_hash, k, N, segsize, datalen = 0,0,0,0,0,0
(verification_key, signature, share_hash_chain, block_hash_tree,
enc_privkey])
return "".join(newbuf)
-
-# use client.create_mutable_file() to make one of these
d.addCallback(_created)
return d
+class Publish(unittest.TestCase):
+ def test_encrypt(self):
+ c = MyClient()
+ fn = FakeFilenode(c)
+ # .create usually returns a Deferred, but we happen to know it's
+ # synchronous
+ CONTENTS = "some initial contents"
+ fn.create(CONTENTS)
+ p = mutable.Publish(fn)
+ d = defer.maybeDeferred(p._encrypt_and_encode,
+ CONTENTS, "READKEY", 3, 10)
+ def _done( ((shares, share_ids),
+ required_shares, total_shares,
+ segsize, data_length, IV) ):
+ self.failUnlessEqual(len(shares), 10)
+ for sh in shares:
+ self.failUnless(isinstance(sh, str))
+ self.failUnlessEqual(len(sh), 7)
+ self.failUnlessEqual(len(share_ids), 10)
+ self.failUnlessEqual(required_shares, 3)
+ self.failUnlessEqual(total_shares, 10)
+ self.failUnlessEqual(segsize, 21)
+ self.failUnlessEqual(data_length, len(CONTENTS))
+ self.failUnlessEqual(len(IV), 16)
+ d.addCallback(_done)
+ return d
+
+ def test_generate(self):
+ c = MyClient()
+ fn = FakeFilenode(c)
+ # .create usually returns a Deferred, but we happen to know it's
+ # synchronous
+ CONTENTS = "some initial contents"
+ fn.create(CONTENTS)
+ p = mutable.Publish(fn)
+ # make some fake shares
+ shares_and_ids = ( ["%07d" % i for i in range(10)], range(10) )
+ d = defer.maybeDeferred(p._generate_shares,
+ (shares_and_ids,
+ 3, 10,
+ 21, # segsize
+ len(CONTENTS),
+ "IV"*8),
+ 3, # seqnum
+ FakePrivKey(), "encprivkey", FakePubKey(),
+ )
+ def _done( (seqnum, root_hash, final_shares) ):
+ self.failUnlessEqual(seqnum, 3)
+ self.failUnlessEqual(len(root_hash), 32)
+ self.failUnless(isinstance(final_shares, dict))
+ self.failUnlessEqual(len(final_shares), 10)
+ self.failUnlessEqual(sorted(final_shares.keys()), range(10))
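+            # expected size: 59 (prefix) + 28 (offsets) + 6 ("PUBKEY")
+            # + 65 ("SIGN(" plus the 59-byte prefix plus ")") + 136
+            # (share hash chain: 4 uncle hashes * 34 bytes each, for 10
+            # leaves padded to 16) + 32 (one-hash block hash tree)
+            # + 16 (IV) + 7 (share data) + 10 ("encprivkey") = 359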
+ for i,sh in final_shares.items():
+ self.failUnless(isinstance(sh, str))
+ self.failUnlessEqual(len(sh), 359)
+ d.addCallback(_done)
+ return d
+
+class FakePubKey:
+ def serialize(self):
+ return "PUBKEY"
+class FakePrivKey:
+ def sign(self, data):
+ return "SIGN(%s)" % data
+
class Dirnode(unittest.TestCase):
def setUp(self):
self.client = MyClient()