]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
mutable.Publish: create a dispatch_map for the benefit of recovery code, and pull...
authorBrian Warner <warner@allmydata.com>
Tue, 6 Nov 2007 05:14:59 +0000 (22:14 -0700)
committerBrian Warner <warner@allmydata.com>
Tue, 6 Nov 2007 05:14:59 +0000 (22:14 -0700)
src/allmydata/mutable.py
src/allmydata/test/test_mutable.py

index 4c62dc03a758fd9a3ae5bcd616873b3c0eee5bab..c9c087aac4f5f7be755b93d3494b4e3d9317cc3e 100644 (file)
@@ -10,8 +10,6 @@ from allmydata import hashtree, codec
 from allmydata.encode import NotEnoughPeersError
 
 
-HEADER_LENGTH = struct.calcsize(">BQ32s BBQQ LLLLLQQ")
-
 class NeedMoreDataError(Exception):
     def __init__(self, needed_bytes):
         Exception.__init__(self)
@@ -20,6 +18,105 @@ class NeedMoreDataError(Exception):
 class UncoordinatedWriteError(Exception):
     pass
 
+HEADER_LENGTH = struct.calcsize(">BQ32s BBQQ LLLLLQQ")
+
+def unpack_share(data):
+    assert len(data) >= HEADER_LENGTH
+    o = {}
+    (version,
+     seqnum,
+     root_hash,
+     k, N, segsize, datalen,
+     o['signature'],
+     o['share_hash_chain'],
+     o['block_hash_tree'],
+     o['IV'],
+     o['share_data'],
+     o['enc_privkey'],
+     o['EOF']) = struct.unpack(">BQ32s" + "BBQQ" + "LLLLLQQ",
+                                     data[:HEADER_LENGTH])
+
+    assert version == 0
+    if len(data) < o['EOF']:
+        raise NeedMoreDataError(o['EOF'])
+
+    pubkey = data[HEADER_LENGTH:o['signature']]
+    signature = data[o['signature']:o['share_hash_chain']]
+    share_hash_chain_s = data[o['share_hash_chain']:o['block_hash_tree']]
+    share_hash_format = ">H32s"
+    hsize = struct.calcsize(share_hash_format)
+    assert len(share_hash_chain_s) % hsize == 0, len(share_hash_chain_s)
+    share_hash_chain = []
+    for i in range(0, len(share_hash_chain_s), hsize):
+        chunk = share_hash_chain_s[i:i+hsize]
+        (hid, h) = struct.unpack(share_hash_format, chunk)
+        share_hash_chain.append( (hid, h) )
+    block_hash_tree_s = data[o['block_hash_tree']:o['IV']]
+    assert len(block_hash_tree_s) % 32 == 0, len(block_hash_tree_s)
+    block_hash_tree = []
+    for i in range(0, len(block_hash_tree_s), 32):
+        block_hash_tree.append(block_hash_tree_s[i:i+32])
+
+    IV = data[o['IV']:o['share_data']]
+    share_data = data[o['share_data']:o['enc_privkey']]
+    enc_privkey = data[o['enc_privkey']:o['EOF']]
+
+    return (seqnum, root_hash, k, N, segsize, datalen,
+            pubkey, signature, share_hash_chain, block_hash_tree,
+            IV, share_data, enc_privkey)
+
+
+def pack_checkstring(seqnum, root_hash):
+    return struct.pack(">BQ32s",
+                       0, # version,
+                       seqnum,
+                       root_hash)
+
+def unpack_checkstring(checkstring):
+    cs_len = struct.calcsize(">BQ32s")
+    version, seqnum, root_hash = struct.unpack(">BQ32s",
+                                               checkstring[:cs_len])
+    assert version == 0 # TODO: just ignore the share
+    return (seqnum, root_hash)
+
+def pack_prefix(seqnum, root_hash,
+                required_shares, total_shares,
+                segment_size, data_length):
+    prefix = struct.pack(">BQ32s" + "BBQQ",
+                         0, # version,
+                         seqnum,
+                         root_hash,
+
+                         required_shares,
+                         total_shares,
+                         segment_size,
+                         data_length,
+                         )
+    return prefix
+
+def pack_offsets(verification_key_length, signature_length,
+                 share_hash_chain_length, block_hash_tree_length,
+                 IV_length, share_data_length, encprivkey_length):
+    post_offset = HEADER_LENGTH
+    offsets = {}
+    o1 = offsets['signature'] = post_offset + verification_key_length
+    o2 = offsets['share_hash_chain'] = o1 + signature_length
+    o3 = offsets['block_hash_tree'] = o2 + share_hash_chain_length
+    assert IV_length == 16
+    o4 = offsets['IV'] = o3 + block_hash_tree_length
+    o5 = offsets['share_data'] = o4 + IV_length
+    o6 = offsets['enc_privkey'] = o5 + share_data_length
+    o7 = offsets['EOF'] = o6 + encprivkey_length
+
+    return struct.pack(">LLLLLQQ",
+                       offsets['signature'],
+                       offsets['share_hash_chain'],
+                       offsets['block_hash_tree'],
+                       offsets['IV'],
+                       offsets['share_data'],
+                       offsets['enc_privkey'],
+                       offsets['EOF'])
+
 # use client.create_mutable_file() to make one of these
 
 class MutableFileNode:
@@ -109,54 +206,8 @@ class MutableFileNode:
     def replace(self, newdata):
         return defer.succeed(None)
 
-class ShareFormattingMixin:
-
-    def _unpack_share(self, data):
-        assert len(data) >= HEADER_LENGTH
-        o = {}
-        (version,
-         seqnum,
-         root_hash,
-         k, N, segsize, datalen,
-         o['signature'],
-         o['share_hash_chain'],
-         o['block_hash_tree'],
-         o['IV'],
-         o['share_data'],
-         o['enc_privkey'],
-         o['EOF']) = struct.unpack(">BQ32s" + "BBQQ" + "LLLLLQQ",
-                                         data[:HEADER_LENGTH])
-
-        assert version == 0
-        if len(data) < o['EOF']:
-            raise NeedMoreDataError(o['EOF'])
-
-        pubkey = data[HEADER_LENGTH:o['signature']]
-        signature = data[o['signature']:o['share_hash_chain']]
-        share_hash_chain_s = data[o['share_hash_chain']:o['block_hash_tree']]
-        share_hash_format = ">H32s"
-        hsize = struct.calcsize(share_hash_format)
-        assert len(share_hash_chain_s) % hsize == 0, len(share_hash_chain_s)
-        share_hash_chain = []
-        for i in range(0, len(share_hash_chain_s), hsize):
-            chunk = share_hash_chain_s[i:i+hsize]
-            (hid, h) = struct.unpack(share_hash_format, chunk)
-            share_hash_chain.append( (hid, h) )
-        block_hash_tree_s = data[o['block_hash_tree']:o['IV']]
-        assert len(block_hash_tree_s) % 32 == 0, len(block_hash_tree_s)
-        block_hash_tree = []
-        for i in range(0, len(block_hash_tree_s), 32):
-            block_hash_tree.append(block_hash_tree_s[i:i+32])
-
-        IV = data[o['IV']:o['share_data']]
-        share_data = data[o['share_data']:o['enc_privkey']]
-        enc_privkey = data[o['enc_privkey']:o['EOF']]
-
-        return (seqnum, root_hash, k, N, segsize, datalen,
-                pubkey, signature, share_hash_chain, block_hash_tree,
-                IV, share_data, enc_privkey)
-
-class Retrieve(ShareFormattingMixin):
+
+class Retrieve:
     def __init__(self, filenode):
         self._node = filenode
 
@@ -167,8 +218,7 @@ class DictOfSets(dict):
         else:
             self[key] = set([value])
 
-
-class Publish(ShareFormattingMixin):
+class Publish:
     """I represent a single act of publishing the mutable file to the grid."""
 
     def __init__(self, filenode):
@@ -184,13 +234,10 @@ class Publish(ShareFormattingMixin):
         # 2: perform peer selection, get candidate servers
         #  2a: send queries to n+epsilon servers, to determine current shares
         #  2b: based upon responses, create target map
-
-        # 3: pre-allocate some shares to some servers, based upon any existing
-        #    self._node._sharemap
-        # 4: send allocate/testv_and_writev messages
-        # 5: as responses return, update share-dispatch table
-        # 5a: may need to run recovery algorithm
-        # 6: when enough responses are back, we're done
+        # 3: send slot_testv_and_readv_and_writev messages
+        # 4: as responses return, update share-dispatch table
+        # 4a: may need to run recovery algorithm
+        # 5: when enough responses are back, we're done
 
         old_roothash = self._node._current_roothash
         old_seqnum = self._node._current_seqnum
@@ -209,7 +256,7 @@ class Publish(ShareFormattingMixin):
 
         d.addCallback(self._query_peers, total_shares)
         d.addCallback(self._send_shares)
-        d.addCallback(self._wait_for_responses)
+        d.addCallback(self._maybe_recover)
         d.addCallback(lambda res: None)
         return d
 
@@ -279,9 +326,9 @@ class Publish(ShareFormattingMixin):
         root_hash = share_hash_tree[0]
         assert len(root_hash) == 32
 
-        prefix = self._pack_prefix(seqnum, root_hash,
-                                   required_shares, total_shares,
-                                   segment_size, data_length)
+        prefix = pack_prefix(seqnum, root_hash,
+                             required_shares, total_shares,
+                             segment_size, data_length)
 
         # now pack the beginning of the share. All shares are the same up
         # to the signature, then they have divergent share hash chains,
@@ -303,13 +350,13 @@ class Publish(ShareFormattingMixin):
                 assert len(h) == 32
             block_hash_tree_s = "".join(bht)
             share_data = all_shares[shnum]
-            offsets = self._pack_offsets(len(verification_key),
-                                         len(signature),
-                                         len(share_hash_chain_s),
-                                         len(block_hash_tree_s),
-                                         len(IV),
-                                         len(share_data),
-                                         len(encprivkey))
+            offsets = pack_offsets(len(verification_key),
+                                   len(signature),
+                                   len(share_hash_chain_s),
+                                   len(block_hash_tree_s),
+                                   len(IV),
+                                   len(share_data),
+                                   len(encprivkey))
 
             final_shares[shnum] = "".join([prefix,
                                            offsets,
@@ -323,50 +370,6 @@ class Publish(ShareFormattingMixin):
         return (seqnum, root_hash, final_shares)
 
 
-    def _pack_checkstring(self, seqnum, root_hash):
-        return struct.pack(">BQ32s",
-                           0, # version,
-                           seqnum,
-                           root_hash)
-
-    def _pack_prefix(self, seqnum, root_hash,
-                     required_shares, total_shares,
-                     segment_size, data_length):
-        prefix = struct.pack(">BQ32s" + "BBQQ",
-                             0, # version,
-                             seqnum,
-                             root_hash,
-
-                             required_shares,
-                             total_shares,
-                             segment_size,
-                             data_length,
-                             )
-        return prefix
-
-    def _pack_offsets(self, verification_key_length, signature_length,
-                      share_hash_chain_length, block_hash_tree_length,
-                      IV_length, share_data_length, encprivkey_length):
-        post_offset = HEADER_LENGTH
-        offsets = {}
-        o1 = offsets['signature'] = post_offset + verification_key_length
-        o2 = offsets['share_hash_chain'] = o1 + signature_length
-        o3 = offsets['block_hash_tree'] = o2 + share_hash_chain_length
-        assert IV_length == 16
-        o4 = offsets['IV'] = o3 + block_hash_tree_length
-        o5 = offsets['share_data'] = o4 + IV_length
-        o6 = offsets['enc_privkey'] = o5 + share_data_length
-        o7 = offsets['EOF'] = o6 + encprivkey_length
-
-        return struct.pack(">LLLLLQQ",
-                           offsets['signature'],
-                           offsets['share_hash_chain'],
-                           offsets['block_hash_tree'],
-                           offsets['IV'],
-                           offsets['share_data'],
-                           offsets['enc_privkey'],
-                           offsets['EOF'])
-
     def _query_peers(self, (seqnum, root_hash, final_shares), total_shares):
         self._new_seqnum = seqnum
         self._new_root_hash = root_hash
@@ -416,7 +419,7 @@ class Publish(ShareFormattingMixin):
         for shnum, datav in datavs.items():
             assert len(datav) == 1
             data = datav[0]
-            r = self._unpack_share(data)
+            r = unpack_share(data)
             share = (shnum, r[0], r[1]) # shnum,seqnum,R
             current_share_peers[shnum].add( (peerid, r[0], r[1]) )
 
@@ -476,8 +479,8 @@ class Publish(ShareFormattingMixin):
         # surprises here are *not* indications of UncoordinatedWriteError,
         # and we'll need to respond to them more gracefully.
 
-        my_checkstring = self._pack_checkstring(self._new_seqnum,
-                                                self._new_root_hash)
+        my_checkstring = pack_checkstring(self._new_seqnum,
+                                          self._new_root_hash)
         peer_messages = {}
         expected_old_shares = {}
 
@@ -498,6 +501,7 @@ class Publish(ShareFormattingMixin):
         dl = []
         # ok, send the messages!
         self._surprised = False
+        dispatch_map = DictOfSets()
 
         for peerid, tw_vectors in peer_messages.items():
 
@@ -508,12 +512,12 @@ class Publish(ShareFormattingMixin):
 
             d = self._do_testreadwrite(peerid, peer_storage_servers, secrets,
                                        tw_vectors, read_vector)
-            d.addCallback(self._got_write_answer,
-                          peerid, expected_old_shares[peerid])
+            d.addCallback(self._got_write_answer, tw_vectors, my_checkstring,
+                          peerid, expected_old_shares[peerid], dispatch_map)
             dl.append(d)
 
         d = defer.DeferredList(dl)
-        d.addCallback(lambda res: self._surprised)
+        d.addCallback(lambda res: (self._surprised, dispatch_map))
         return d
 
     def _do_testreadwrite(self, peerid, peer_storage_servers, secrets,
@@ -528,24 +532,42 @@ class Publish(ShareFormattingMixin):
                             read_vector)
         return d
 
-    def _got_write_answer(self, answer, peerid, expected_old_shares):
+    def _got_write_answer(self, answer, tw_vectors, my_checkstring,
+                          peerid, expected_old_shares,
+                          dispatch_map):
         wrote, read_data = answer
         surprised = False
+
         if not wrote:
             # surprise! our testv failed, so the write did not happen
             surprised = True
-        for shnum, (old_checkstring,) in read_data.items():
+
+        for shnum, (old_cs,) in read_data.items():
+            old_seqnum, old_root_hash = unpack_checkstring(old_cs)
+            if wrote and shnum in tw_vectors:
+                current_cs = my_checkstring
+            else:
+                current_cs = old_cs
+
+            current_seqnum, current_root_hash = unpack_checkstring(current_cs)
+            dispatch_map.add(shnum, (peerid, current_seqnum, current_root_hash))
+
             if shnum not in expected_old_shares:
                 # surprise! there was a share we didn't know about
                 surprised = True
             else:
                 seqnum, root_hash = expected_old_shares[shnum]
                 if seqnum is not None:
-                    expected_checkstring = self._pack_checkstring(seqnum,
-                                                                  root_hash)
-                    if old_checkstring != expected_checkstring:
-                        # surprise! somebody modified the share
+                    if seqnum != old_seqnum or root_hash != old_root_hash:
+                        # surprise! somebody modified the share on us
                         surprised = True
         if surprised:
             self._surprised = True
 
+    def _maybe_recover(self, (surprised, dispatch_map)):
+        if not surprised:
+            return
+        print "RECOVERY NOT YET IMPLEMENTED"
+        # but dispatch_map will help us do it
+        raise UncoordinatedWriteError("I was surprised!")
+
index 56f4285e711739b2ea1cfd7172648c82270ce546..373e4d305122cf68976e34543ef6c1995db8f9d7 100644 (file)
@@ -213,7 +213,7 @@ class Publish(unittest.TestCase):
                 self.failUnless(isinstance(sh, str))
                 self.failUnlessEqual(len(sh), 367)
                 # feed the share through the unpacker as a sanity-check
-                pieces = r._unpack_share(sh)
+                pieces = mutable.unpack_share(sh)
                 (u_seqnum, u_root_hash, k, N, segsize, datalen,
                  pubkey, signature, share_hash_chain, block_hash_tree,
                  IV, share_data, enc_privkey) = pieces
@@ -355,7 +355,7 @@ class Publish(unittest.TestCase):
         d, p = self.setup_for_write(20, total_shares)
         d.addCallback(p._query_peers, total_shares)
         d.addCallback(p._send_shares)
-        def _done(surprised):
+        def _done((surprised, dispatch_map)):
             self.failIf(surprised, "surprised!")
         d.addCallback(_done)
         return d