From fade06ef4dc85becef6796d8d96f5fab4c7b8835 Mon Sep 17 00:00:00 2001
From: Brian Warner <warner@allmydata.com>
Date: Mon, 5 Nov 2007 21:29:47 -0700
Subject: [PATCH] mutable: added send-messages-to-peers code, about 70% done.
 No recovery code yet.

---
 src/allmydata/mutable.py           | 89 +++++++++++++++++++++++++++++-
 src/allmydata/test/test_mutable.py | 43 ++++++++++++++-
 2 files changed, 128 insertions(+), 4 deletions(-)

diff --git a/src/allmydata/mutable.py b/src/allmydata/mutable.py
index d4fa1cbf..a77e3e98 100644
--- a/src/allmydata/mutable.py
+++ b/src/allmydata/mutable.py
@@ -307,6 +307,12 @@ class Publish(ShareFormattingMixin):
         return (seqnum, root_hash, final_shares)
 
 
+    def _pack_checkstring(self, seqnum, root_hash):
+        return struct.pack(">BQ32s",
+                           0, # version,
+                           seqnum,
+                           root_hash)
+
     def _pack_prefix(self, seqnum, root_hash,
                      required_shares, total_shares,
                      segment_size, data_length):
@@ -419,7 +425,7 @@ class Publish(ShareFormattingMixin):
                 (peerid, seqnum, R) = oldplace
                 if seqnum >= new_seqnum:
                     raise UncoordinatedWriteError()
-                target_map[shnum].add(oldplace)
+                target_map.add(shnum, oldplace)
                 shares_per_peer.add(peerid, shnum)
                 if shnum in shares_needing_homes:
                     shares_needing_homes.remove(shnum)
@@ -444,4 +450,83 @@ class Publish(ShareFormattingMixin):
 
         assert not shares_needing_homes
 
-        return target_map
+        return (target_map, peer_storage_servers)
+
+    def _send_shares(self, (target_map, peer_storage_servers) ):
+        # we're finally ready to send out our shares. If we encounter any
+        # surprises here, it's because somebody else is writing at the same
+        # time. (Note: in the future, when we remove the _query_peers() step
+        # and instead speculate about [or remember] which shares are where,
+        # surprises here are *not* indications of UncoordinatedWriteError,
+        # and we'll need to respond to them more gracefully.
+
+        my_checkstring = self._pack_checkstring(self._new_seqnum,
+                                                self._new_root_hash)
+        peer_messages = {}
+        expected_old_shares = {}
+
+        for shnum, peers in target_map.items():
+            for (peerid, old_seqnum, old_root_hash) in peers:
+                testv = [(0, len(my_checkstring), "ge", my_checkstring)]
+                new_share = self._new_shares[shnum]
+                writev = [(0, new_share)]
+                if peerid not in peer_messages:
+                    peer_messages[peerid] = {}
+                peer_messages[peerid][shnum] = (testv, writev, None)
+                if peerid not in expected_old_shares:
+                    expected_old_shares[peerid] = {}
+                expected_old_shares[peerid][shnum] = (old_seqnum, old_root_hash)
+
+        read_vector = [(0, len(my_checkstring))]
+
+        dl = []
+        # ok, send the messages!
+        self._surprised = False
+        for peerid, tw_vectors in peer_messages.items():
+            d = self._do_testreadwrite(peerid, peer_storage_servers,
+                                       tw_vectors, read_vector)
+            d.addCallback(self._got_write_answer,
+                          peerid, expected_old_shares[peerid])
+            dl.append(d)
+
+        d = defer.DeferredList(dl)
+        d.addCallback(lambda res: self._surprised)
+        return d
+
+    def _do_testreadwrite(self, peerid, peer_storage_servers,
+                          tw_vectors, read_vector):
+        conn = peer_storage_servers[peerid]
+        storage_index = self._node._uri.storage_index
+        # TOTALLY BOGUS renew/cancel secrets
+        write_enabler = hashutil.tagged_hash("WEFOO", storage_index)
+        renew_secret = hashutil.tagged_hash("renewFOO", storage_index)
+        cancel_secret = hashutil.tagged_hash("cancelFOO", storage_index)
+
+        d = conn.callRemote("slot_testv_and_readv_and_writev",
+                            storage_index,
+                            (write_enabler, renew_secret, cancel_secret),
+                            tw_vectors,
+                            read_vector)
+        return d
+
+    def _got_write_answer(self, answer, peerid, expected_old_shares):
+        wrote, read_data = answer
+        surprised = False
+        if not wrote:
+            # surprise! our testv failed, so the write did not happen
+            surprised = True
+        for shnum, (old_checkstring,) in read_data.items():
+            if shnum not in expected_old_shares:
+                # surprise! there was a share we didn't know about
+                surprised = True
+            else:
+                seqnum, root_hash = expected_old_shares[shnum]
+                if seqnum is not None:
+                    expected_checkstring = self._pack_checkstring(seqnum,
+                                                                  root_hash)
+                    if old_checkstring != expected_checkstring:
+                        # surprise! somebody modified the share
+                        surprised = True
+        if surprised:
+            self._surprised = True
+
diff --git a/src/allmydata/test/test_mutable.py b/src/allmydata/test/test_mutable.py
index e8cc516f..9acc1e68 100644
--- a/src/allmydata/test/test_mutable.py
+++ b/src/allmydata/test/test_mutable.py
@@ -72,6 +72,18 @@ class FakePublish(mutable.Publish):
         shares = self._peers[peerid]
         return defer.succeed(shares)
 
+    def _do_testreadwrite(self, conn, peerid, tw_vectors, read_vector):
+        # always-pass: parrot the test vectors back to them.
+        readv = {}
+        for shnum, (testv, datav, new_length) in tw_vectors.items():
+            for (offset, length, op, specimen) in testv:
+                assert op in ("le", "eq", "ge")
+            readv[shnum] = [ specimen
+                             for (offset, length, op, specimen)
+                             in testv ]
+        answer = (True, readv)
+        return defer.succeed(answer)
+
 
 class FakeNewDirectoryNode(dirnode2.NewDirectoryNode):
     filenode_class = FakeFilenode
@@ -268,7 +280,7 @@ class Publish(unittest.TestCase):
         total_shares = 10
         d = p._query_peers( (new_seqnum, new_root_hash, new_seqnum),
                             total_shares)
-        def _done(target_map):
+        def _done( (target_map, peer_storage_servers) ):
             shares_per_peer = {}
             for shnum in target_map:
                 for (peerid, old_seqnum, old_R) in target_map[shnum]:
@@ -293,7 +305,7 @@ class Publish(unittest.TestCase):
         total_shares = 10
         d = p._query_peers( (new_seqnum, new_root_hash, new_seqnum),
                             total_shares)
-        def _done(target_map):
+        def _done( (target_map, peer_storage_servers) ):
             shares_per_peer = {}
             for shnum in target_map:
                 for (peerid, old_seqnum, old_R) in target_map[shnum]:
@@ -320,6 +332,33 @@ class Publish(unittest.TestCase):
                             total_shares)
         return d
 
+    def setup_for_write(self, num_peers, total_shares):
+        c, p = self.setup_for_sharemap(num_peers)
+        # make some fake shares
+        CONTENTS = "some initial contents"
+        shares_and_ids = ( ["%07d" % i for i in range(10)], range(10) )
+        d = defer.maybeDeferred(p._generate_shares,
+                                (shares_and_ids,
+                                 3, total_shares,
+                                 21, # segsize
+                                 len(CONTENTS),
+                                 "IV"*8),
+                                3, # seqnum
+                                FakePrivKey(), "encprivkey", FakePubKey(),
+                                )
+        return d, p
+
+    def test_write(self):
+        total_shares = 10
+        d, p = self.setup_for_write(20, total_shares)
+        d.addCallback(p._query_peers, total_shares)
+        d.addCallback(p._send_shares)
+        def _done(surprised):
+            self.failIf(surprised, "surprised!")
+        d.addCallback(_done)
+        return d
+
+
 
 class FakePubKey:
     def serialize(self):
-- 
2.45.2