from allmydata.encode import NotEnoughPeersError
-HEADER_LENGTH = struct.calcsize(">BQ32s BBQQ LLLLLQQ")
-
class NeedMoreDataError(Exception):
def __init__(self, needed_bytes):
Exception.__init__(self)
class UncoordinatedWriteError(Exception):
pass
+HEADER_LENGTH = struct.calcsize(">BQ32s BBQQ LLLLLQQ")
+
+def unpack_share(data):
+ assert len(data) >= HEADER_LENGTH
+ o = {}
+ (version,
+ seqnum,
+ root_hash,
+ k, N, segsize, datalen,
+ o['signature'],
+ o['share_hash_chain'],
+ o['block_hash_tree'],
+ o['IV'],
+ o['share_data'],
+ o['enc_privkey'],
+ o['EOF']) = struct.unpack(">BQ32s" + "BBQQ" + "LLLLLQQ",
+ data[:HEADER_LENGTH])
+
+ assert version == 0
+ if len(data) < o['EOF']:
+ raise NeedMoreDataError(o['EOF'])
+
+ pubkey = data[HEADER_LENGTH:o['signature']]
+ signature = data[o['signature']:o['share_hash_chain']]
+ share_hash_chain_s = data[o['share_hash_chain']:o['block_hash_tree']]
+ share_hash_format = ">H32s"
+ hsize = struct.calcsize(share_hash_format)
+ assert len(share_hash_chain_s) % hsize == 0, len(share_hash_chain_s)
+ share_hash_chain = []
+ for i in range(0, len(share_hash_chain_s), hsize):
+ chunk = share_hash_chain_s[i:i+hsize]
+ (hid, h) = struct.unpack(share_hash_format, chunk)
+ share_hash_chain.append( (hid, h) )
+ block_hash_tree_s = data[o['block_hash_tree']:o['IV']]
+ assert len(block_hash_tree_s) % 32 == 0, len(block_hash_tree_s)
+ block_hash_tree = []
+ for i in range(0, len(block_hash_tree_s), 32):
+ block_hash_tree.append(block_hash_tree_s[i:i+32])
+
+ IV = data[o['IV']:o['share_data']]
+ share_data = data[o['share_data']:o['enc_privkey']]
+ enc_privkey = data[o['enc_privkey']:o['EOF']]
+
+ return (seqnum, root_hash, k, N, segsize, datalen,
+ pubkey, signature, share_hash_chain, block_hash_tree,
+ IV, share_data, enc_privkey)
+
+
+def pack_checkstring(seqnum, root_hash):
+ return struct.pack(">BQ32s",
+ 0, # version,
+ seqnum,
+ root_hash)
+
+def unpack_checkstring(checkstring):
+ cs_len = struct.calcsize(">BQ32s")
+ version, seqnum, root_hash = struct.unpack(">BQ32s",
+ checkstring[:cs_len])
+ assert version == 0 # TODO: just ignore the share
+ return (seqnum, root_hash)
+
+def pack_prefix(seqnum, root_hash,
+ required_shares, total_shares,
+ segment_size, data_length):
+ prefix = struct.pack(">BQ32s" + "BBQQ",
+ 0, # version,
+ seqnum,
+ root_hash,
+
+ required_shares,
+ total_shares,
+ segment_size,
+ data_length,
+ )
+ return prefix
+
+def pack_offsets(verification_key_length, signature_length,
+ share_hash_chain_length, block_hash_tree_length,
+ IV_length, share_data_length, encprivkey_length):
+ post_offset = HEADER_LENGTH
+ offsets = {}
+ o1 = offsets['signature'] = post_offset + verification_key_length
+ o2 = offsets['share_hash_chain'] = o1 + signature_length
+ o3 = offsets['block_hash_tree'] = o2 + share_hash_chain_length
+ assert IV_length == 16
+ o4 = offsets['IV'] = o3 + block_hash_tree_length
+ o5 = offsets['share_data'] = o4 + IV_length
+ o6 = offsets['enc_privkey'] = o5 + share_data_length
+ o7 = offsets['EOF'] = o6 + encprivkey_length
+
+ return struct.pack(">LLLLLQQ",
+ offsets['signature'],
+ offsets['share_hash_chain'],
+ offsets['block_hash_tree'],
+ offsets['IV'],
+ offsets['share_data'],
+ offsets['enc_privkey'],
+ offsets['EOF'])
+
# use client.create_mutable_file() to make one of these
class MutableFileNode:
def replace(self, newdata):
return defer.succeed(None)
-class ShareFormattingMixin:
-
- def _unpack_share(self, data):
- assert len(data) >= HEADER_LENGTH
- o = {}
- (version,
- seqnum,
- root_hash,
- k, N, segsize, datalen,
- o['signature'],
- o['share_hash_chain'],
- o['block_hash_tree'],
- o['IV'],
- o['share_data'],
- o['enc_privkey'],
- o['EOF']) = struct.unpack(">BQ32s" + "BBQQ" + "LLLLLQQ",
- data[:HEADER_LENGTH])
-
- assert version == 0
- if len(data) < o['EOF']:
- raise NeedMoreDataError(o['EOF'])
-
- pubkey = data[HEADER_LENGTH:o['signature']]
- signature = data[o['signature']:o['share_hash_chain']]
- share_hash_chain_s = data[o['share_hash_chain']:o['block_hash_tree']]
- share_hash_format = ">H32s"
- hsize = struct.calcsize(share_hash_format)
- assert len(share_hash_chain_s) % hsize == 0, len(share_hash_chain_s)
- share_hash_chain = []
- for i in range(0, len(share_hash_chain_s), hsize):
- chunk = share_hash_chain_s[i:i+hsize]
- (hid, h) = struct.unpack(share_hash_format, chunk)
- share_hash_chain.append( (hid, h) )
- block_hash_tree_s = data[o['block_hash_tree']:o['IV']]
- assert len(block_hash_tree_s) % 32 == 0, len(block_hash_tree_s)
- block_hash_tree = []
- for i in range(0, len(block_hash_tree_s), 32):
- block_hash_tree.append(block_hash_tree_s[i:i+32])
-
- IV = data[o['IV']:o['share_data']]
- share_data = data[o['share_data']:o['enc_privkey']]
- enc_privkey = data[o['enc_privkey']:o['EOF']]
-
- return (seqnum, root_hash, k, N, segsize, datalen,
- pubkey, signature, share_hash_chain, block_hash_tree,
- IV, share_data, enc_privkey)
-
-class Retrieve(ShareFormattingMixin):
+
+class Retrieve:
def __init__(self, filenode):
self._node = filenode
else:
self[key] = set([value])
-
-class Publish(ShareFormattingMixin):
+class Publish:
"""I represent a single act of publishing the mutable file to the grid."""
def __init__(self, filenode):
# 2: perform peer selection, get candidate servers
# 2a: send queries to n+epsilon servers, to determine current shares
# 2b: based upon responses, create target map
-
- # 3: pre-allocate some shares to some servers, based upon any existing
- # self._node._sharemap
- # 4: send allocate/testv_and_writev messages
- # 5: as responses return, update share-dispatch table
- # 5a: may need to run recovery algorithm
- # 6: when enough responses are back, we're done
+ # 3: send slot_testv_and_readv_and_writev messages
+ # 4: as responses return, update share-dispatch table
+ # 4a: may need to run recovery algorithm
+ # 5: when enough responses are back, we're done
old_roothash = self._node._current_roothash
old_seqnum = self._node._current_seqnum
d.addCallback(self._query_peers, total_shares)
d.addCallback(self._send_shares)
- d.addCallback(self._wait_for_responses)
+ d.addCallback(self._maybe_recover)
d.addCallback(lambda res: None)
return d
root_hash = share_hash_tree[0]
assert len(root_hash) == 32
- prefix = self._pack_prefix(seqnum, root_hash,
- required_shares, total_shares,
- segment_size, data_length)
+ prefix = pack_prefix(seqnum, root_hash,
+ required_shares, total_shares,
+ segment_size, data_length)
# now pack the beginning of the share. All shares are the same up
# to the signature, then they have divergent share hash chains,
assert len(h) == 32
block_hash_tree_s = "".join(bht)
share_data = all_shares[shnum]
- offsets = self._pack_offsets(len(verification_key),
- len(signature),
- len(share_hash_chain_s),
- len(block_hash_tree_s),
- len(IV),
- len(share_data),
- len(encprivkey))
+ offsets = pack_offsets(len(verification_key),
+ len(signature),
+ len(share_hash_chain_s),
+ len(block_hash_tree_s),
+ len(IV),
+ len(share_data),
+ len(encprivkey))
final_shares[shnum] = "".join([prefix,
offsets,
return (seqnum, root_hash, final_shares)
- def _pack_checkstring(self, seqnum, root_hash):
- return struct.pack(">BQ32s",
- 0, # version,
- seqnum,
- root_hash)
-
- def _pack_prefix(self, seqnum, root_hash,
- required_shares, total_shares,
- segment_size, data_length):
- prefix = struct.pack(">BQ32s" + "BBQQ",
- 0, # version,
- seqnum,
- root_hash,
-
- required_shares,
- total_shares,
- segment_size,
- data_length,
- )
- return prefix
-
- def _pack_offsets(self, verification_key_length, signature_length,
- share_hash_chain_length, block_hash_tree_length,
- IV_length, share_data_length, encprivkey_length):
- post_offset = HEADER_LENGTH
- offsets = {}
- o1 = offsets['signature'] = post_offset + verification_key_length
- o2 = offsets['share_hash_chain'] = o1 + signature_length
- o3 = offsets['block_hash_tree'] = o2 + share_hash_chain_length
- assert IV_length == 16
- o4 = offsets['IV'] = o3 + block_hash_tree_length
- o5 = offsets['share_data'] = o4 + IV_length
- o6 = offsets['enc_privkey'] = o5 + share_data_length
- o7 = offsets['EOF'] = o6 + encprivkey_length
-
- return struct.pack(">LLLLLQQ",
- offsets['signature'],
- offsets['share_hash_chain'],
- offsets['block_hash_tree'],
- offsets['IV'],
- offsets['share_data'],
- offsets['enc_privkey'],
- offsets['EOF'])
-
def _query_peers(self, (seqnum, root_hash, final_shares), total_shares):
self._new_seqnum = seqnum
self._new_root_hash = root_hash
for shnum, datav in datavs.items():
assert len(datav) == 1
data = datav[0]
- r = self._unpack_share(data)
+ r = unpack_share(data)
share = (shnum, r[0], r[1]) # shnum,seqnum,R
current_share_peers[shnum].add( (peerid, r[0], r[1]) )
# surprises here are *not* indications of UncoordinatedWriteError,
# and we'll need to respond to them more gracefully.
- my_checkstring = self._pack_checkstring(self._new_seqnum,
- self._new_root_hash)
+ my_checkstring = pack_checkstring(self._new_seqnum,
+ self._new_root_hash)
peer_messages = {}
expected_old_shares = {}
dl = []
# ok, send the messages!
self._surprised = False
+ dispatch_map = DictOfSets()
for peerid, tw_vectors in peer_messages.items():
d = self._do_testreadwrite(peerid, peer_storage_servers, secrets,
tw_vectors, read_vector)
- d.addCallback(self._got_write_answer,
- peerid, expected_old_shares[peerid])
+ d.addCallback(self._got_write_answer, tw_vectors, my_checkstring,
+ peerid, expected_old_shares[peerid], dispatch_map)
dl.append(d)
d = defer.DeferredList(dl)
- d.addCallback(lambda res: self._surprised)
+ d.addCallback(lambda res: (self._surprised, dispatch_map))
return d
def _do_testreadwrite(self, peerid, peer_storage_servers, secrets,
read_vector)
return d
- def _got_write_answer(self, answer, peerid, expected_old_shares):
+ def _got_write_answer(self, answer, tw_vectors, my_checkstring,
+ peerid, expected_old_shares,
+ dispatch_map):
wrote, read_data = answer
surprised = False
+
if not wrote:
# surprise! our testv failed, so the write did not happen
surprised = True
- for shnum, (old_checkstring,) in read_data.items():
+
+ for shnum, (old_cs,) in read_data.items():
+ old_seqnum, old_root_hash = unpack_checkstring(old_cs)
+ if wrote and shnum in tw_vectors:
+ current_cs = my_checkstring
+ else:
+ current_cs = old_cs
+
+ current_seqnum, current_root_hash = unpack_checkstring(current_cs)
+ dispatch_map.add(shnum, (peerid, current_seqnum, current_root_hash))
+
if shnum not in expected_old_shares:
# surprise! there was a share we didn't know about
surprised = True
else:
seqnum, root_hash = expected_old_shares[shnum]
if seqnum is not None:
- expected_checkstring = self._pack_checkstring(seqnum,
- root_hash)
- if old_checkstring != expected_checkstring:
- # surprise! somebody modified the share
+ if seqnum != old_seqnum or root_hash != old_root_hash:
+ # surprise! somebody modified the share on us
surprised = True
if surprised:
self._surprised = True
+ def _maybe_recover(self, (surprised, dispatch_map)):
+ if not surprised:
+ return
+ print "RECOVERY NOT YET IMPLEMENTED"
+ # but dispatch_map will help us do it
+ raise UncoordinatedWriteError("I was surprised!")
+