From: Kevan Carstensen Date: Tue, 2 Aug 2011 01:51:40 +0000 (-0700) Subject: mutable: train checker and repairer to work with MDMF mutable files X-Git-Tag: trac-5200~26 X-Git-Url: https://git.rkrishnan.org/pf/content/en/service/module-simplejson-index.html?a=commitdiff_plain;h=f80a7fdf18617abe307cd0b4fc8e96399f6bf8a5;p=tahoe-lafs%2Ftahoe-lafs.git mutable: train checker and repairer to work with MDMF mutable files --- diff --git a/src/allmydata/mutable/checker.py b/src/allmydata/mutable/checker.py index e5d3c301..3063b178 100644 --- a/src/allmydata/mutable/checker.py +++ b/src/allmydata/mutable/checker.py @@ -1,14 +1,11 @@ -from twisted.internet import defer -from twisted.python import failure -from allmydata import hashtree from allmydata.uri import from_string -from allmydata.util import hashutil, base32, idlib, log +from allmydata.util import base32, idlib, log from allmydata.check_results import CheckAndRepairResults, CheckResults from allmydata.mutable.common import MODE_CHECK, CorruptShareError from allmydata.mutable.servermap import ServerMap, ServermapUpdater -from allmydata.mutable.layout import unpack_share, SIGNED_PREFIX_LENGTH +from allmydata.mutable.retrieve import Retrieve # for verifying class MutableChecker: @@ -25,6 +22,9 @@ class MutableChecker: def check(self, verify=False, add_lease=False): servermap = ServerMap() + # Updating the servermap in MODE_CHECK will stand a good chance + # of finding all of the shares, and getting a good idea of + # recoverability, etc, without verifying. u = ServermapUpdater(self._node, self._storage_broker, self._monitor, servermap, MODE_CHECK, add_lease=add_lease) if self._history: @@ -48,10 +48,14 @@ class MutableChecker: if num_recoverable: self.best_version = servermap.best_recoverable_version() + # The file is unhealthy and needs to be repaired if: + # - There are unrecoverable versions. if servermap.unrecoverable_versions(): self.need_repair = True + # - There isn't a recoverable version. if num_recoverable != 1: self.need_repair = True + # - The best recoverable version is missing some shares. if self.best_version: available_shares = servermap.shares_available() (num_distinct_shares, k, N) = available_shares[self.best_version] @@ -62,89 +66,42 @@ class MutableChecker: def _verify_all_shares(self, servermap): # read every byte of each share + # + # This logic is going to be very nearly the same as the + # downloader. I bet we could pass the downloader a flag that + # makes it do this, and piggyback onto that instead of + # duplicating a bunch of code. + # + # Like: + # r = Retrieve(blah, blah, blah, verify=True) + # d = r.download() + # (wait, wait, wait, d.callback) + # + # Then, when it has finished, we can check the servermap (which + # we provided to Retrieve) to figure out which shares are bad, + # since the Retrieve process will have updated the servermap as + # it went along. + # + # By passing the verify=True flag to the constructor, we are + # telling the downloader a few things. + # + # 1. It needs to download all N shares, not just K shares. + # 2. It doesn't need to decrypt or decode the shares, only + # verify them. if not self.best_version: return - versionmap = servermap.make_versionmap() - shares = versionmap[self.best_version] - (seqnum, root_hash, IV, segsize, datalength, k, N, prefix, - offsets_tuple) = self.best_version - offsets = dict(offsets_tuple) - readv = [ (0, offsets["EOF"]) ] - dl = [] - for (shnum, peerid, timestamp) in shares: - ss = servermap.connections[peerid] - d = self._do_read(ss, peerid, self._storage_index, [shnum], readv) - d.addCallback(self._got_answer, peerid, servermap) - dl.append(d) - return defer.DeferredList(dl, fireOnOneErrback=True, consumeErrors=True) - def _do_read(self, ss, peerid, storage_index, shnums, readv): - # isolate the callRemote to a separate method, so tests can subclass - # Publish and override it - d = ss.callRemote("slot_readv", storage_index, shnums, readv) + r = Retrieve(self._node, servermap, self.best_version, verify=True) + d = r.download() + d.addCallback(self._process_bad_shares) return d - def _got_answer(self, datavs, peerid, servermap): - for shnum,datav in datavs.items(): - data = datav[0] - try: - self._got_results_one_share(shnum, peerid, data) - except CorruptShareError: - f = failure.Failure() - self.need_repair = True - self.bad_shares.append( (peerid, shnum, f) ) - prefix = data[:SIGNED_PREFIX_LENGTH] - servermap.mark_bad_share(peerid, shnum, prefix) - ss = servermap.connections[peerid] - self.notify_server_corruption(ss, shnum, str(f.value)) - - def check_prefix(self, peerid, shnum, data): - (seqnum, root_hash, IV, segsize, datalength, k, N, prefix, - offsets_tuple) = self.best_version - got_prefix = data[:SIGNED_PREFIX_LENGTH] - if got_prefix != prefix: - raise CorruptShareError(peerid, shnum, - "prefix mismatch: share changed while we were reading it") - - def _got_results_one_share(self, shnum, peerid, data): - self.check_prefix(peerid, shnum, data) - - # the [seqnum:signature] pieces are validated by _compare_prefix, - # which checks their signature against the pubkey known to be - # associated with this file. - (seqnum, root_hash, IV, k, N, segsize, datalen, pubkey, signature, - share_hash_chain, block_hash_tree, share_data, - enc_privkey) = unpack_share(data) - - # validate [share_hash_chain,block_hash_tree,share_data] - - leaves = [hashutil.block_hash(share_data)] - t = hashtree.HashTree(leaves) - if list(t) != block_hash_tree: - raise CorruptShareError(peerid, shnum, "block hash tree failure") - share_hash_leaf = t[0] - t2 = hashtree.IncompleteHashTree(N) - # root_hash was checked by the signature - t2.set_hashes({0: root_hash}) - try: - t2.set_hashes(hashes=share_hash_chain, - leaves={shnum: share_hash_leaf}) - except (hashtree.BadHashError, hashtree.NotEnoughHashesError, - IndexError), e: - msg = "corrupt hashes: %s" % (e,) - raise CorruptShareError(peerid, shnum, msg) - - # validate enc_privkey: only possible if we have a write-cap - if not self._node.is_readonly(): - alleged_privkey_s = self._node._decrypt_privkey(enc_privkey) - alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s) - if alleged_writekey != self._node.get_writekey(): - raise CorruptShareError(peerid, shnum, "invalid privkey") + def _process_bad_shares(self, bad_shares): + if bad_shares: + self.need_repair = True + self.bad_shares = bad_shares - def notify_server_corruption(self, ss, shnum, reason): - ss.callRemoteOnly("advise_corrupt_share", - "mutable", self._storage_index, shnum, reason) def _count_shares(self, smap, version): available_shares = smap.shares_available() diff --git a/src/allmydata/mutable/repairer.py b/src/allmydata/mutable/repairer.py index 5cef5e7d..d0bfeffe 100644 --- a/src/allmydata/mutable/repairer.py +++ b/src/allmydata/mutable/repairer.py @@ -2,6 +2,7 @@ from zope.interface import implements from twisted.internet import defer from allmydata.interfaces import IRepairResults, ICheckResults +from allmydata.mutable.publish import MutableData class RepairResults: implements(IRepairResults) @@ -104,6 +105,8 @@ class Repairer: raise RepairRequiresWritecapError("Sorry, repair currently requires a writecap, to set the write-enabler properly.") d = self.node.download_version(smap, best_version, fetch_privkey=True) + d.addCallback(lambda data: + MutableData(data)) d.addCallback(self.node.upload, smap) d.addCallback(self.get_results, smap) return d