From: Brian Warner Date: Tue, 26 Aug 2008 23:34:54 +0000 (-0700) Subject: mutable: make mutable-repair work for non-verifier runs, add tests X-Git-Url: https://git.rkrishnan.org/specifications/-?a=commitdiff_plain;h=1668401c16aad66ec725788b284870b30daf6a28;p=tahoe-lafs%2Ftahoe-lafs.git mutable: make mutable-repair work for non-verifier runs, add tests --- diff --git a/src/allmydata/mutable/checker.py b/src/allmydata/mutable/checker.py index ef457523..11ddeef0 100644 --- a/src/allmydata/mutable/checker.py +++ b/src/allmydata/mutable/checker.py @@ -27,9 +27,9 @@ class MutableChecker: d.addCallback(self._got_mapupdate_results) if verify: d.addCallback(self._verify_all_shares) + d.addCallback(self._generate_results) if repair: d.addCallback(self._maybe_do_repair) - d.addCallback(self._generate_results) d.addCallback(self._return_results) return d @@ -134,22 +134,6 @@ class MutableChecker: if alleged_writekey != self._node.get_writekey(): raise CorruptShareError(peerid, shnum, "invalid privkey") - def _maybe_do_repair(self, res): - if not self.need_repair: - return - self.results.repair_attempted = True - d = self._node.repair(self) - def _repair_finished(repair_results): - self.results.repair_succeeded = True - self.results.repair_results = repair_results - def _repair_error(f): - # I'm not sure if I want to pass through a failure or not. - self.results.repair_succeeded = False - self.results.repair_failure = f - return f - d.addCallbacks(_repair_finished, _repair_error) - return d - def _generate_results(self, res): self.results.healthy = True smap = self.results.servermap @@ -207,6 +191,22 @@ class MutableChecker: self.results.status_report = "\n".join(report) + "\n" + def _maybe_do_repair(self, res): + if not self.need_repair: + return + self.results.repair_attempted = True + d = self._node.repair(self.results) + def _repair_finished(repair_results): + self.results.repair_succeeded = True + self.results.repair_results = repair_results + def _repair_error(f): + # I'm not sure if I want to pass through a failure or not. + self.results.repair_succeeded = False + self.results.repair_failure = f + return f + d.addCallbacks(_repair_finished, _repair_error) + return d + def _return_results(self, res): return self.results @@ -219,6 +219,7 @@ class Results: self.storage_index_s = base32.b2a(storage_index)[:6] self.repair_attempted = False self.status_report = "[not generated yet]" # string + self.repair_report = None self.problems = [] # list of (peerid, storage_index, shnum, failure) def is_healthy(self): @@ -241,5 +242,14 @@ class Results: s += "\n" s += self.status_report s += "\n" + if self.repair_attempted: + s += "Repair attempted " + if self.repair_succeeded: + s += "and successful\n" + else: + s += "and failed\n" + s += "\n" + s += self.repair_results.to_string() + s += "\n" return s diff --git a/src/allmydata/mutable/node.py b/src/allmydata/mutable/node.py index cf95ec55..b5ee826d 100644 --- a/src/allmydata/mutable/node.py +++ b/src/allmydata/mutable/node.py @@ -408,11 +408,12 @@ class MutableFileNode: self._client.notify_mapupdate(u.get_status()) return u.update() - def download_version(self, servermap, version): + def download_version(self, servermap, version, fetch_privkey=False): return self._do_serialized(self._try_once_to_download_version, - servermap, version) - def _try_once_to_download_version(self, servermap, version): - r = Retrieve(self, servermap, version) + servermap, version, fetch_privkey) + def _try_once_to_download_version(self, servermap, version, + fetch_privkey=False): + r = Retrieve(self, servermap, version, fetch_privkey) self._client.notify_retrieve(r.get_status()) return r.download() diff --git a/src/allmydata/mutable/publish.py b/src/allmydata/mutable/publish.py index d6706d54..801d2180 100644 --- a/src/allmydata/mutable/publish.py +++ b/src/allmydata/mutable/publish.py @@ -236,9 +236,10 @@ class Publish: self.connections[peerid] = self._servermap.connections[peerid] # then we add in all the shares that were bad (corrupted, bad # signatures, etc). We want to replace these. - for (peerid, shnum, old_checkstring) in self._servermap.bad_shares: - self.goal.add( (peerid, shnum) ) - self.bad_share_checkstrings[ (peerid, shnum) ] = old_checkstring + for key, old_checkstring in self._servermap.bad_shares.items(): + (peerid, shnum) = key + self.goal.add(key) + self.bad_share_checkstrings[key] = old_checkstring self.connections[peerid] = self._servermap.connections[peerid] # create the shares. We'll discard these as they are delivered. SDMF: diff --git a/src/allmydata/mutable/repair.py b/src/allmydata/mutable/repair.py index f6efb347..82b8e4c7 100644 --- a/src/allmydata/mutable/repair.py +++ b/src/allmydata/mutable/repair.py @@ -5,6 +5,9 @@ from allmydata.interfaces import IRepairResults class RepairResults: implements(IRepairResults) + def to_string(self): + return "" + class MustForceRepairError(Exception): pass @@ -76,8 +79,14 @@ class Repairer: # servermap.bad_shares . Publish knows that it should try and replace # these. + # I chose to use the retrieve phase to ensure that the privkey is + # available, to avoid the extra roundtrip that would occur if we, + # say, added an smap.get_privkey() method. + + assert self.node.get_writekey() # repair currently requires a writecap + best_version = smap.best_recoverable_version() - d = self.node.download_version(smap, best_version) + d = self.node.download_version(smap, best_version, fetch_privkey=True) d.addCallback(self.node.upload, smap) d.addCallback(self.get_results) return d diff --git a/src/allmydata/mutable/retrieve.py b/src/allmydata/mutable/retrieve.py index ed4b9073..b5391eb3 100644 --- a/src/allmydata/mutable/retrieve.py +++ b/src/allmydata/mutable/retrieve.py @@ -11,6 +11,7 @@ from allmydata.util import hashutil, idlib, log from allmydata import hashtree, codec, storage from allmydata.immutable.encode import NotEnoughSharesError from pycryptopp.cipher.aes import AES +from pycryptopp.publickey import rsa from common import DictOfSets, CorruptShareError, UncoordinatedWriteError from layout import SIGNED_PREFIX, unpack_share_data @@ -81,7 +82,7 @@ class Retrieve: # Retrieve object will remain tied to a specific version of the file, and # will use a single ServerMap instance. - def __init__(self, filenode, servermap, verinfo): + def __init__(self, filenode, servermap, verinfo, fetch_privkey=False): self._node = filenode assert self._node._pubkey self._storage_index = filenode.get_storage_index() @@ -97,6 +98,12 @@ class Retrieve: self.servermap = servermap assert self._node._pubkey self.verinfo = verinfo + # during repair, we may be called upon to grab the private key, since + # it wasn't picked up during a verify=False checker run, and we'll + # need it for repair to generate the a new version. + self._need_privkey = fetch_privkey + if self._node._privkey: + self._need_privkey = False self._status = RetrieveStatus() self._status.set_storage_index(self._storage_index) @@ -166,16 +173,24 @@ class Retrieve: (seqnum, root_hash, IV, segsize, datalength, k, N, prefix, offsets_tuple) = self.verinfo offsets = dict(offsets_tuple) + # we read the checkstring, to make sure that the data we grab is from - # the right version. We also read the data, and the hashes necessary - # to validate them (share_hash_chain, block_hash_tree, share_data). - # We don't read the signature or the pubkey, since that was handled - # during the servermap phase, and we'll be comparing the share hash - # chain against the roothash that was validated back then. - readv = [ (0, struct.calcsize(SIGNED_PREFIX)), - (offsets['share_hash_chain'], - offsets['enc_privkey'] - offsets['share_hash_chain']), - ] + # the right version. + readv = [ (0, struct.calcsize(SIGNED_PREFIX)) ] + + # We also read the data, and the hashes necessary to validate them + # (share_hash_chain, block_hash_tree, share_data). We don't read the + # signature or the pubkey, since that was handled during the + # servermap phase, and we'll be comparing the share hash chain + # against the roothash that was validated back then. + + readv.append( (offsets['share_hash_chain'], + offsets['enc_privkey'] - offsets['share_hash_chain'] ) ) + + # if we need the private key (for repair), we also fetch that + if self._need_privkey: + readv.append( (offsets['enc_privkey'], + offsets['EOF'] - offsets['enc_privkey']) ) m = Marker() self._outstanding_queries[m] = (peerid, shnum, started) @@ -243,7 +258,7 @@ class Retrieve: # shares if we get them.. seems better than an assert(). for shnum,datav in datavs.items(): - (prefix, hash_and_data) = datav + (prefix, hash_and_data) = datav[:2] try: self._got_results_one_share(shnum, peerid, prefix, hash_and_data) @@ -259,6 +274,9 @@ class Retrieve: self._status.problems[peerid] = f self._last_failure = f pass + if self._need_privkey and len(datav) > 2: + lp = None + self._try_to_validate_privkey(datav[2], peerid, shnum, lp) # all done! def _got_results_one_share(self, shnum, peerid, @@ -296,6 +314,25 @@ class Retrieve: # self.shares self.shares[shnum] = share_data + def _try_to_validate_privkey(self, enc_privkey, peerid, shnum, lp): + + alleged_privkey_s = self._node._decrypt_privkey(enc_privkey) + alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s) + if alleged_writekey != self._node.get_writekey(): + self.log("invalid privkey from %s shnum %d" % + (idlib.nodeid_b2a(peerid)[:8], shnum), + parent=lp, level=log.WEIRD, umid="YIw4tA") + return + + # it's good + self.log("got valid privkey from shnum %d on peerid %s" % + (shnum, idlib.shortnodeid_b2a(peerid)), + parent=lp) + privkey = rsa.create_signing_key_from_string(alleged_privkey_s) + self._node._populate_encprivkey(enc_privkey) + self._node._populate_privkey(privkey) + self._need_privkey = False + def _query_failed(self, f, marker, peerid): self.log(format="query to [%(peerid)s] failed", peerid=idlib.shortnodeid_b2a(peerid), @@ -332,6 +369,15 @@ class Retrieve: if len(self.shares) < k: # we don't have enough shares yet return self._maybe_send_more_queries(k) + if self._need_privkey: + # we got k shares, but none of them had a valid privkey. TODO: + # look further. Adding code to do this is a bit complicated, and + # I want to avoid that complication, and this should be pretty + # rare (k shares with bitflips in the enc_privkey but not in the + # data blocks). If we actually do get here, the subsequent repair + # will fail for lack of a privkey. + self.log("got k shares but still need_privkey, bummer", + level=log.WEIRD, umid="MdRHPA") # we have enough to finish. All the shares have had their hashes # checked, so if something fails at this point, we don't know how diff --git a/src/allmydata/mutable/servermap.py b/src/allmydata/mutable/servermap.py index 4ec40cc6..30642c74 100644 --- a/src/allmydata/mutable/servermap.py +++ b/src/allmydata/mutable/servermap.py @@ -101,13 +101,14 @@ class ServerMap: @ivar connections: maps peerid to a RemoteReference - @ivar bad_shares: a sequence of (peerid, shnum) tuples, describing + @ivar bad_shares: dict with keys of (peerid, shnum) tuples, describing shares that I should ignore (because a previous user of the servermap determined that they were invalid). The updater only locates a certain number of shares: if some of these turn out to have integrity problems and are unusable, the caller will need to mark those shares as bad, then re-update the servermap, then try again. + The dict maps (peerid, shnum) tuple to old checkstring. """ def __init__(self): @@ -349,6 +350,9 @@ class ServermapUpdater: self._need_privkey = False if mode == MODE_WRITE and not self._node._privkey: self._need_privkey = True + # check+repair: repair requires the privkey, so if we didn't happen + # to ask for it during the check, we'll have problems doing the + # publish. prefix = storage.si_b2a(self._storage_index)[:5] self._log_number = log.msg(format="SharemapUpdater(%(si)s): starting (%(mode)s)", diff --git a/src/allmydata/test/test_system.py b/src/allmydata/test/test_system.py index aa5846d3..90016cf9 100644 --- a/src/allmydata/test/test_system.py +++ b/src/allmydata/test/test_system.py @@ -1915,7 +1915,73 @@ class MutableChecker(SystemTestMixin, unittest.TestCase): shid_re = (r"Corrupt Shares:\s+%s: block hash tree failure" % self.corrupt_shareid) self.failUnless(re.search(shid_re, out), out) + d.addCallback(_got_results) + # now make sure the webapi repairer can fix it + def _do_repair(res): + url = (self.webish_url + + "uri/%s" % urllib.quote(self.node.get_uri()) + + "?t=check&verify=true&repair=true") + return getPage(url, method="POST") + d.addCallback(_do_repair) + def _got_repair_results(out): + self.failUnless("Repair attempted and successful" in out) + d.addCallback(_got_repair_results) + d.addCallback(_do_check) + def _got_postrepair_results(out): + self.failIf("Not Healthy!" in out, out) + self.failUnless("Recoverable Versions: 10*seq" in out) + d.addCallback(_got_postrepair_results) + + return d + + def test_delete_share(self): + self.basedir = self.mktemp() + d = self.set_up_nodes() + CONTENTS = "a little bit of data" + d.addCallback(lambda res: self.clients[0].create_mutable_file(CONTENTS)) + def _created(node): + self.node = node + si = self.node.get_storage_index() + out = self._run_cli(["debug", "find-shares", base32.b2a(si), + self.clients[1].basedir]) + files = out.split("\n") + # corrupt one of them, using the CLI debug command + f = files[0] + shnum = os.path.basename(f) + nodeid = self.clients[1].nodeid + nodeid_prefix = idlib.shortnodeid_b2a(nodeid) + self.corrupt_shareid = "%s-sh%s" % (nodeid_prefix, shnum) + os.unlink(files[0]) + d.addCallback(_created) + # now make sure the webapi checker notices it + def _do_check(res): + url = (self.webish_url + + "uri/%s" % urllib.quote(self.node.get_uri()) + + "?t=check&verify=false") + return getPage(url, method="POST") + d.addCallback(_do_check) + def _got_results(out): + self.failUnless("Not Healthy!" in out, out) + self.failUnless("Unhealthy: best recoverable version has only 9 shares (encoding is 3-of-10)" in out, out) + self.failIf("Corrupt Shares" in out, out) d.addCallback(_got_results) + + # now make sure the webapi repairer can fix it + def _do_repair(res): + url = (self.webish_url + + "uri/%s" % urllib.quote(self.node.get_uri()) + + "?t=check&verify=false&repair=true") + return getPage(url, method="POST") + d.addCallback(_do_repair) + def _got_repair_results(out): + self.failUnless("Repair attempted and successful" in out) + d.addCallback(_got_repair_results) + d.addCallback(_do_check) + def _got_postrepair_results(out): + self.failIf("Not Healthy!" in out, out) + self.failUnless("Recoverable Versions: 10*seq" in out) + d.addCallback(_got_postrepair_results) + return d