From: Brian Warner Date: Tue, 6 Nov 2007 05:14:59 +0000 (-0700) Subject: mutable.Publish: create a dispatch_map for the benefit of recovery code, and pull... X-Git-Tag: allmydata-tahoe-0.7.0~282 X-Git-Url: https://git.rkrishnan.org/%5B/%5D%20/frontends/wapi.txt?a=commitdiff_plain;h=281afe7cfc021d4d810f222be063fcb2fba62a26;p=tahoe-lafs%2Ftahoe-lafs.git mutable.Publish: create a dispatch_map for the benefit of recovery code, and pull pack/unpack methods out into functions --- diff --git a/src/allmydata/mutable.py b/src/allmydata/mutable.py index 4c62dc03..c9c087aa 100644 --- a/src/allmydata/mutable.py +++ b/src/allmydata/mutable.py @@ -10,8 +10,6 @@ from allmydata import hashtree, codec from allmydata.encode import NotEnoughPeersError -HEADER_LENGTH = struct.calcsize(">BQ32s BBQQ LLLLLQQ") - class NeedMoreDataError(Exception): def __init__(self, needed_bytes): Exception.__init__(self) @@ -20,6 +18,105 @@ class NeedMoreDataError(Exception): class UncoordinatedWriteError(Exception): pass +HEADER_LENGTH = struct.calcsize(">BQ32s BBQQ LLLLLQQ") + +def unpack_share(data): + assert len(data) >= HEADER_LENGTH + o = {} + (version, + seqnum, + root_hash, + k, N, segsize, datalen, + o['signature'], + o['share_hash_chain'], + o['block_hash_tree'], + o['IV'], + o['share_data'], + o['enc_privkey'], + o['EOF']) = struct.unpack(">BQ32s" + "BBQQ" + "LLLLLQQ", + data[:HEADER_LENGTH]) + + assert version == 0 + if len(data) < o['EOF']: + raise NeedMoreDataError(o['EOF']) + + pubkey = data[HEADER_LENGTH:o['signature']] + signature = data[o['signature']:o['share_hash_chain']] + share_hash_chain_s = data[o['share_hash_chain']:o['block_hash_tree']] + share_hash_format = ">H32s" + hsize = struct.calcsize(share_hash_format) + assert len(share_hash_chain_s) % hsize == 0, len(share_hash_chain_s) + share_hash_chain = [] + for i in range(0, len(share_hash_chain_s), hsize): + chunk = share_hash_chain_s[i:i+hsize] + (hid, h) = struct.unpack(share_hash_format, chunk) + share_hash_chain.append( (hid, h) ) + block_hash_tree_s = data[o['block_hash_tree']:o['IV']] + assert len(block_hash_tree_s) % 32 == 0, len(block_hash_tree_s) + block_hash_tree = [] + for i in range(0, len(block_hash_tree_s), 32): + block_hash_tree.append(block_hash_tree_s[i:i+32]) + + IV = data[o['IV']:o['share_data']] + share_data = data[o['share_data']:o['enc_privkey']] + enc_privkey = data[o['enc_privkey']:o['EOF']] + + return (seqnum, root_hash, k, N, segsize, datalen, + pubkey, signature, share_hash_chain, block_hash_tree, + IV, share_data, enc_privkey) + + +def pack_checkstring(seqnum, root_hash): + return struct.pack(">BQ32s", + 0, # version, + seqnum, + root_hash) + +def unpack_checkstring(checkstring): + cs_len = struct.calcsize(">BQ32s") + version, seqnum, root_hash = struct.unpack(">BQ32s", + checkstring[:cs_len]) + assert version == 0 # TODO: just ignore the share + return (seqnum, root_hash) + +def pack_prefix(seqnum, root_hash, + required_shares, total_shares, + segment_size, data_length): + prefix = struct.pack(">BQ32s" + "BBQQ", + 0, # version, + seqnum, + root_hash, + + required_shares, + total_shares, + segment_size, + data_length, + ) + return prefix + +def pack_offsets(verification_key_length, signature_length, + share_hash_chain_length, block_hash_tree_length, + IV_length, share_data_length, encprivkey_length): + post_offset = HEADER_LENGTH + offsets = {} + o1 = offsets['signature'] = post_offset + verification_key_length + o2 = offsets['share_hash_chain'] = o1 + signature_length + o3 = offsets['block_hash_tree'] = o2 + share_hash_chain_length + assert IV_length == 16 + o4 = offsets['IV'] = o3 + block_hash_tree_length + o5 = offsets['share_data'] = o4 + IV_length + o6 = offsets['enc_privkey'] = o5 + share_data_length + o7 = offsets['EOF'] = o6 + encprivkey_length + + return struct.pack(">LLLLLQQ", + offsets['signature'], + offsets['share_hash_chain'], + offsets['block_hash_tree'], + offsets['IV'], + offsets['share_data'], + offsets['enc_privkey'], + offsets['EOF']) + # use client.create_mutable_file() to make one of these class MutableFileNode: @@ -109,54 +206,8 @@ class MutableFileNode: def replace(self, newdata): return defer.succeed(None) -class ShareFormattingMixin: - - def _unpack_share(self, data): - assert len(data) >= HEADER_LENGTH - o = {} - (version, - seqnum, - root_hash, - k, N, segsize, datalen, - o['signature'], - o['share_hash_chain'], - o['block_hash_tree'], - o['IV'], - o['share_data'], - o['enc_privkey'], - o['EOF']) = struct.unpack(">BQ32s" + "BBQQ" + "LLLLLQQ", - data[:HEADER_LENGTH]) - - assert version == 0 - if len(data) < o['EOF']: - raise NeedMoreDataError(o['EOF']) - - pubkey = data[HEADER_LENGTH:o['signature']] - signature = data[o['signature']:o['share_hash_chain']] - share_hash_chain_s = data[o['share_hash_chain']:o['block_hash_tree']] - share_hash_format = ">H32s" - hsize = struct.calcsize(share_hash_format) - assert len(share_hash_chain_s) % hsize == 0, len(share_hash_chain_s) - share_hash_chain = [] - for i in range(0, len(share_hash_chain_s), hsize): - chunk = share_hash_chain_s[i:i+hsize] - (hid, h) = struct.unpack(share_hash_format, chunk) - share_hash_chain.append( (hid, h) ) - block_hash_tree_s = data[o['block_hash_tree']:o['IV']] - assert len(block_hash_tree_s) % 32 == 0, len(block_hash_tree_s) - block_hash_tree = [] - for i in range(0, len(block_hash_tree_s), 32): - block_hash_tree.append(block_hash_tree_s[i:i+32]) - - IV = data[o['IV']:o['share_data']] - share_data = data[o['share_data']:o['enc_privkey']] - enc_privkey = data[o['enc_privkey']:o['EOF']] - - return (seqnum, root_hash, k, N, segsize, datalen, - pubkey, signature, share_hash_chain, block_hash_tree, - IV, share_data, enc_privkey) - -class Retrieve(ShareFormattingMixin): + +class Retrieve: def __init__(self, filenode): self._node = filenode @@ -167,8 +218,7 @@ class DictOfSets(dict): else: self[key] = set([value]) - -class Publish(ShareFormattingMixin): +class Publish: """I represent a single act of publishing the mutable file to the grid.""" def __init__(self, filenode): @@ -184,13 +234,10 @@ class Publish(ShareFormattingMixin): # 2: perform peer selection, get candidate servers # 2a: send queries to n+epsilon servers, to determine current shares # 2b: based upon responses, create target map - - # 3: pre-allocate some shares to some servers, based upon any existing - # self._node._sharemap - # 4: send allocate/testv_and_writev messages - # 5: as responses return, update share-dispatch table - # 5a: may need to run recovery algorithm - # 6: when enough responses are back, we're done + # 3: send slot_testv_and_readv_and_writev messages + # 4: as responses return, update share-dispatch table + # 4a: may need to run recovery algorithm + # 5: when enough responses are back, we're done old_roothash = self._node._current_roothash old_seqnum = self._node._current_seqnum @@ -209,7 +256,7 @@ class Publish(ShareFormattingMixin): d.addCallback(self._query_peers, total_shares) d.addCallback(self._send_shares) - d.addCallback(self._wait_for_responses) + d.addCallback(self._maybe_recover) d.addCallback(lambda res: None) return d @@ -279,9 +326,9 @@ class Publish(ShareFormattingMixin): root_hash = share_hash_tree[0] assert len(root_hash) == 32 - prefix = self._pack_prefix(seqnum, root_hash, - required_shares, total_shares, - segment_size, data_length) + prefix = pack_prefix(seqnum, root_hash, + required_shares, total_shares, + segment_size, data_length) # now pack the beginning of the share. All shares are the same up # to the signature, then they have divergent share hash chains, @@ -303,13 +350,13 @@ class Publish(ShareFormattingMixin): assert len(h) == 32 block_hash_tree_s = "".join(bht) share_data = all_shares[shnum] - offsets = self._pack_offsets(len(verification_key), - len(signature), - len(share_hash_chain_s), - len(block_hash_tree_s), - len(IV), - len(share_data), - len(encprivkey)) + offsets = pack_offsets(len(verification_key), + len(signature), + len(share_hash_chain_s), + len(block_hash_tree_s), + len(IV), + len(share_data), + len(encprivkey)) final_shares[shnum] = "".join([prefix, offsets, @@ -323,50 +370,6 @@ class Publish(ShareFormattingMixin): return (seqnum, root_hash, final_shares) - def _pack_checkstring(self, seqnum, root_hash): - return struct.pack(">BQ32s", - 0, # version, - seqnum, - root_hash) - - def _pack_prefix(self, seqnum, root_hash, - required_shares, total_shares, - segment_size, data_length): - prefix = struct.pack(">BQ32s" + "BBQQ", - 0, # version, - seqnum, - root_hash, - - required_shares, - total_shares, - segment_size, - data_length, - ) - return prefix - - def _pack_offsets(self, verification_key_length, signature_length, - share_hash_chain_length, block_hash_tree_length, - IV_length, share_data_length, encprivkey_length): - post_offset = HEADER_LENGTH - offsets = {} - o1 = offsets['signature'] = post_offset + verification_key_length - o2 = offsets['share_hash_chain'] = o1 + signature_length - o3 = offsets['block_hash_tree'] = o2 + share_hash_chain_length - assert IV_length == 16 - o4 = offsets['IV'] = o3 + block_hash_tree_length - o5 = offsets['share_data'] = o4 + IV_length - o6 = offsets['enc_privkey'] = o5 + share_data_length - o7 = offsets['EOF'] = o6 + encprivkey_length - - return struct.pack(">LLLLLQQ", - offsets['signature'], - offsets['share_hash_chain'], - offsets['block_hash_tree'], - offsets['IV'], - offsets['share_data'], - offsets['enc_privkey'], - offsets['EOF']) - def _query_peers(self, (seqnum, root_hash, final_shares), total_shares): self._new_seqnum = seqnum self._new_root_hash = root_hash @@ -416,7 +419,7 @@ class Publish(ShareFormattingMixin): for shnum, datav in datavs.items(): assert len(datav) == 1 data = datav[0] - r = self._unpack_share(data) + r = unpack_share(data) share = (shnum, r[0], r[1]) # shnum,seqnum,R current_share_peers[shnum].add( (peerid, r[0], r[1]) ) @@ -476,8 +479,8 @@ class Publish(ShareFormattingMixin): # surprises here are *not* indications of UncoordinatedWriteError, # and we'll need to respond to them more gracefully. - my_checkstring = self._pack_checkstring(self._new_seqnum, - self._new_root_hash) + my_checkstring = pack_checkstring(self._new_seqnum, + self._new_root_hash) peer_messages = {} expected_old_shares = {} @@ -498,6 +501,7 @@ class Publish(ShareFormattingMixin): dl = [] # ok, send the messages! self._surprised = False + dispatch_map = DictOfSets() for peerid, tw_vectors in peer_messages.items(): @@ -508,12 +512,12 @@ class Publish(ShareFormattingMixin): d = self._do_testreadwrite(peerid, peer_storage_servers, secrets, tw_vectors, read_vector) - d.addCallback(self._got_write_answer, - peerid, expected_old_shares[peerid]) + d.addCallback(self._got_write_answer, tw_vectors, my_checkstring, + peerid, expected_old_shares[peerid], dispatch_map) dl.append(d) d = defer.DeferredList(dl) - d.addCallback(lambda res: self._surprised) + d.addCallback(lambda res: (self._surprised, dispatch_map)) return d def _do_testreadwrite(self, peerid, peer_storage_servers, secrets, @@ -528,24 +532,42 @@ class Publish(ShareFormattingMixin): read_vector) return d - def _got_write_answer(self, answer, peerid, expected_old_shares): + def _got_write_answer(self, answer, tw_vectors, my_checkstring, + peerid, expected_old_shares, + dispatch_map): wrote, read_data = answer surprised = False + if not wrote: # surprise! our testv failed, so the write did not happen surprised = True - for shnum, (old_checkstring,) in read_data.items(): + + for shnum, (old_cs,) in read_data.items(): + old_seqnum, old_root_hash = unpack_checkstring(old_cs) + if wrote and shnum in tw_vectors: + current_cs = my_checkstring + else: + current_cs = old_cs + + current_seqnum, current_root_hash = unpack_checkstring(current_cs) + dispatch_map.add(shnum, (peerid, current_seqnum, current_root_hash)) + if shnum not in expected_old_shares: # surprise! there was a share we didn't know about surprised = True else: seqnum, root_hash = expected_old_shares[shnum] if seqnum is not None: - expected_checkstring = self._pack_checkstring(seqnum, - root_hash) - if old_checkstring != expected_checkstring: - # surprise! somebody modified the share + if seqnum != old_seqnum or root_hash != old_root_hash: + # surprise! somebody modified the share on us surprised = True if surprised: self._surprised = True + def _maybe_recover(self, (surprised, dispatch_map)): + if not surprised: + return + print "RECOVERY NOT YET IMPLEMENTED" + # but dispatch_map will help us do it + raise UncoordinatedWriteError("I was surprised!") + diff --git a/src/allmydata/test/test_mutable.py b/src/allmydata/test/test_mutable.py index 56f4285e..373e4d30 100644 --- a/src/allmydata/test/test_mutable.py +++ b/src/allmydata/test/test_mutable.py @@ -213,7 +213,7 @@ class Publish(unittest.TestCase): self.failUnless(isinstance(sh, str)) self.failUnlessEqual(len(sh), 367) # feed the share through the unpacker as a sanity-check - pieces = r._unpack_share(sh) + pieces = mutable.unpack_share(sh) (u_seqnum, u_root_hash, k, N, segsize, datalen, pubkey, signature, share_hash_chain, block_hash_tree, IV, share_data, enc_privkey) = pieces @@ -355,7 +355,7 @@ class Publish(unittest.TestCase): d, p = self.setup_for_write(20, total_shares) d.addCallback(p._query_peers, total_shares) d.addCallback(p._send_shares) - def _done(surprised): + def _done((surprised, dispatch_map)): self.failIf(surprised, "surprised!") d.addCallback(_done) return d