git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git / commitdiff
mutable WIP: if corrupt shares cause a retrieve to fail, restart it once, ignoring...
author     Brian Warner <warner@allmydata.com>
           Tue, 15 Apr 2008 22:58:02 +0000 (15:58 -0700)
committer  Brian Warner <warner@allmydata.com>
           Tue, 15 Apr 2008 22:58:02 +0000 (15:58 -0700)
src/allmydata/encode.py
src/allmydata/mutable/node.py
src/allmydata/mutable/retrieve.py
src/allmydata/mutable/servermap.py

diff --git a/src/allmydata/encode.py b/src/allmydata/encode.py
index e8d1a1bfc77501882905d8feffeed619cf207d8f..3480ce5fb8a458987ad54abfc3adc411ef6db0ec 100644
@@ -61,6 +61,8 @@ hash tree is put into the URI.
 """
 
 class NotEnoughPeersError(Exception):
+    worth_retrying = False
+    servermap = None
     pass
 
 class UploadAborted(Exception):
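
The encode.py change above gives NotEnoughPeersError two class-level defaults rather than introducing a new exception type, so every existing raise site keeps working unchanged and any caller can safely read the attributes. A standalone sketch of that pattern (the raise-site functions are hypothetical, not project code):

class NotEnoughPeersError(Exception):
    worth_retrying = False   # class-level default: safe to read on any instance
    servermap = None

def legacy_raise_site():
    # untouched code elsewhere still raises the plain error;
    # callers see worth_retrying=False via the class default
    raise NotEnoughPeersError("ran out of peers")

def retry_aware_raise_site(current_servermap):
    err = NotEnoughPeersError("ran out of peers, but some shares were bad")
    err.worth_retrying = True          # instance attribute shadows the class default
    err.servermap = current_servermap  # hand the caller the map that remembers the bad shares
    raise err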
diff --git a/src/allmydata/mutable/node.py b/src/allmydata/mutable/node.py
index 87295f2c8bac938baaa88487f8f6b40901f98097..3ad40470d117a69e5eefd650853905da1c38ac94 100644
@@ -7,6 +7,7 @@ from twisted.internet import defer
 from allmydata.interfaces import IMutableFileNode, IMutableFileURI
 from allmydata.util import hashutil
 from allmydata.uri import WriteableSSKFileURI
+from allmydata.encode import NotEnoughPeersError
 from pycryptopp.publickey import rsa
 from pycryptopp.cipher.aes import AES
 
@@ -279,15 +280,30 @@ class MutableFileNode:
         d.addCallback(_done)
         return d
 
-    def download_to_data(self):
-        d = self.obtain_lock()
-        d.addCallback(lambda res: self.update_servermap(mode=MODE_ENOUGH))
+    def _update_and_retrieve_best(self, old_map=None):
+        d = self.update_servermap(old_map=old_map, mode=MODE_ENOUGH)
         def _updated(smap):
             goal = smap.best_recoverable_version()
             if not goal:
                 raise UnrecoverableFileError("no recoverable versions")
             return self.download_version(smap, goal)
         d.addCallback(_updated)
+        return d
+
+    def download_to_data(self):
+        d = self.obtain_lock()
+        d.addCallback(lambda res: self._update_and_retrieve_best())
+        def _maybe_retry(f):
+            f.trap(NotEnoughPeersError)
+            e = f.value
+            if not e.worth_retrying:
+                return f
+            # the download is worth retrying once. Make sure to use the old
+            # servermap, since it is what remembers the bad shares. TODO:
+            # consider allowing this to retry multiple times.. this approach
+            # will let us tolerate about 8 bad shares, I think.
+            return self._update_and_retrieve_best(e.servermap)
+        d.addErrback(_maybe_retry)
         d.addBoth(self.release_lock)
         return d
 
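
The new download_to_data above traps only NotEnoughPeersError and retries exactly once when the error says a second pass is worthwhile, feeding the old servermap back in so the shares it marked bad stay excluded. A hypothetical, self-contained sketch of that Deferred errback pattern (FakeNode stands in for MutableFileNode and is not project code):

from twisted.internet import defer

class NotEnoughPeersError(Exception):
    worth_retrying = False
    servermap = None

class FakeNode:
    """Stand-in node: fails the first pass, succeeds once it is handed the
    servermap that remembers which shares were bad."""
    def _update_and_retrieve_best(self, old_map=None):
        if old_map is None:
            err = NotEnoughPeersError("ran out of peers")
            err.worth_retrying = True
            err.servermap = {("peer1", 3)}   # toy stand-in for a ServerMap
            return defer.fail(err)
        return defer.succeed("recovered plaintext")

def download(node):
    d = node._update_and_retrieve_best()
    def _maybe_retry(f):
        f.trap(NotEnoughPeersError)          # anything else propagates unchanged
        if not f.value.worth_retrying:
            return f                         # no new information: give up
        return node._update_and_retrieve_best(f.value.servermap)
    d.addErrback(_maybe_retry)
    return d

results = []
download(FakeNode()).addCallback(results.append)
assert results == ["recovered plaintext"]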
diff --git a/src/allmydata/mutable/retrieve.py b/src/allmydata/mutable/retrieve.py
index 4942d96bb86b2a5f307edd15bbf9ec3f009159ba..9abc5430b48245b4aefa0d29ca318f0a2e35b14d 100644
@@ -93,6 +93,7 @@ class Retrieve:
         self._outstanding_queries = {} # maps (peerid,shnum) to start_time
         self._running = True
         self._decoding = False
+        self._bad_shares = set()
 
         self.servermap = servermap
         assert self._node._pubkey
@@ -238,6 +239,8 @@ class Retrieve:
                 f = failure.Failure()
                 self.log("bad share: %s %s" % (f, f.value), level=log.WEIRD)
                 self.remove_peer(peerid)
+                self.servermap.mark_bad_share(peerid, shnum)
+                self._bad_shares.add( (peerid, shnum) )
                 self._last_failure = f
                 pass
         # all done!
@@ -374,19 +377,26 @@ class Retrieve:
             format = ("ran out of peers: "
                       "have %(have)d shares (k=%(k)d), "
                       "%(outstanding)d queries in flight, "
-                      "need %(need)d more")
+                      "need %(need)d more, "
+                      "found %(bad)d bad shares")
+            args = {"have": len(self.shares),
+                    "k": k,
+                    "outstanding": len(self._outstanding_queries),
+                    "need": needed,
+                    "bad": len(self._bad_shares),
+                    }
             self.log(format=format,
-                     have=len(self.shares), k=k,
-                     outstanding=len(self._outstanding_queries),
-                     need=needed,
-                     level=log.WEIRD)
-            msg2 = format % {"have": len(self.shares),
-                             "k": k,
-                             "outstanding": len(self._outstanding_queries),
-                             "need": needed,
-                             }
-            raise NotEnoughPeersError("%s, last failure: %s" %
-                                      (msg2, self._last_failure))
+                     level=log.WEIRD, **args)
+            err = NotEnoughPeersError("%s, last failure: %s" %
+                                      (format % args, self._last_failure))
+            if self._bad_shares:
+                self.log("We found some bad shares this pass. You should "
+                         "update the servermap and try again to check "
+                         "more peers",
+                         level=log.WEIRD)
+                err.worth_retrying = True
+                err.servermap = self.servermap
+            raise err
 
         return
 
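
The retrieve.py hunk also reworks the failure report so that a single args dict drives both the structured log call (passed as keyword arguments) and the exception text (rendered with %-formatting), keeping the two from drifting apart, and attaches the retry hints whenever bad shares were seen. A hedged sketch of that shape, with `log` standing in for the foolscap-style logger (hypothetical helper, not project code):

class NotEnoughPeersError(Exception):
    worth_retrying = False
    servermap = None

def ran_out_of_peers(log, have, k, outstanding, needed, bad_shares, servermap):
    fmt = ("ran out of peers: have %(have)d shares (k=%(k)d), "
           "%(outstanding)d queries in flight, need %(need)d more, "
           "found %(bad)d bad shares")
    args = {"have": have, "k": k, "outstanding": outstanding,
            "need": needed, "bad": len(bad_shares)}
    log(format=fmt, **args)                  # machine-readable event
    err = NotEnoughPeersError(fmt % args)    # same data, rendered for humans
    if bad_shares:
        err.worth_retrying = True            # a second pass can avoid these shares
        err.servermap = servermap
    raise err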
diff --git a/src/allmydata/mutable/servermap.py b/src/allmydata/mutable/servermap.py
index 8863ea6e838daf1d952c7b4f6d80e365c1d78cbc..4f8fca2b97af657f14fb41f85eb3b702f9ea9260 100644
@@ -25,6 +25,14 @@ class ServerMap:
     operations, which means 'publish this new version, but only if nothing
     has changed since I last retrieved this data'. This reduces the chances
     of clobbering a simultaneous (uncoordinated) write.
+
+    @ivar bad_shares: a sequence of (peerid, shnum) tuples, describing
+                      shares that I should ignore (because a previous user of
+                      the servermap determined that they were invalid). The
+                      updater only locates a certain number of shares: if
+                      some of these turn out to have integrity problems and
+                      are unusable, the caller will need to mark those shares
+                      as bad, then re-update the servermap, then try again.
     """
 
     def __init__(self):
@@ -35,9 +43,36 @@ class ServerMap:
         self.connections = {} # maps peerid to a RemoteReference
         self.unreachable_peers = set() # peerids that didn't respond to queries
         self.problems = [] # mostly for debugging
+        self.bad_shares = set()
         self.last_update_mode = None
         self.last_update_time = 0
 
+    def mark_bad_share(self, peerid, shnum):
+        """This share was found to be bad, not in the checkstring or
+        signature, but deeper in the share, detected at retrieve time. Remove
+        it from our list of useful shares, and remember that it is bad so we
+        don't add it back again later.
+        """
+        self.bad_shares.add( (peerid, shnum) )
+        self._remove_share(peerid, shnum)
+
+    def _remove_share(self, peerid, shnum):
+        #(s_shnum, s_verinfo, s_timestamp) = share
+        to_remove = [share
+                     for share in self.servermap[peerid]
+                     if share[0] == shnum]
+        for share in to_remove:
+            self.servermap[peerid].discard(share)
+        if not self.servermap[peerid]:
+            del self.servermap[peerid]
+
+    def add_new_share(self, peerid, shnum, verinfo, timestamp):
+        """We've written a new share out, replacing any that was there
+        before."""
+        self.bad_shares.discard( (peerid, shnum) )
+        self._remove_share(peerid, shnum)
+        self.servermap.add(peerid, (shnum, verinfo, timestamp) )
+
     def dump(self, out=sys.stdout):
         print >>out, "servermap:"
         for (peerid, shares) in self.servermap.items():
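
The methods above add per-share bookkeeping to ServerMap: mark_bad_share removes a share from the map and remembers it so a later update will not re-add it, while add_new_share forgives the (peerid, shnum) slot once a fresh share has been written there. A toy model of that bookkeeping, assuming a plain dict of sets rather than the set-valued mapping the project uses:

class ToyServerMap:
    def __init__(self):
        self.servermap = {}      # peerid -> set of (shnum, verinfo, timestamp)
        self.bad_shares = set()  # (peerid, shnum) pairs found corrupt at retrieve time

    def mark_bad_share(self, peerid, shnum):
        self.bad_shares.add((peerid, shnum))
        self._remove_share(peerid, shnum)

    def _remove_share(self, peerid, shnum):
        shares = self.servermap.get(peerid, set())
        for share in [s for s in shares if s[0] == shnum]:
            shares.discard(share)
        if not shares:
            self.servermap.pop(peerid, None)

    def add_new_share(self, peerid, shnum, verinfo, timestamp):
        # a freshly written share replaces whatever was there, good or bad
        self.bad_shares.discard((peerid, shnum))
        self._remove_share(peerid, shnum)
        self.servermap.setdefault(peerid, set()).add((shnum, verinfo, timestamp))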
@@ -148,6 +183,11 @@ class ServerMap:
 
 class ServermapUpdater:
     def __init__(self, filenode, servermap, mode=MODE_ENOUGH):
+        """I update a servermap, locating a sufficient number of useful
+        shares and remembering where they are located.
+
+        """
+
         self._node = filenode
         self._servermap = servermap
         self.mode = mode
@@ -415,6 +455,11 @@ class ServermapUpdater:
             self._valid_versions.add(verinfo)
         # We now know that this is a valid candidate verinfo.
 
+        if (peerid, shnum, verinfo) in self._servermap.bad_shares:
+            # we've been told that the rest of the data in this share is
+            # unusable, so don't add it to the servermap.
+            return verinfo
+
         # Add the info to our servermap.
         timestamp = time.time()
         self._servermap.servermap.add(peerid, (shnum, verinfo, timestamp))
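
Finally, the updater consults bad_shares before re-adding a share it has located, so a servermap update triggered by a failed retrieve will not resurrect shares already known to be corrupt. A minimal sketch of that filter, assuming bad_shares holds the (peerid, shnum) pairs recorded by mark_bad_share (a hypothetical helper, not the project's ServermapUpdater):

import time

def record_share(share_map, bad_shares, peerid, shnum, verinfo):
    """share_map: dict mapping peerid -> set of (shnum, verinfo, timestamp);
    bad_shares: set of (peerid, shnum) pairs flagged by a failed retrieve."""
    if (peerid, shnum) in bad_shares:
        # the checkstring and signature looked valid, but a retrieve found the
        # share body corrupt: keep it out so best_recoverable_version() cannot pick it
        return verinfo
    share_map.setdefault(peerid, set()).add((shnum, verinfo, time.time()))
    return verinfo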