git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
first pass at a mutable repairer. not tested at all yet, but of course all existing...
author: Brian Warner <warner@allmydata.com>
Fri, 18 Jul 2008 04:09:23 +0000 (21:09 -0700)
committer: Brian Warner <warner@allmydata.com>
Fri, 18 Jul 2008 04:09:23 +0000 (21:09 -0700)
src/allmydata/interfaces.py
src/allmydata/mutable/checker.py
src/allmydata/mutable/layout.py
src/allmydata/mutable/node.py
src/allmydata/mutable/publish.py
src/allmydata/mutable/repair.py [new file with mode: 0644]
src/allmydata/mutable/retrieve.py
src/allmydata/mutable/servermap.py
src/allmydata/test/test_mutable.py

index 85b12683310ebdb031778733b479a80e688b6cd7..deb20c383eb41d24e3d0b7ee4263f76cf24214a4 100644 (file)
@@ -1541,11 +1541,20 @@ class IDeepCheckResults(Interface):
         was not fully healthy."""
 
 class IRepairable(Interface):
-    def repair():
+    def repair(checker_results):
         """Attempt to repair the given object. Returns a Deferred that fires
         with a IRepairResults object.
+
+        I must be called with an object that implements ICheckerResults, as
+        proof that you have actually discovered a problem with this file. I
+        will use the data in the checker results to guide the repair process,
+        such as which servers provided bad data and should therefore be
+        avoided.
         """
 
+class IRepairResults(Interface):
+    """I contain the results of a repair operation."""
+
 
 class IClient(Interface):
     def upload(uploadable):
index 391a29be6c60003726dd8d26184177ccd7769718..923c82ffed176d900d45332a110c432b48b81ad6 100644 (file)
@@ -1,5 +1,4 @@
 
-import struct
 from zope.interface import implements
 from twisted.internet import defer
 from twisted.python import failure
@@ -9,7 +8,7 @@ from allmydata.interfaces import ICheckerResults
 
 from common import MODE_CHECK, CorruptShareError
 from servermap import ServerMap, ServermapUpdater
-from layout import unpack_share, SIGNED_PREFIX
+from layout import unpack_share, SIGNED_PREFIX_LENGTH
 
 class MutableChecker:
 
@@ -18,9 +17,11 @@ class MutableChecker:
         self.healthy = True
         self.problems = []
         self._storage_index = self._node.get_storage_index()
+        self.results = Results(self._storage_index)
 
     def check(self, verify=False, repair=False):
         servermap = ServerMap()
+        self.results.servermap = servermap
         self.do_verify = verify
         self.do_repair = repair
         u = ServermapUpdater(self._node, servermap, MODE_CHECK)
@@ -85,11 +86,13 @@ class MutableChecker:
             except CorruptShareError:
                 f = failure.Failure()
                 self.add_problem(shnum, peerid, f)
+                prefix = data[:SIGNED_PREFIX_LENGTH]
+                self.results.servermap.mark_bad_share(peerid, shnum, prefix)
 
     def check_prefix(self, peerid, shnum, data):
         (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
          offsets_tuple) = self.best_version
-        got_prefix = data[:struct.calcsize(SIGNED_PREFIX)]
+        got_prefix = data[:SIGNED_PREFIX_LENGTH]
         if got_prefix != prefix:
             raise CorruptShareError(peerid, shnum,
                                     "prefix mismatch: share changed while we were reading it")
@@ -135,25 +138,37 @@ class MutableChecker:
             return
         if not self.do_repair:
             return
-        pass
+        self.results.repair_attempted = True
+        d = self._node.repair(self)
+        def _repair_finished(repair_results):
+            self.results.repair_succeeded = True
+            self.results.repair_results = repair_results
+        def _repair_error(f):
+            # I'm not sure if I want to pass through a failure or not.
+            self.results.repair_succeeded = False
+            self.results.repair_failure = f
+            return f
+        d.addCallbacks(_repair_finished, _repair_error)
+        return d
 
     def _return_results(self, res):
-        r = Results(self._storage_index)
-        r.healthy = self.healthy
-        r.problems = self.problems
-        return r
+        self.results.healthy = self.healthy
+        self.results.problems = self.problems
+        return self.results
 
 
     def add_problem(self, shnum, peerid, what):
         self.healthy = False
         self.problems.append( (peerid, self._storage_index, shnum, what) )
 
+
 class Results:
     implements(ICheckerResults)
 
     def __init__(self, storage_index):
         self.storage_index = storage_index
         self.storage_index_s = base32.b2a(storage_index)[:6]
+        self.repair_attempted = False
 
     def is_healthy(self):
         return self.healthy
index aaad2b27691278ffa782c6b9615d5d52bc957703..a110eb1d98d3a2127e974bae951dce5e2fa99e0d 100644 (file)
@@ -4,6 +4,7 @@ from common import NeedMoreDataError
 
 PREFIX = ">BQ32s16s" # each version has a different prefix
 SIGNED_PREFIX = ">BQ32s16s BBQQ" # this is covered by the signature
+SIGNED_PREFIX_LENGTH = struct.calcsize(SIGNED_PREFIX)
 HEADER = ">BQ32s16s BBQQ LLLLQQ" # includes offsets
 HEADER_LENGTH = struct.calcsize(HEADER)
 
@@ -24,7 +25,7 @@ def unpack_header(data):
 
 def unpack_prefix_and_signature(data):
     assert len(data) >= HEADER_LENGTH, len(data)
-    prefix = data[:struct.calcsize(SIGNED_PREFIX)]
+    prefix = data[:SIGNED_PREFIX_LENGTH]
 
     (version,
      seqnum,
index bb038022d0812dc6eb50d7ec073f1a001c491147..2d4b20267d6f616fefa5efbac4aeef08a6b0ea95 100644 (file)
@@ -6,7 +6,8 @@ from zope.interface import implements
 from twisted.internet import defer, reactor
 from twisted.python import log
 from foolscap.eventual import eventually
-from allmydata.interfaces import IMutableFileNode, IMutableFileURI, ICheckable
+from allmydata.interfaces import IMutableFileNode, IMutableFileURI, \
+     ICheckable, ICheckerResults
 from allmydata.util import hashutil
 from allmydata.util.assertutil import precondition
 from allmydata.uri import WriteableSSKFileURI
@@ -21,6 +22,7 @@ from common import MODE_READ, MODE_WRITE, UnrecoverableFileError, \
 from servermap import ServerMap, ServermapUpdater
 from retrieve import Retrieve
 from checker import MutableChecker
+from repair import Repairer
 
 
 class BackoffAgent:
@@ -186,6 +188,8 @@ class MutableFileNode:
     def get_total_shares(self):
         return self._total_shares
 
+    ####################################
+    # IFilesystemNode
 
     def get_uri(self):
         return self._uri.to_string()
@@ -237,6 +241,7 @@ class MutableFileNode:
         return d
 
     #################################
+    # ICheckable
 
     def check(self, verify=False, repair=False):
         checker = self.checker_class(self)
@@ -251,6 +256,19 @@ class MutableFileNode:
         d.addCallback(_done)
         return d
 
+    #################################
+    # IRepairable
+
+    def repair(self, checker_results):
+        assert ICheckerResults(checker_results)
+        r = Repairer(self, checker_results)
+        d = r.start()
+        return d
+
+
+    #################################
+    # IMutableFileNode
+
     # allow the use of IDownloadTarget
     def download(self, target):
         # fake it. TODO: make this cleaner.
index a8c2800309eb942096cdffac00b33e12880e58bd..af02e6310597ce9633ff0286f54952b4cd761427 100644 (file)
@@ -225,11 +225,19 @@ class Publish:
         # use later.
         self.connections = {}
 
+        self.bad_share_checkstrings = {}
+
         # we use the servermap to populate the initial goal: this way we will
         # try to update each existing share in place.
         for (peerid, shnum) in self._servermap.servermap:
             self.goal.add( (peerid, shnum) )
             self.connections[peerid] = self._servermap.connections[peerid]
+        # then we add in all the shares that were bad (corrupted, bad
+        # signatures, etc). We want to replace these.
+        for (peerid, shnum, old_checkstring) in self._servermap.bad_shares:
+            self.goal.add( (peerid, shnum) )
+            self.bad_share_checkstrings[ (peerid, shnum) ] = old_checkstring
+            self.connections[peerid] = self._servermap.connections[peerid]
 
         # create the shares. We'll discard these as they are delivered. SMDF:
         # we're allowed to hold everything in memory.
@@ -560,6 +568,10 @@ class Publish:
                                                    old_salt)
                 testv = (0, len(old_checkstring), "eq", old_checkstring)
 
+            elif key in self.bad_share_checkstrings:
+                old_checkstring = self.bad_share_checkstrings[key]
+                testv = (0, len(old_checkstring), "eq", old_checkstring)
+
             else:
                 # add a testv that requires the share not exist
                 #testv = (0, 1, 'eq', "")
diff --git a/src/allmydata/mutable/repair.py b/src/allmydata/mutable/repair.py
new file mode 100644 (file)
index 0000000..f6efb34
--- /dev/null
@@ -0,0 +1,86 @@
+
+from zope.interface import implements
+from allmydata.interfaces import IRepairResults
+
+class RepairResults:
+    implements(IRepairResults)
+
+class MustForceRepairError(Exception):
+    pass
+
+class Repairer:
+    def __init__(self, node, checker_results):
+        self.node = node
+        self.checker_results = checker_results
+        assert checker_results.storage_index == self.node.get_storage_index()
+
+    def start(self, force=False):
+        # download, then re-publish. If a server had a bad share, try to
+        # replace it with a good one of the same shnum.
+
+        # The normal repair operation should not be used to replace
+        # application-specific merging of alternate versions: i.e if there
+        # are multiple highest seqnums with different roothashes. In this
+        # case, the application must use node.upload() (referencing the
+        # servermap that indicates the multiple-heads condition), or
+        # node.overwrite(). The repair() operation will refuse to run in
+        # these conditions unless a force=True argument is provided. If
+        # force=True is used, then the highest root hash will be reinforced.
+
+        # Likewise, the presence of an unrecoverable latest version is an
+        # unusual event, and should ideally be handled by retrying a couple
+        # times (spaced out over hours or days) and hoping that new shares
+        # will become available. If repair(force=True) is called, data will
+        # be lost: a new seqnum will be generated with the same contents as
+        # the most recent recoverable version, skipping over the lost
+        # version. repair(force=False) will refuse to run in a situation like
+        # this.
+
+        # Repair is designed to fix the following injuries:
+        #  missing shares: add new ones to get at least N distinct ones
+        #  old shares: replace old shares with the latest version
+        #  bogus shares (bad sigs): replace the bad one with a good one
+
+        smap = self.checker_results.servermap
+
+        if smap.unrecoverable_newer_versions():
+            if not force:
+                raise MustForceRepairError("There were unrecoverable newer "
+                                           "versions, so force=True must be "
+                                           "passed to the repair() operation")
+            # continuing on means that node.upload() will pick a seqnum that
+            # is higher than everything visible in the servermap, effectively
+            # discarding the unrecoverable versions.
+        if smap.needs_merge():
+            if not force:
+                raise MustForceRepairError("There were multiple recoverable "
+                                           "versions with identical seqnums, "
+                                           "so force=True must be passed to "
+                                           "the repair() operation")
+            # continuing on means that smap.best_recoverable_version() will
+            # pick the one with the highest roothash, and then node.upload()
+            # will replace all shares with its contents
+
+        # missing shares are handled during upload, which tries to find a
+        # home for every share
+
+        # old shares are handled during upload, which will replace any share
+        # that was present in the servermap
+
+        # bogus shares need to be managed here. We might notice a bogus share
+        # during mapupdate (whether done for a filecheck or just before a
+        # download) by virtue of it having an invalid signature. We might
+        # also notice a bad hash in the share during verify or download. In
+        # either case, the problem will be noted in the servermap, and the
+        # bad share (along with its checkstring) will be recorded in
+        # servermap.bad_shares . Publish knows that it should try and replace
+        # these.
+
+        best_version = smap.best_recoverable_version()
+        d = self.node.download_version(smap, best_version)
+        d.addCallback(self.node.upload, smap)
+        d.addCallback(self.get_results)
+        return d
+
+    def get_results(self, res):
+        return RepairResults()
index b844bcf05ae687332810c426739582c8e95f2846..c93d1d28f270130505bc7be9dd69601f5aea60a7 100644 (file)
@@ -249,7 +249,7 @@ class Retrieve:
                 f = failure.Failure()
                 self.log("bad share: %s %s" % (f, f.value), level=log.WEIRD)
                 self.remove_peer(peerid)
-                self.servermap.mark_bad_share(peerid, shnum)
+                self.servermap.mark_bad_share(peerid, shnum, prefix)
                 self._bad_shares.add( (peerid, shnum) )
                 self._status.problems[peerid] = f
                 self._last_failure = f
index 37c29b3349a6f2cb607b80ce535c1b2b7942deb1..606a4f6baa9ba1dda07dce72e64043df6d9efe55 100644 (file)
@@ -12,7 +12,8 @@ from pycryptopp.publickey import rsa
 
 from common import MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_READ, \
      DictOfSets, CorruptShareError, NeedMoreDataError
-from layout import unpack_prefix_and_signature, unpack_header, unpack_share
+from layout import unpack_prefix_and_signature, unpack_header, unpack_share, \
+     SIGNED_PREFIX_LENGTH
 
 class UpdateStatus:
     implements(IServermapUpdaterStatus)
@@ -114,25 +115,28 @@ class ServerMap:
         self.connections = {}
         self.unreachable_peers = set() # peerids that didn't respond to queries
         self.problems = [] # mostly for debugging
-        self.bad_shares = set()
+        self.bad_shares = {} # maps (peerid,shnum) to old checkstring
         self.last_update_mode = None
         self.last_update_time = 0
 
-    def mark_bad_share(self, peerid, shnum):
-        """This share was found to be bad, not in the checkstring or
-        signature, but deeper in the share, detected at retrieve time. Remove
-        it from our list of useful shares, and remember that it is bad so we
-        don't add it back again later.
+    def mark_bad_share(self, peerid, shnum, checkstring):
+        """This share was found to be bad, either in the checkstring or
+        signature (detected during mapupdate), or deeper in the share
+        (detected at retrieve time). Remove it from our list of useful
+        shares, and remember that it is bad so we don't add it back again
+        later. We record the share's old checkstring (which might be
+        corrupted or badly signed) so that a repair operation can do the
+        test-and-set using it as a reference.
         """
-        key = (peerid, shnum)
-        self.bad_shares.add(key)
+        key = (peerid, shnum) # record checkstring
+        self.bad_shares[key] = checkstring
         self.servermap.pop(key, None)
 
     def add_new_share(self, peerid, shnum, verinfo, timestamp):
         """We've written a new share out, replacing any that was there
         before."""
         key = (peerid, shnum)
-        self.bad_shares.discard(key)
+        self.bad_shares.pop(key, None)
         self.servermap[key] = (verinfo, timestamp)
 
     def dump(self, out=sys.stdout):
@@ -532,6 +536,8 @@ class ServermapUpdater:
                          parent=lp, level=log.WEIRD)
                 self._bad_peers.add(peerid)
                 self._last_failure = f
+                checkstring = data[:SIGNED_PREFIX_LENGTH]
+                self._servermap.mark_bad_share(peerid, shnum, checkstring)
                 self._servermap.problems.append(f)
                 pass
 
index 3ce59502fccf14e5e19ba26a8e23eeabd7ff459f..3fb2a61be366763d1d8fa0dcdd907016a13cff76 100644 (file)
@@ -723,7 +723,7 @@ class Servermap(unittest.TestCase):
             for (shnum, peerid, timestamp) in shares:
                 if shnum < 5:
                     self._corrupted.add( (peerid, shnum) )
-                    sm.mark_bad_share(peerid, shnum)
+                    sm.mark_bad_share(peerid, shnum, "")
             return self.update_servermap(sm, MODE_WRITE)
         d.addCallback(_made_map)
         def _check_map(sm):