From 278c47b9bd859343079d4e0d9718b336b624d0b1 Mon Sep 17 00:00:00 2001 From: Brian Warner Date: Tue, 9 Dec 2008 22:44:49 -0700 Subject: [PATCH] mutable publish: if we are surprised by shares that match what we would have written anyways, don't be surprised. This should fix one of the two #546 problems, in which we re-use a server and forget that we already sent them a share. --- src/allmydata/mutable/publish.py | 76 +++++++++++++++++++++++++++--- src/allmydata/test/test_mutable.py | 32 +++++++++++++ 2 files changed, 101 insertions(+), 7 deletions(-) diff --git a/src/allmydata/mutable/publish.py b/src/allmydata/mutable/publish.py index 59460c4f..345581bb 100644 --- a/src/allmydata/mutable/publish.py +++ b/src/allmydata/mutable/publish.py @@ -386,13 +386,14 @@ class Publish: i = 0 for shnum in homeless_shares: (ignored1, ignored2, peerid, ss) = peerlist[i] - # TODO: if we are forced to send a share to a server that already - # has one, we may have two write requests in flight, and the + # if we are forced to send a share to a server that already has + # one, we may have two write requests in flight, and the # servermap (which was computed before either request was sent) - # won't reflect the new shares, so the second response will cause - # us to be surprised ("unexpected share on peer"), causing the - # publish to fail with an UncoordinatedWriteError. This is - # troublesome but not really a bit problem. Fix it at some point. + # won't reflect the new shares, so the second response will be + # surprising. There is code in _got_write_answer() to tolerate + # this, otherwise it would cause the publish to fail with an + # UncoordinatedWriteError. See #546 for details of the trouble + # this used to cause. 
self.goal.add( (peerid, shnum) ) self.connections[peerid] = ss i += 1 @@ -483,6 +484,7 @@ class Publish: root_hash = share_hash_tree[0] assert len(root_hash) == 32 self.log("my new root_hash is %s" % base32.b2a(root_hash)) + self._new_version_info = (self._new_seqnum, root_hash, self.salt) prefix = pack_prefix(self._new_seqnum, root_hash, self.salt, self.required_shares, self.total_shares, @@ -672,7 +674,67 @@ class Publish: wrote, read_data = answer surprise_shares = set(read_data.keys()) - set(shnums) - if surprise_shares: + + surprised = False + for shnum in surprise_shares: + # read_data is a dict mapping shnum to checkstring (SIGNED_PREFIX) + checkstring = read_data[shnum][0] + their_version_info = unpack_checkstring(checkstring) + if their_version_info == self._new_version_info: + # they have the right share, somehow + + if (peerid,shnum) in self.goal: + # and we want them to have it, so we probably sent them a + # copy in an earlier write. This is ok, and avoids the + # #546 problem. + continue + + # They aren't in our goal, but they are still for the right + # version. Somebody else wrote them, and it's a convergent + # uncoordinated write. Pretend this is ok (don't be + # surprised), since I suspect there's a decent chance that + # we'll hit this in normal operation. + continue + + else: + # the new shares are of a different version + if peerid in self._servermap.reachable_peers: + # we asked them about their shares, so we had knowledge + # of what they used to have. Any surprising shares must + # have come from someone else, so UCW. + surprised = True + else: + # we didn't ask them, and now we've discovered that they + # have a share we didn't know about. This indicates that + # mapupdate should have worked harder and asked more + # servers before concluding that it knew about them all. + + # signal UCW, but make sure to ask this peer next time, + # so we'll remember to update it if/when we retry. + surprised = True + # TODO: ask this peer next time. 
I don't yet have a good + # way to do this. Two insufficient possibilities are: + # + # self._servermap.add_new_share(peerid, shnum, verinfo, now) + # but that requires fetching/validating/parsing the whole + # version string, and all we have is the checkstring + # self._servermap.mark_bad_share(peerid, shnum, checkstring) + # that will make publish overwrite the share next time, + # but it won't re-query the server, and it won't make + # mapupdate search further + + # TODO later: when publish starts, do + # servermap.get_best_version(), extract the seqnum, + # subtract one, and store as highest-replaceable-seqnum. + # Then, if this surprise-because-we-didn't-ask share is + # of highest-replaceable-seqnum or lower, we're allowed + # to replace it: send out a new writev (or rather add it + # to self.goal and loop). + pass + + surprised = True + + if surprised: self.log("they had shares %s that we didn't know about" % (list(surprise_shares),), parent=lp, level=log.WEIRD, umid="un9CSQ") diff --git a/src/allmydata/test/test_mutable.py b/src/allmydata/test/test_mutable.py index d2cc36a6..329912bc 100644 --- a/src/allmydata/test/test_mutable.py +++ b/src/allmydata/test/test_mutable.py @@ -1959,6 +1959,38 @@ class Problems(unittest.TestCase, testutil.ShouldFailMixin): d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2")) return d + def test_bad_server_overlap(self): + # like test_bad_server, but with no extra unused servers to fall back + # upon. This means that we must re-use a server which we've already + # used. If we don't remember the fact that we sent them one share + # already, we'll mistakenly think we're experiencing an + # UncoordinatedWriteError. + + # Break one server, then create the file: the initial publish should + # complete with an alternate server. Breaking a second server should + # not prevent an update from succeeding either. 
+ basedir = os.path.join("mutable/CollidingWrites/test_bad_server") + self.client = LessFakeClient(basedir, 10) + + peerids = sorted(self.client._connections.keys()) + self.client._connections[peerids[0]].broken = True + + d = self.client.create_mutable_file("contents 1") + def _created(n): + d = n.download_best_version() + d.addCallback(lambda res: self.failUnlessEqual(res, "contents 1")) + # now break one of the remaining servers + def _break_second_server(res): + self.client._connections[peerids[1]].broken = True + d.addCallback(_break_second_server) + d.addCallback(lambda res: n.overwrite("contents 2")) + # that ought to work too + d.addCallback(lambda res: n.download_best_version()) + d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2")) + return d + d.addCallback(_created) + return d + def test_publish_all_servers_bad(self): # Break all servers: the publish should fail basedir = os.path.join("mutable/CollidingWrites/publish_all_servers_bad") -- 2.45.2