From: Brian Warner Date: Fri, 18 Jul 2008 04:09:23 +0000 (-0700) Subject: first pass at a mutable repairer. not tested at all yet, but of course all existing... X-Git-Url: https://git.rkrishnan.org/pf/content/en/footer/index.html?a=commitdiff_plain;h=879fefe5f341b71196fc0d5a614d73aab65122e3;p=tahoe-lafs%2Ftahoe-lafs.git first pass at a mutable repairer. not tested at all yet, but of course all existing tests pass --- diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py index 85b12683..deb20c38 100644 --- a/src/allmydata/interfaces.py +++ b/src/allmydata/interfaces.py @@ -1541,11 +1541,20 @@ class IDeepCheckResults(Interface): was not fully healthy.""" class IRepairable(Interface): - def repair(): + def repair(checker_results): """Attempt to repair the given object. Returns a Deferred that fires with a IRepairResults object. + + I must be called with an object that implements ICheckerResults, as + proof that you have actually discovered a problem with this file. I + will use the data in the checker results to guide the repair process, + such as which servers provided bad data and should therefore be + avoided. """ +class IRepairResults(Interface): + """I contain the results of a repair operation.""" + class IClient(Interface): def upload(uploadable): diff --git a/src/allmydata/mutable/checker.py b/src/allmydata/mutable/checker.py index 391a29be..923c82ff 100644 --- a/src/allmydata/mutable/checker.py +++ b/src/allmydata/mutable/checker.py @@ -1,5 +1,4 @@ -import struct from zope.interface import implements from twisted.internet import defer from twisted.python import failure @@ -9,7 +8,7 @@ from allmydata.interfaces import ICheckerResults from common import MODE_CHECK, CorruptShareError from servermap import ServerMap, ServermapUpdater -from layout import unpack_share, SIGNED_PREFIX +from layout import unpack_share, SIGNED_PREFIX_LENGTH class MutableChecker: @@ -18,9 +17,11 @@ class MutableChecker: self.healthy = True self.problems = [] self._storage_index = self._node.get_storage_index() + self.results = Results(self._storage_index) def check(self, verify=False, repair=False): servermap = ServerMap() + self.results.servermap = servermap self.do_verify = verify self.do_repair = repair u = ServermapUpdater(self._node, servermap, MODE_CHECK) @@ -85,11 +86,13 @@ class MutableChecker: except CorruptShareError: f = failure.Failure() self.add_problem(shnum, peerid, f) + prefix = data[:SIGNED_PREFIX_LENGTH] + self.results.servermap.mark_bad_share(peerid, shnum, prefix) def check_prefix(self, peerid, shnum, data): (seqnum, root_hash, IV, segsize, datalength, k, N, prefix, offsets_tuple) = self.best_version - got_prefix = data[:struct.calcsize(SIGNED_PREFIX)] + got_prefix = data[:SIGNED_PREFIX_LENGTH] if got_prefix != prefix: raise CorruptShareError(peerid, shnum, "prefix mismatch: share changed while we were reading it") @@ -135,25 +138,37 @@ class MutableChecker: return if not self.do_repair: return - pass + self.results.repair_attempted = True + d = self._node.repair(self) + def _repair_finished(repair_results): + self.results.repair_succeeded = True + self.results.repair_results = repair_results + def _repair_error(f): + # I'm not sure if I want to pass through a failure or not. + self.results.repair_succeeded = False + self.results.repair_failure = f + return f + d.addCallbacks(_repair_finished, _repair_error) + return d def _return_results(self, res): - r = Results(self._storage_index) - r.healthy = self.healthy - r.problems = self.problems - return r + self.results.healthy = self.healthy + self.results.problems = self.problems + return self.results def add_problem(self, shnum, peerid, what): self.healthy = False self.problems.append( (peerid, self._storage_index, shnum, what) ) + class Results: implements(ICheckerResults) def __init__(self, storage_index): self.storage_index = storage_index self.storage_index_s = base32.b2a(storage_index)[:6] + self.repair_attempted = False def is_healthy(self): return self.healthy diff --git a/src/allmydata/mutable/layout.py b/src/allmydata/mutable/layout.py index aaad2b27..a110eb1d 100644 --- a/src/allmydata/mutable/layout.py +++ b/src/allmydata/mutable/layout.py @@ -4,6 +4,7 @@ from common import NeedMoreDataError PREFIX = ">BQ32s16s" # each version has a different prefix SIGNED_PREFIX = ">BQ32s16s BBQQ" # this is covered by the signature +SIGNED_PREFIX_LENGTH = struct.calcsize(SIGNED_PREFIX) HEADER = ">BQ32s16s BBQQ LLLLQQ" # includes offsets HEADER_LENGTH = struct.calcsize(HEADER) @@ -24,7 +25,7 @@ def unpack_header(data): def unpack_prefix_and_signature(data): assert len(data) >= HEADER_LENGTH, len(data) - prefix = data[:struct.calcsize(SIGNED_PREFIX)] + prefix = data[:SIGNED_PREFIX_LENGTH] (version, seqnum, diff --git a/src/allmydata/mutable/node.py b/src/allmydata/mutable/node.py index bb038022..2d4b2026 100644 --- a/src/allmydata/mutable/node.py +++ b/src/allmydata/mutable/node.py @@ -6,7 +6,8 @@ from zope.interface import implements from twisted.internet import defer, reactor from twisted.python import log from foolscap.eventual import eventually -from allmydata.interfaces import IMutableFileNode, IMutableFileURI, ICheckable +from allmydata.interfaces import IMutableFileNode, IMutableFileURI, \ + ICheckable, ICheckerResults from allmydata.util import hashutil from allmydata.util.assertutil import precondition from allmydata.uri import WriteableSSKFileURI @@ -21,6 +22,7 @@ from common import MODE_READ, MODE_WRITE, UnrecoverableFileError, \ from servermap import ServerMap, ServermapUpdater from retrieve import Retrieve from checker import MutableChecker +from repair import Repairer class BackoffAgent: @@ -186,6 +188,8 @@ class MutableFileNode: def get_total_shares(self): return self._total_shares + #################################### + # IFilesystemNode def get_uri(self): return self._uri.to_string() @@ -237,6 +241,7 @@ class MutableFileNode: return d ################################# + # ICheckable def check(self, verify=False, repair=False): checker = self.checker_class(self) @@ -251,6 +256,19 @@ class MutableFileNode: d.addCallback(_done) return d + ################################# + # IRepairable + + def repair(self, checker_results): + assert ICheckerResults(checker_results) + r = Repairer(self, checker_results) + d = r.start() + return d + + + ################################# + # IMutableFileNode + # allow the use of IDownloadTarget def download(self, target): # fake it. TODO: make this cleaner. diff --git a/src/allmydata/mutable/publish.py b/src/allmydata/mutable/publish.py index a8c28003..af02e631 100644 --- a/src/allmydata/mutable/publish.py +++ b/src/allmydata/mutable/publish.py @@ -225,11 +225,19 @@ class Publish: # use later. self.connections = {} + self.bad_share_checkstrings = {} + # we use the servermap to populate the initial goal: this way we will # try to update each existing share in place. for (peerid, shnum) in self._servermap.servermap: self.goal.add( (peerid, shnum) ) self.connections[peerid] = self._servermap.connections[peerid] + # then we add in all the shares that were bad (corrupted, bad + # signatures, etc). We want to replace these. + for (peerid, shnum, old_checkstring) in self._servermap.bad_shares: + self.goal.add( (peerid, shnum) ) + self.bad_share_checkstrings[ (peerid, shnum) ] = old_checkstring + self.connections[peerid] = self._servermap.connections[peerid] # create the shares. We'll discard these as they are delivered. SMDF: # we're allowed to hold everything in memory. @@ -560,6 +568,10 @@ class Publish: old_salt) testv = (0, len(old_checkstring), "eq", old_checkstring) + elif key in self.bad_share_checkstrings: + old_checkstring = self.bad_share_checkstrings[key] + testv = (0, len(old_checkstring), "eq", old_checkstring) + else: # add a testv that requires the share not exist #testv = (0, 1, 'eq', "") diff --git a/src/allmydata/mutable/repair.py b/src/allmydata/mutable/repair.py new file mode 100644 index 00000000..f6efb347 --- /dev/null +++ b/src/allmydata/mutable/repair.py @@ -0,0 +1,86 @@ + +from zope.interface import implements +from allmydata.interfaces import IRepairResults + +class RepairResults: + implements(IRepairResults) + +class MustForceRepairError(Exception): + pass + +class Repairer: + def __init__(self, node, checker_results): + self.node = node + self.checker_results = checker_results + assert checker_results.storage_index == self.node.get_storage_index() + + def start(self, force=False): + # download, then re-publish. If a server had a bad share, try to + # replace it with a good one of the same shnum. + + # The normal repair operation should not be used to replace + # application-specific merging of alternate versions: i.e if there + # are multiple highest seqnums with different roothashes. In this + # case, the application must use node.upload() (referencing the + # servermap that indicates the multiple-heads condition), or + # node.overwrite(). The repair() operation will refuse to run in + # these conditions unless a force=True argument is provided. If + # force=True is used, then the highest root hash will be reinforced. + + # Likewise, the presence of an unrecoverable latest version is an + # unusual event, and should ideally be handled by retrying a couple + # times (spaced out over hours or days) and hoping that new shares + # will become available. If repair(force=True) is called, data will + # be lost: a new seqnum will be generated with the same contents as + # the most recent recoverable version, skipping over the lost + # version. repair(force=False) will refuse to run in a situation like + # this. + + # Repair is designed to fix the following injuries: + # missing shares: add new ones to get at least N distinct ones + # old shares: replace old shares with the latest version + # bogus shares (bad sigs): replace the bad one with a good one + + smap = self.checker_results.servermap + + if smap.unrecoverable_newer_versions(): + if not force: + raise MustForceRepairError("There were unrecoverable newer " + "versions, so force=True must be " + "passed to the repair() operation") + # continuing on means that node.upload() will pick a seqnum that + # is higher than everything visible in the servermap, effectively + # discarding the unrecoverable versions. + if smap.needs_merge(): + if not force: + raise MustForceRepairError("There were multiple recoverable " + "versions with identical seqnums, " + "so force=True must be passed to " + "the repair() operation") + # continuing on means that smap.best_recoverable_version() will + # pick the one with the highest roothash, and then node.upload() + # will replace all shares with its contents + + # missing shares are handled during upload, which tries to find a + # home for every share + + # old shares are handled during upload, which will replace any share + # that was present in the servermap + + # bogus shares need to be managed here. We might notice a bogus share + # during mapupdate (whether done for a filecheck or just before a + # download) by virtue of it having an invalid signature. We might + # also notice a bad hash in the share during verify or download. In + # either case, the problem will be noted in the servermap, and the + # bad share (along with its checkstring) will be recorded in + # servermap.bad_shares . Publish knows that it should try and replace + # these. + + best_version = smap.best_recoverable_version() + d = self.node.download_version(smap, best_version) + d.addCallback(self.node.upload, smap) + d.addCallback(self.get_results) + return d + + def get_results(self, res): + return RepairResults() diff --git a/src/allmydata/mutable/retrieve.py b/src/allmydata/mutable/retrieve.py index b844bcf0..c93d1d28 100644 --- a/src/allmydata/mutable/retrieve.py +++ b/src/allmydata/mutable/retrieve.py @@ -249,7 +249,7 @@ class Retrieve: f = failure.Failure() self.log("bad share: %s %s" % (f, f.value), level=log.WEIRD) self.remove_peer(peerid) - self.servermap.mark_bad_share(peerid, shnum) + self.servermap.mark_bad_share(peerid, shnum, prefix) self._bad_shares.add( (peerid, shnum) ) self._status.problems[peerid] = f self._last_failure = f diff --git a/src/allmydata/mutable/servermap.py b/src/allmydata/mutable/servermap.py index 37c29b33..606a4f6b 100644 --- a/src/allmydata/mutable/servermap.py +++ b/src/allmydata/mutable/servermap.py @@ -12,7 +12,8 @@ from pycryptopp.publickey import rsa from common import MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_READ, \ DictOfSets, CorruptShareError, NeedMoreDataError -from layout import unpack_prefix_and_signature, unpack_header, unpack_share +from layout import unpack_prefix_and_signature, unpack_header, unpack_share, \ + SIGNED_PREFIX_LENGTH class UpdateStatus: implements(IServermapUpdaterStatus) @@ -114,25 +115,28 @@ class ServerMap: self.connections = {} self.unreachable_peers = set() # peerids that didn't respond to queries self.problems = [] # mostly for debugging - self.bad_shares = set() + self.bad_shares = {} # maps (peerid,shnum) to old checkstring self.last_update_mode = None self.last_update_time = 0 - def mark_bad_share(self, peerid, shnum): - """This share was found to be bad, not in the checkstring or - signature, but deeper in the share, detected at retrieve time. Remove - it from our list of useful shares, and remember that it is bad so we - don't add it back again later. + def mark_bad_share(self, peerid, shnum, checkstring): + """This share was found to be bad, either in the checkstring or + signature (detected during mapupdate), or deeper in the share + (detected at retrieve time). Remove it from our list of useful + shares, and remember that it is bad so we don't add it back again + later. We record the share's old checkstring (which might be + corrupted or badly signed) so that a repair operation can do the + test-and-set using it as a reference. """ - key = (peerid, shnum) - self.bad_shares.add(key) + key = (peerid, shnum) # record checkstring + self.bad_shares[key] = checkstring self.servermap.pop(key, None) def add_new_share(self, peerid, shnum, verinfo, timestamp): """We've written a new share out, replacing any that was there before.""" key = (peerid, shnum) - self.bad_shares.discard(key) + self.bad_shares.pop(key, None) self.servermap[key] = (verinfo, timestamp) def dump(self, out=sys.stdout): @@ -532,6 +536,8 @@ class ServermapUpdater: parent=lp, level=log.WEIRD) self._bad_peers.add(peerid) self._last_failure = f + checkstring = data[:SIGNED_PREFIX_LENGTH] + self._servermap.mark_bad_share(peerid, shnum, checkstring) self._servermap.problems.append(f) pass diff --git a/src/allmydata/test/test_mutable.py b/src/allmydata/test/test_mutable.py index 3ce59502..3fb2a61b 100644 --- a/src/allmydata/test/test_mutable.py +++ b/src/allmydata/test/test_mutable.py @@ -723,7 +723,7 @@ class Servermap(unittest.TestCase): for (shnum, peerid, timestamp) in shares: if shnum < 5: self._corrupted.add( (peerid, shnum) ) - sm.mark_bad_share(peerid, shnum) + sm.mark_bad_share(peerid, shnum, "") return self.update_servermap(sm, MODE_WRITE) d.addCallback(_made_map) def _check_map(sm):