mutable publish: if we are surprised by shares that match what we would have written...
authorBrian Warner <warner@allmydata.com>
Wed, 10 Dec 2008 05:44:49 +0000 (22:44 -0700)
committerBrian Warner <warner@allmydata.com>
Wed, 10 Dec 2008 05:44:49 +0000 (22:44 -0700)
src/allmydata/mutable/publish.py
src/allmydata/test/test_mutable.py

index 59460c4f8a9a284c614d986b5bb98c3c15335df1..345581bbf244b2cbe82ad85064edade22efa208d 100644 (file)
@@ -386,13 +386,14 @@ class Publish:
         i = 0
         for shnum in homeless_shares:
             (ignored1, ignored2, peerid, ss) = peerlist[i]
-            # TODO: if we are forced to send a share to a server that already
-            # has one, we may have two write requests in flight, and the
+            # if we are forced to send a share to a server that already has
+            # one, we may have two write requests in flight, and the
             # servermap (which was computed before either request was sent)
-            # won't reflect the new shares, so the second response will cause
-            # us to be surprised ("unexpected share on peer"), causing the
-            # publish to fail with an UncoordinatedWriteError. This is
-            # troublesome but not really a bit problem. Fix it at some point.
+            # won't reflect the new shares, so the second response will be
+            # surprising. There is code in _got_write_answer() to tolerate
+            # this, otherwise it would cause the publish to fail with an
+            # UncoordinatedWriteError. See #546 for details of the trouble
+            # this used to cause.
             self.goal.add( (peerid, shnum) )
             self.connections[peerid] = ss
             i += 1
@@ -483,6 +484,7 @@ class Publish:
         root_hash = share_hash_tree[0]
         assert len(root_hash) == 32
         self.log("my new root_hash is %s" % base32.b2a(root_hash))
+        self._new_version_info = (self._new_seqnum, root_hash, self.salt)
 
         prefix = pack_prefix(self._new_seqnum, root_hash, self.salt,
                              self.required_shares, self.total_shares,
@@ -672,7 +674,67 @@ class Publish:
         wrote, read_data = answer
 
         surprise_shares = set(read_data.keys()) - set(shnums)
-        if surprise_shares:
+
+        surprised = False
+        for shnum in surprise_shares:
+            # read_data is a dict mapping shnum to checkstring (SIGNED_PREFIX)
+            checkstring = read_data[shnum][0]
+            their_version_info = unpack_checkstring(checkstring)
+            if their_version_info == self._new_version_info:
+                # they have the right share, somehow
+
+                if (peerid,shnum) in self.goal:
+                    # and we want them to have it, so we probably sent them a
+                    # copy in an earlier write. This is ok, and avoids the
+                    # #546 problem.
+                    continue
+
+                # They aren't in our goal, but they are still for the right
+                # version. Somebody else wrote them, and it's a convergent
+                # uncoordinated write. Pretend this is ok (don't be
+                # surprised), since I suspect there's a decent chance that
+                # we'll hit this in normal operation.
+                continue
+
+            else:
+                # the new shares are of a different version
+                if peerid in self._servermap.reachable_peers:
+                    # we asked them about their shares, so we had knowledge
+                    # of what they used to have. Any surprising shares must
+                    # have come from someone else, so UCW.
+                    surprised = True
+                else:
+                    # we didn't ask them, and now we've discovered that they
+                    # have a share we didn't know about. This indicates that
+                    # mapupdate should have wokred harder and asked more
+                    # servers before concluding that it knew about them all.
+
+                    # signal UCW, but make sure to ask this peer next time,
+                    # so we'll remember to update it if/when we retry.
+                    surprised = True
+                    # TODO: ask this peer next time. I don't yet have a good
+                    # way to do this. Two insufficient possibilities are:
+                    #
+                    # self._servermap.add_new_share(peerid, shnum, verinfo, now)
+                    #  but that requires fetching/validating/parsing the whole
+                    #  version string, and all we have is the checkstring
+                    # self._servermap.mark_bad_share(peerid, shnum, checkstring)
+                    #  that will make publish overwrite the share next time,
+                    #  but it won't re-query the server, and it won't make
+                    #  mapupdate search further
+
+                    # TODO later: when publish starts, do
+                    # servermap.get_best_version(), extract the seqnum,
+                    # subtract one, and store as highest-replaceable-seqnum.
+                    # Then, if this surprise-because-we-didn't-ask share is
+                    # of highest-replaceable-seqnum or lower, we're allowed
+                    # to replace it: send out a new writev (or rather add it
+                    # to self.goal and loop).
+                    pass
+
+                surprised = True
+
+        if surprised:
             self.log("they had shares %s that we didn't know about" %
                      (list(surprise_shares),),
                      parent=lp, level=log.WEIRD, umid="un9CSQ")
index d2cc36a63350c8febaf83d8a3e4d67629aef5da3..329912bc58020546dfcda976c55d7c2531c52666 100644 (file)
@@ -1959,6 +1959,38 @@ class Problems(unittest.TestCase, testutil.ShouldFailMixin):
         d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2"))
         return d
 
+    def test_bad_server_overlap(self):
+        # like test_bad_server, but with no extra unused servers to fall back
+        # upon. This means that we must re-use a server which we've already
+        # used. If we don't remember the fact that we sent them one share
+        # already, we'll mistakenly think we're experiencing an
+        # UncoordinatedWriteError.
+
+        # Break one server, then create the file: the initial publish should
+        # complete with an alternate server. Breaking a second server should
+        # not prevent an update from succeeding either.
+        basedir = os.path.join("mutable/CollidingWrites/test_bad_server")
+        self.client = LessFakeClient(basedir, 10)
+
+        peerids = sorted(self.client._connections.keys())
+        self.client._connections[peerids[0]].broken = True
+
+        d = self.client.create_mutable_file("contents 1")
+        def _created(n):
+            d = n.download_best_version()
+            d.addCallback(lambda res: self.failUnlessEqual(res, "contents 1"))
+            # now break one of the remaining servers
+            def _break_second_server(res):
+                self.client._connections[peerids[1]].broken = True
+            d.addCallback(_break_second_server)
+            d.addCallback(lambda res: n.overwrite("contents 2"))
+            # that ought to work too
+            d.addCallback(lambda res: n.download_best_version())
+            d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2"))
+            return d
+        d.addCallback(_created)
+        return d
+
     def test_publish_all_servers_bad(self):
         # Break all servers: the publish should fail
         basedir = os.path.join("mutable/CollidingWrites/publish_all_servers_bad")