mutable: test write failures, uncoordinated write detection
authorBrian Warner <warner@allmydata.com>
Tue, 22 Apr 2008 18:49:53 +0000 (11:49 -0700)
committerBrian Warner <warner@allmydata.com>
Tue, 22 Apr 2008 18:49:53 +0000 (11:49 -0700)
src/allmydata/mutable/common.py
src/allmydata/mutable/node.py
src/allmydata/mutable/publish.py
src/allmydata/test/test_mutable.py

index 4eeba9a64db2bdc531debfd8ab8c731be315d7b3..a6e2acb3fa965ffa16c5eddcd1257df6fef49912 100644 (file)
@@ -30,6 +30,10 @@ class UncoordinatedWriteError(Exception):
 class UnrecoverableFileError(Exception):
     pass
 
+class NotEnoughServersError(Exception):
+    """There were not enough functioning servers available to place shares
+    upon."""
+
 class CorruptShareError(Exception):
     def __init__(self, peerid, shnum, reason):
         self.args = (peerid, shnum, reason)
index 749b8483e1d7960487aa048208943c607a8229ae..275ed377402866b2ac1192461dcc08f8345dc47f 100644 (file)
@@ -101,23 +101,24 @@ class MutableFileNode:
         contents. Returns a Deferred that fires (with the MutableFileNode
         instance you should use) when it completes.
         """
-        self._required_shares, self._total_shares = self.DEFAULT_ENCODING
 
         d = defer.maybeDeferred(self._generate_pubprivkeys, keypair_generator)
-        def _generated( (pubkey, privkey) ):
-            self._pubkey, self._privkey = pubkey, privkey
-            pubkey_s = self._pubkey.serialize()
-            privkey_s = self._privkey.serialize()
-            self._writekey = hashutil.ssk_writekey_hash(privkey_s)
-            self._encprivkey = self._encrypt_privkey(self._writekey, privkey_s)
-            self._fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
-            self._uri = WriteableSSKFileURI(self._writekey, self._fingerprint)
-            self._readkey = self._uri.readkey
-            self._storage_index = self._uri.storage_index
-            return self._upload(initial_contents, None)
-        d.addCallback(_generated)
+        d.addCallback(self._generated)
+        d.addCallback(lambda res: self._upload(initial_contents, None))
         return d
 
+    def _generated(self, (pubkey, privkey) ):
+        self._pubkey, self._privkey = pubkey, privkey
+        pubkey_s = self._pubkey.serialize()
+        privkey_s = self._privkey.serialize()
+        self._writekey = hashutil.ssk_writekey_hash(privkey_s)
+        self._encprivkey = self._encrypt_privkey(self._writekey, privkey_s)
+        self._fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
+        self._uri = WriteableSSKFileURI(self._writekey, self._fingerprint)
+        self._readkey = self._uri.readkey
+        self._storage_index = self._uri.storage_index
+        self._required_shares, self._total_shares = self.DEFAULT_ENCODING
+
     def _generate_pubprivkeys(self, keypair_generator):
         if keypair_generator:
             return keypair_generator(self.SIGNATURE_KEY_SIZE)
index 3ef9f8315cc3d95ca7c5006750dd1867f45f6244..1d313918681b2691270c0c65dc2faf619a251227 100644 (file)
@@ -11,7 +11,8 @@ from allmydata import hashtree, codec, storage
 from pycryptopp.cipher.aes import AES
 from foolscap.eventual import eventually
 
-from common import MODE_WRITE, UncoordinatedWriteError, DictOfSets
+from common import MODE_WRITE, DictOfSets, \
+     UncoordinatedWriteError, NotEnoughServersError
 from servermap import ServerMap
 from layout import pack_prefix, pack_share, unpack_header, pack_checkstring, \
      unpack_checkstring, SIGNED_PREFIX
@@ -110,16 +111,14 @@ class Publish:
         self._status.set_progress(0.0)
         self._status.set_active(True)
 
+    def get_status(self):
+        return self._status
+
     def log(self, *args, **kwargs):
         if 'parent' not in kwargs:
             kwargs['parent'] = self._log_number
         return log.msg(*args, **kwargs)
 
-    def log_err(self, *args, **kwargs):
-        if 'parent' not in kwargs:
-            kwargs['parent'] = self._log_number
-        return log.err(*args, **kwargs)
-
     def publish(self, newdata):
         """Publish the filenode's current contents.  Returns a Deferred that
         fires (with None) when the publish has done as much work as it's ever
@@ -191,8 +190,16 @@ class Publish:
 
         self.setup_encoding_parameters()
 
+        # if we experience any surprises (writes which were rejected because
+        # our test vector did not match, or shares which we didn't expect to
+        # see), we set this flag and report an UncoordinatedWriteError at the
+        # end of the publish process.
         self.surprised = False
 
+        # as a failsafe, refuse to iterate through self.loop more than a
+        # thousand times.
+        self.looplimit = 1000
+
         # we keep track of three tables. The first is our goal: which share
         # we want to see on which servers. This is initially populated by the
         # existing servermap.
@@ -264,19 +271,28 @@ class Publish:
         self.log("entering loop", level=log.NOISY)
         if not self._running:
             return
-        self.update_goal()
-        # how far are we from our goal?
-        needed = self.goal - self.placed - self.outstanding
-        self._update_status()
 
-        if needed:
-            # we need to send out new shares
-            self.log(format="need to send %(needed)d new shares",
-                     needed=len(needed), level=log.NOISY)
-            d = self._send_shares(needed)
-            d.addCallback(self.loop)
-            d.addErrback(self._fatal_error)
-            return
+        self.looplimit -= 1
+        if self.looplimit <= 0:
+            raise RuntimeError("loop limit exceeded")
+
+        if self.surprised:
+            # don't send out any new shares, just wait for the outstanding
+            # ones to be retired.
+            self.log("currently surprised, so don't send any new shares",
+                     level=log.NOISY)
+        else:
+            self.update_goal()
+            # how far are we from our goal?
+            needed = self.goal - self.placed - self.outstanding
+            self._update_status()
+
+            if needed:
+                # we need to send out new shares
+                self.log(format="need to send %(needed)d new shares",
+                         needed=len(needed), level=log.NOISY)
+                self._send_shares(needed)
+                return
 
         if self.outstanding:
             # queries are still pending, keep waiting
@@ -293,9 +309,9 @@ class Publish:
         self._status.timings["push"] = elapsed
         return self._done(None)
 
-    def log_goal(self, goal):
-        logmsg = []
-        for (peerid, shnum) in goal:
+    def log_goal(self, goal, message=""):
+        logmsg = [message]
+        for (shnum, peerid) in sorted([(s,p) for (p,s) in goal]):
             logmsg.append("sh%d to [%s]" % (shnum,
                                             idlib.shortnodeid_b2a(peerid)))
         self.log("current goal: %s" % (", ".join(logmsg)), level=log.NOISY)
@@ -303,6 +319,10 @@ class Publish:
                  level=log.NOISY)
 
     def update_goal(self):
+        # if log.recording_noisy
+        if True:
+            self.log_goal(self.goal, "before update: ")
+
         # first, remove any bad peers from our goal
         self.goal = set([ (peerid, shnum)
                           for (peerid, shnum) in self.goal
@@ -318,10 +338,6 @@ class Publish:
         if not homeless_shares:
             return
 
-        # if log.recording_noisy
-        if False:
-            self.log_goal(self.goal)
-
         # if an old share X is on a node, put the new share X there too.
         # TODO: 1: redistribute shares to achieve one-per-peer, by copying
         #       shares from existing peers to new (less-crowded) ones. The
@@ -340,21 +356,35 @@ class Publish:
 
         peerlist = []
         for i, (peerid, ss) in enumerate(self.full_peerlist):
+            if peerid in self.bad_peers:
+                continue
             entry = (len(old_assignments.get(peerid, [])), i, peerid, ss)
             peerlist.append(entry)
         peerlist.sort()
 
+        if not peerlist:
+            raise NotEnoughServersError("Ran out of non-bad servers")
+
         new_assignments = []
         # we then index this peerlist with an integer, because we may have to
         # wrap. We update the goal as we go.
         i = 0
         for shnum in homeless_shares:
             (ignored1, ignored2, peerid, ss) = peerlist[i]
+            # TODO: if we are forced to send a share to a server that already
+            # has one, we may have two write requests in flight, and the
+            # servermap (which was computed before either request was sent)
+            # won't reflect the new shares, so the second response will cause
+            # us to be surprised ("unexpected share on peer"), causing the
+            # publish to fail with an UncoordinatedWriteError. This is
+            # troublesome but not really a bit problem. Fix it at some point.
             self.goal.add( (peerid, shnum) )
             self.connections[peerid] = ss
             i += 1
             if i >= len(peerlist):
                 i = 0
+        if True:
+            self.log_goal(self.goal, "after update: ")
 
 
 
@@ -564,8 +594,8 @@ class Publish:
         read_vector = [(0, struct.calcsize(SIGNED_PREFIX))]
 
         # ok, send the messages!
+        self.log("sending %d shares" % len(all_tw_vectors), level=log.NOISY)
         started = time.time()
-        dl = []
         for (peerid, tw_vectors) in all_tw_vectors.items():
 
             write_enabler = self._node.get_write_enabler(peerid)
@@ -574,16 +604,19 @@ class Publish:
             secrets = (write_enabler, renew_secret, cancel_secret)
             shnums = tw_vectors.keys()
 
+            for shnum in shnums:
+                self.outstanding.add( (peerid, shnum) )
+
             d = self._do_testreadwrite(peerid, secrets,
                                        tw_vectors, read_vector)
             d.addCallbacks(self._got_write_answer, self._got_write_error,
                            callbackArgs=(peerid, shnums, started),
                            errbackArgs=(peerid, shnums, started))
+            d.addCallback(self.loop)
             d.addErrback(self._fatal_error)
-            dl.append(d)
 
         self._update_status()
-        return defer.DeferredList(dl, fireOnOneErrback=True) # just for testing
+        self.log("%d shares sent" % len(all_tw_vectors), level=log.NOISY)
 
     def _do_testreadwrite(self, peerid, secrets,
                           tw_vectors, read_vector):
@@ -610,7 +643,29 @@ class Publish:
 
         wrote, read_data = answer
 
+        surprise_shares = set(read_data.keys()) - set(shnums)
+        if surprise_shares:
+            self.log("they had shares %s that we didn't know about" %
+                     (list(surprise_shares),),
+                     parent=lp, level=log.WEIRD)
+            self.surprised = True
+
         if not wrote:
+            # TODO: there are two possibilities. The first is that the server
+            # is full (or just doesn't want to give us any room), which means
+            # we shouldn't ask them again, but is *not* an indication of an
+            # uncoordinated write. The second is that our testv failed, which
+            # *does* indicate an uncoordinated write. We currently don't have
+            # a way to tell these two apart (in fact, the storage server code
+            # doesn't have the option of refusing our share).
+            #
+            # If the server is full, mark the peer as bad (so we don't ask
+            # them again), but don't set self.surprised. The loop() will find
+            # a new server.
+            #
+            # If the testv failed, log it, set self.surprised, but don't
+            # bother adding to self.bad_peers .
+
             self.log("our testv failed, so the write did not happen",
                      parent=lp, level=log.WEIRD)
             self.surprised = True
@@ -623,15 +678,19 @@ class Publish:
                  other_salt) = unpack_checkstring(checkstring)
                 expected_version = self._servermap.version_on_peer(peerid,
                                                                    shnum)
-                (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
-                 offsets_tuple) = expected_version
-                self.log("somebody modified the share on us:"
-                         " shnum=%d: I thought they had #%d:R=%s,"
-                         " but testv reported #%d:R=%s" %
-                         (shnum,
-                          seqnum, base32.b2a(root_hash)[:4],
-                          other_seqnum, base32.b2a(other_roothash)[:4]),
-                         parent=lp, level=log.NOISY)
+                if expected_version:
+                    (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
+                     offsets_tuple) = expected_version
+                    self.log("somebody modified the share on us:"
+                             " shnum=%d: I thought they had #%d:R=%s,"
+                             " but testv reported #%d:R=%s" %
+                             (shnum,
+                              seqnum, base32.b2a(root_hash)[:4],
+                              other_seqnum, base32.b2a(other_roothash)[:4]),
+                             parent=lp, level=log.NOISY)
+                # if expected_version==None, then we didn't expect to see a
+                # share on that peer, and the 'surprise_shares' clause above
+                # will have logged it.
             # self.loop() will take care of finding new homes
             return
 
@@ -641,14 +700,6 @@ class Publish:
             self._servermap.add_new_share(peerid, shnum,
                                           self.versioninfo, started)
 
-        surprise_shares = set(read_data.keys()) - set(shnums)
-        if surprise_shares:
-            self.log("they had shares %s that we didn't know about" %
-                     (list(surprise_shares),),
-                     parent=lp, level=log.WEIRD)
-            self.surprised = True
-            return
-
         # self.loop() will take care of checking to see if we're done
         return
 
@@ -664,28 +715,6 @@ class Publish:
         return
 
 
-
-    def _log_dispatch_map(self, dispatch_map):
-        for shnum, places in dispatch_map.items():
-            sent_to = [(idlib.shortnodeid_b2a(peerid),
-                        seqnum,
-                        base32.b2a(root_hash)[:4])
-                       for (peerid,seqnum,root_hash) in places]
-            self.log(" share %d sent to: %s" % (shnum, sent_to),
-                     level=log.NOISY)
-
-    def _maybe_recover(self, (surprised, dispatch_map)):
-        self.log("_maybe_recover, surprised=%s, dispatch_map:" % surprised,
-                 level=log.NOISY)
-        self._log_dispatch_map(dispatch_map)
-        if not surprised:
-            self.log(" no recovery needed")
-            return
-        self.log("We need recovery!", level=log.WEIRD)
-        print "RECOVERY NOT YET IMPLEMENTED"
-        # but dispatch_map will help us do it
-        raise UncoordinatedWriteError("I was surprised!")
-
     def _done(self, res):
         if not self._running:
             return
@@ -694,14 +723,18 @@ class Publish:
         self._status.timings["total"] = now - self._started
         self._status.set_active(False)
         if isinstance(res, failure.Failure):
-            self.log("Retrieve done, with failure", failure=res)
+            self.log("Publish done, with failure", failure=res, level=log.WEIRD)
             self._status.set_status("Failed")
+        elif self.surprised:
+            self.log("Publish done, UncoordinatedWriteError", level=log.UNUSUAL)
+            self._status.set_status("UncoordinatedWriteError")
+            # deliver a failure
+            res = failure.Failure(UncoordinatedWriteError())
+            # TODO: recovery
         else:
+            self.log("Publish done, success")
             self._status.set_status("Done")
             self._status.set_progress(1.0)
         eventually(self.done_deferred.callback, res)
 
-    def get_status(self):
-        return self._status
-
 
index bc587af565a1a0233f8792322682a8803fb5d619..7832000b211e8d218e047922c5fa704168870887 100644 (file)
@@ -1,12 +1,13 @@
 
-import struct
+import os, struct
 from cStringIO import StringIO
 from twisted.trial import unittest
 from twisted.internet import defer, reactor
-from allmydata import uri, download
-from allmydata.util import base32, testutil
+from allmydata import uri, download, storage
+from allmydata.util import base32, testutil, idlib
 from allmydata.util.idlib import shortnodeid_b2a
 from allmydata.util.hashutil import tagged_hash
+from allmydata.util.fileutil import make_dirs
 from allmydata.encode import NotEnoughSharesError
 from allmydata.interfaces import IURI, IMutableFileURI, IUploadable
 from foolscap.eventual import eventually, fireEventually
@@ -1331,3 +1332,140 @@ class Exceptions(unittest.TestCase):
         ucwe = UncoordinatedWriteError()
         self.failUnless("UncoordinatedWriteError" in repr(ucwe), repr(ucwe))
 
+# we can't do this test with a FakeClient, since it uses FakeStorageServer
+# instances which always succeed. So we need a less-fake one.
+
+class IntentionalError(Exception):
+    pass
+
+class LocalWrapper:
+    def __init__(self, original):
+        self.original = original
+        self.broken = False
+    def callRemote(self, methname, *args, **kwargs):
+        def _call():
+            if self.broken:
+                raise IntentionalError("I was asked to break")
+            meth = getattr(self.original, "remote_" + methname)
+            return meth(*args, **kwargs)
+        d = fireEventually()
+        d.addCallback(lambda res: _call())
+        return d
+
+class LessFakeClient(FakeClient):
+
+    def __init__(self, basedir, num_peers=10):
+        self._num_peers = num_peers
+        self._peerids = [tagged_hash("peerid", "%d" % i)[:20]
+                         for i in range(self._num_peers)]
+        self._connections = {}
+        for peerid in self._peerids:
+            peerdir = os.path.join(basedir, idlib.shortnodeid_b2a(peerid))
+            make_dirs(peerdir)
+            ss = storage.StorageServer(peerdir)
+            ss.setNodeID(peerid)
+            lw = LocalWrapper(ss)
+            self._connections[peerid] = lw
+        self.nodeid = "fakenodeid"
+
+
+class Problems(unittest.TestCase, testutil.ShouldFailMixin):
+    def test_surprise(self):
+        basedir = os.path.join("mutable/CollidingWrites/test_surprise")
+        self.client = LessFakeClient(basedir)
+        d = self.client.create_mutable_file("contents 1")
+        def _created(n):
+            d = defer.succeed(None)
+            d.addCallback(lambda res: n.get_servermap(MODE_WRITE))
+            def _got_smap1(smap):
+                # stash the old state of the file
+                self.old_map = smap
+            d.addCallback(_got_smap1)
+            # then modify the file, leaving the old map untouched
+            d.addCallback(lambda res: log.msg("starting winning write"))
+            d.addCallback(lambda res: n.overwrite("contents 2"))
+            # now attempt to modify the file with the old servermap. This
+            # will look just like an uncoordinated write, in which every
+            # single share got updated between our mapupdate and our publish
+            d.addCallback(lambda res: log.msg("starting doomed write"))
+            d.addCallback(lambda res:
+                          self.shouldFail(UncoordinatedWriteError,
+                                          "test_surprise", None,
+                                          n.upload,
+                                          "contents 2a", self.old_map))
+            return d
+        d.addCallback(_created)
+        return d
+
+    def test_unexpected_shares(self):
+        # upload the file, take a servermap, shut down one of the servers,
+        # upload it again (causing shares to appear on a new server), then
+        # upload using the old servermap. The last upload should fail with an
+        # UncoordinatedWriteError, because of the shares that didn't appear
+        # in the servermap.
+        basedir = os.path.join("mutable/CollidingWrites/test_unexpexted_shares")
+        self.client = LessFakeClient(basedir)
+        d = self.client.create_mutable_file("contents 1")
+        def _created(n):
+            d = defer.succeed(None)
+            d.addCallback(lambda res: n.get_servermap(MODE_WRITE))
+            def _got_smap1(smap):
+                # stash the old state of the file
+                self.old_map = smap
+                # now shut down one of the servers
+                peer0 = list(smap.make_sharemap()[0])[0]
+                self.client._connections.pop(peer0)
+                # then modify the file, leaving the old map untouched
+                log.msg("starting winning write")
+                return n.overwrite("contents 2")
+            d.addCallback(_got_smap1)
+            # now attempt to modify the file with the old servermap. This
+            # will look just like an uncoordinated write, in which every
+            # single share got updated between our mapupdate and our publish
+            d.addCallback(lambda res: log.msg("starting doomed write"))
+            d.addCallback(lambda res:
+                          self.shouldFail(UncoordinatedWriteError,
+                                          "test_surprise", None,
+                                          n.upload,
+                                          "contents 2a", self.old_map))
+            return d
+        d.addCallback(_created)
+        return d
+
+    def test_bad_server(self):
+        # Break one server, then create the file: the initial publish should
+        # complete with an alternate server. Breaking a second server should
+        # not prevent an update from succeeding either.
+        basedir = os.path.join("mutable/CollidingWrites/test_bad_server")
+        self.client = LessFakeClient(basedir, 20)
+        # to make sure that one of the initial peers is broken, we have to
+        # get creative. We create the keys, so we can figure out the storage
+        # index, but we hold off on doing the initial publish until we've
+        # broken the server on which the first share wants to be stored.
+        n = FastMutableFileNode(self.client)
+        d = defer.succeed(None)
+        d.addCallback(n._generate_pubprivkeys)
+        d.addCallback(n._generated)
+        def _break_peer0(res):
+            si = n.get_storage_index()
+            peerlist = self.client.get_permuted_peers("storage", si)
+            peerid0, connection0 = peerlist[0]
+            peerid1, connection1 = peerlist[1]
+            connection0.broken = True
+            self.connection1 = connection1
+        d.addCallback(_break_peer0)
+        # now let the initial publish finally happen
+        d.addCallback(lambda res: n._upload("contents 1", None))
+        # that ought to work
+        d.addCallback(lambda res: n.download_best_version())
+        d.addCallback(lambda res: self.failUnlessEqual(res, "contents 1"))
+        # now break the second peer
+        def _break_peer1(res):
+            self.connection1.broken = True
+        d.addCallback(_break_peer1)
+        d.addCallback(lambda res: n.overwrite("contents 2"))
+        # that ought to work too
+        d.addCallback(lambda res: n.download_best_version())
+        d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2"))
+        return d
+