mutable: make mutable-repair work for non-verifier runs, add tests
authorBrian Warner <warner@allmydata.com>
Tue, 26 Aug 2008 23:34:54 +0000 (16:34 -0700)
committerBrian Warner <warner@allmydata.com>
Tue, 26 Aug 2008 23:34:54 +0000 (16:34 -0700)
src/allmydata/mutable/checker.py
src/allmydata/mutable/node.py
src/allmydata/mutable/publish.py
src/allmydata/mutable/repair.py
src/allmydata/mutable/retrieve.py
src/allmydata/mutable/servermap.py
src/allmydata/test/test_system.py

index ef4575230371c9c1b53b8a22cf9f230884243264..11ddeef05589769519327a20dea30d8bff2a3bbc 100644 (file)
@@ -27,9 +27,9 @@ class MutableChecker:
         d.addCallback(self._got_mapupdate_results)
         if verify:
             d.addCallback(self._verify_all_shares)
+        d.addCallback(self._generate_results)
         if repair:
             d.addCallback(self._maybe_do_repair)
-        d.addCallback(self._generate_results)
         d.addCallback(self._return_results)
         return d
 
@@ -134,22 +134,6 @@ class MutableChecker:
             if alleged_writekey != self._node.get_writekey():
                 raise CorruptShareError(peerid, shnum, "invalid privkey")
 
-    def _maybe_do_repair(self, res):
-        if not self.need_repair:
-            return
-        self.results.repair_attempted = True
-        d = self._node.repair(self)
-        def _repair_finished(repair_results):
-            self.results.repair_succeeded = True
-            self.results.repair_results = repair_results
-        def _repair_error(f):
-            # I'm not sure if I want to pass through a failure or not.
-            self.results.repair_succeeded = False
-            self.results.repair_failure = f
-            return f
-        d.addCallbacks(_repair_finished, _repair_error)
-        return d
-
     def _generate_results(self, res):
         self.results.healthy = True
         smap = self.results.servermap
@@ -207,6 +191,22 @@ class MutableChecker:
 
         self.results.status_report = "\n".join(report) + "\n"
 
+    def _maybe_do_repair(self, res):
+        if not self.need_repair:
+            return
+        self.results.repair_attempted = True
+        d = self._node.repair(self.results)
+        def _repair_finished(repair_results):
+            self.results.repair_succeeded = True
+            self.results.repair_results = repair_results
+        def _repair_error(f):
+            # I'm not sure if I want to pass through a failure or not.
+            self.results.repair_succeeded = False
+            self.results.repair_failure = f
+            return f
+        d.addCallbacks(_repair_finished, _repair_error)
+        return d
+
     def _return_results(self, res):
         return self.results
 
@@ -219,6 +219,7 @@ class Results:
         self.storage_index_s = base32.b2a(storage_index)[:6]
         self.repair_attempted = False
         self.status_report = "[not generated yet]" # string
+        self.repair_report = None
         self.problems = [] # list of (peerid, storage_index, shnum, failure)
 
     def is_healthy(self):
@@ -241,5 +242,14 @@ class Results:
         s += "\n"
         s += self.status_report
         s += "\n"
+        if self.repair_attempted:
+            s += "Repair attempted "
+            if self.repair_succeeded:
+                s += "and successful\n"
+            else:
+                s += "and failed\n"
+            s += "\n"
+            s += self.repair_results.to_string()
+            s += "\n"
         return s
 
index cf95ec55fcd468771b9e2c2fff90e4b64cacaa01..b5ee826d6af84ce46cf47bad71c297b6954b322e 100644 (file)
@@ -408,11 +408,12 @@ class MutableFileNode:
         self._client.notify_mapupdate(u.get_status())
         return u.update()
 
-    def download_version(self, servermap, version):
+    def download_version(self, servermap, version, fetch_privkey=False):
         return self._do_serialized(self._try_once_to_download_version,
-                                   servermap, version)
-    def _try_once_to_download_version(self, servermap, version):
-        r = Retrieve(self, servermap, version)
+                                   servermap, version, fetch_privkey)
+    def _try_once_to_download_version(self, servermap, version,
+                                      fetch_privkey=False):
+        r = Retrieve(self, servermap, version, fetch_privkey)
         self._client.notify_retrieve(r.get_status())
         return r.download()
 
index d6706d54e01594fb51460bfa0f342238272b4021..801d2180fd6b0911da225658d07ca13a5969a8a4 100644 (file)
@@ -236,9 +236,10 @@ class Publish:
             self.connections[peerid] = self._servermap.connections[peerid]
         # then we add in all the shares that were bad (corrupted, bad
         # signatures, etc). We want to replace these.
-        for (peerid, shnum, old_checkstring) in self._servermap.bad_shares:
-            self.goal.add( (peerid, shnum) )
-            self.bad_share_checkstrings[ (peerid, shnum) ] = old_checkstring
+        for key, old_checkstring in self._servermap.bad_shares.items():
+            (peerid, shnum) = key
+            self.goal.add(key)
+            self.bad_share_checkstrings[key] = old_checkstring
             self.connections[peerid] = self._servermap.connections[peerid]
 
         # create the shares. We'll discard these as they are delivered. SDMF:
index f6efb347a64dd6f4a357ceb376cdae06d0b4349e..82b8e4c7398b5d8cd5c547606c00df614da98313 100644 (file)
@@ -5,6 +5,9 @@ from allmydata.interfaces import IRepairResults
 class RepairResults:
     implements(IRepairResults)
 
+    def to_string(self):
+        return ""
+
 class MustForceRepairError(Exception):
     pass
 
@@ -76,8 +79,14 @@ class Repairer:
         # servermap.bad_shares . Publish knows that it should try and replace
         # these.
 
+        # I chose to use the retrieve phase to ensure that the privkey is
+        # available, to avoid the extra roundtrip that would occur if we,
+        # say, added an smap.get_privkey() method.
+
+        assert self.node.get_writekey() # repair currently requires a writecap
+
         best_version = smap.best_recoverable_version()
-        d = self.node.download_version(smap, best_version)
+        d = self.node.download_version(smap, best_version, fetch_privkey=True)
         d.addCallback(self.node.upload, smap)
         d.addCallback(self.get_results)
         return d
index ed4b907340ebdea3ebc569a61dc9f9efb5b8de4e..b5391eb338145d057f63303cbf9c4a13f6d80b2a 100644 (file)
@@ -11,6 +11,7 @@ from allmydata.util import hashutil, idlib, log
 from allmydata import hashtree, codec, storage
 from allmydata.immutable.encode import NotEnoughSharesError
 from pycryptopp.cipher.aes import AES
+from pycryptopp.publickey import rsa
 
 from common import DictOfSets, CorruptShareError, UncoordinatedWriteError
 from layout import SIGNED_PREFIX, unpack_share_data
@@ -81,7 +82,7 @@ class Retrieve:
     # Retrieve object will remain tied to a specific version of the file, and
     # will use a single ServerMap instance.
 
-    def __init__(self, filenode, servermap, verinfo):
+    def __init__(self, filenode, servermap, verinfo, fetch_privkey=False):
         self._node = filenode
         assert self._node._pubkey
         self._storage_index = filenode.get_storage_index()
@@ -97,6 +98,12 @@ class Retrieve:
         self.servermap = servermap
         assert self._node._pubkey
         self.verinfo = verinfo
+        # during repair, we may be called upon to grab the private key, since
+        # it wasn't picked up during a verify=False checker run, and we'll
+        # need it for repair to generate the a new version.
+        self._need_privkey = fetch_privkey
+        if self._node._privkey:
+            self._need_privkey = False
 
         self._status = RetrieveStatus()
         self._status.set_storage_index(self._storage_index)
@@ -166,16 +173,24 @@ class Retrieve:
         (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
          offsets_tuple) = self.verinfo
         offsets = dict(offsets_tuple)
+
         # we read the checkstring, to make sure that the data we grab is from
-        # the right version. We also read the data, and the hashes necessary
-        # to validate them (share_hash_chain, block_hash_tree, share_data).
-        # We don't read the signature or the pubkey, since that was handled
-        # during the servermap phase, and we'll be comparing the share hash
-        # chain against the roothash that was validated back then.
-        readv = [ (0, struct.calcsize(SIGNED_PREFIX)),
-                  (offsets['share_hash_chain'],
-                   offsets['enc_privkey'] - offsets['share_hash_chain']),
-                  ]
+        # the right version.
+        readv = [ (0, struct.calcsize(SIGNED_PREFIX)) ]
+
+        # We also read the data, and the hashes necessary to validate them
+        # (share_hash_chain, block_hash_tree, share_data). We don't read the
+        # signature or the pubkey, since that was handled during the
+        # servermap phase, and we'll be comparing the share hash chain
+        # against the roothash that was validated back then.
+
+        readv.append( (offsets['share_hash_chain'],
+                       offsets['enc_privkey'] - offsets['share_hash_chain'] ) )
+
+        # if we need the private key (for repair), we also fetch that
+        if self._need_privkey:
+            readv.append( (offsets['enc_privkey'],
+                           offsets['EOF'] - offsets['enc_privkey']) )
 
         m = Marker()
         self._outstanding_queries[m] = (peerid, shnum, started)
@@ -243,7 +258,7 @@ class Retrieve:
         # shares if we get them.. seems better than an assert().
 
         for shnum,datav in datavs.items():
-            (prefix, hash_and_data) = datav
+            (prefix, hash_and_data) = datav[:2]
             try:
                 self._got_results_one_share(shnum, peerid,
                                             prefix, hash_and_data)
@@ -259,6 +274,9 @@ class Retrieve:
                 self._status.problems[peerid] = f
                 self._last_failure = f
                 pass
+            if self._need_privkey and len(datav) > 2:
+                lp = None
+                self._try_to_validate_privkey(datav[2], peerid, shnum, lp)
         # all done!
 
     def _got_results_one_share(self, shnum, peerid,
@@ -296,6 +314,25 @@ class Retrieve:
         # self.shares
         self.shares[shnum] = share_data
 
+    def _try_to_validate_privkey(self, enc_privkey, peerid, shnum, lp):
+
+        alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
+        alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
+        if alleged_writekey != self._node.get_writekey():
+            self.log("invalid privkey from %s shnum %d" %
+                     (idlib.nodeid_b2a(peerid)[:8], shnum),
+                     parent=lp, level=log.WEIRD, umid="YIw4tA")
+            return
+
+        # it's good
+        self.log("got valid privkey from shnum %d on peerid %s" %
+                 (shnum, idlib.shortnodeid_b2a(peerid)),
+                 parent=lp)
+        privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
+        self._node._populate_encprivkey(enc_privkey)
+        self._node._populate_privkey(privkey)
+        self._need_privkey = False
+
     def _query_failed(self, f, marker, peerid):
         self.log(format="query to [%(peerid)s] failed",
                  peerid=idlib.shortnodeid_b2a(peerid),
@@ -332,6 +369,15 @@ class Retrieve:
         if len(self.shares) < k:
             # we don't have enough shares yet
             return self._maybe_send_more_queries(k)
+        if self._need_privkey:
+            # we got k shares, but none of them had a valid privkey. TODO:
+            # look further. Adding code to do this is a bit complicated, and
+            # I want to avoid that complication, and this should be pretty
+            # rare (k shares with bitflips in the enc_privkey but not in the
+            # data blocks). If we actually do get here, the subsequent repair
+            # will fail for lack of a privkey.
+            self.log("got k shares but still need_privkey, bummer",
+                     level=log.WEIRD, umid="MdRHPA")
 
         # we have enough to finish. All the shares have had their hashes
         # checked, so if something fails at this point, we don't know how
index 4ec40cc699a3aff05b3e2ad1b766ccdad4e6dc6f..30642c74ec88360979d2c7811d9bcad535bb56a2 100644 (file)
@@ -101,13 +101,14 @@ class ServerMap:
 
     @ivar connections: maps peerid to a RemoteReference
 
-    @ivar bad_shares: a sequence of (peerid, shnum) tuples, describing
+    @ivar bad_shares: dict with keys of (peerid, shnum) tuples, describing
                       shares that I should ignore (because a previous user of
                       the servermap determined that they were invalid). The
                       updater only locates a certain number of shares: if
                       some of these turn out to have integrity problems and
                       are unusable, the caller will need to mark those shares
                       as bad, then re-update the servermap, then try again.
+                      The dict maps (peerid, shnum) tuple to old checkstring.
     """
 
     def __init__(self):
@@ -349,6 +350,9 @@ class ServermapUpdater:
         self._need_privkey = False
         if mode == MODE_WRITE and not self._node._privkey:
             self._need_privkey = True
+        # check+repair: repair requires the privkey, so if we didn't happen
+        # to ask for it during the check, we'll have problems doing the
+        # publish.
 
         prefix = storage.si_b2a(self._storage_index)[:5]
         self._log_number = log.msg(format="SharemapUpdater(%(si)s): starting (%(mode)s)",
index aa5846d3c460d9d4f4982fda88e67b1f46126f8d..90016cf9e5cae11f00ab811ab36714557bc51696 100644 (file)
@@ -1915,7 +1915,73 @@ class MutableChecker(SystemTestMixin, unittest.TestCase):
             shid_re = (r"Corrupt Shares:\s+%s: block hash tree failure" %
                        self.corrupt_shareid)
             self.failUnless(re.search(shid_re, out), out)
+        d.addCallback(_got_results)
 
+        # now make sure the webapi repairer can fix it
+        def _do_repair(res):
+            url = (self.webish_url +
+                   "uri/%s" % urllib.quote(self.node.get_uri()) +
+                   "?t=check&verify=true&repair=true")
+            return getPage(url, method="POST")
+        d.addCallback(_do_repair)
+        def _got_repair_results(out):
+            self.failUnless("Repair attempted and successful" in out)
+        d.addCallback(_got_repair_results)
+        d.addCallback(_do_check)
+        def _got_postrepair_results(out):
+            self.failIf("Not Healthy!" in out, out)
+            self.failUnless("Recoverable Versions: 10*seq" in out)
+        d.addCallback(_got_postrepair_results)
+
+        return d
+
+    def test_delete_share(self):
+        self.basedir = self.mktemp()
+        d = self.set_up_nodes()
+        CONTENTS = "a little bit of data"
+        d.addCallback(lambda res: self.clients[0].create_mutable_file(CONTENTS))
+        def _created(node):
+            self.node = node
+            si = self.node.get_storage_index()
+            out = self._run_cli(["debug", "find-shares", base32.b2a(si),
+                                self.clients[1].basedir])
+            files = out.split("\n")
+            # corrupt one of them, using the CLI debug command
+            f = files[0]
+            shnum = os.path.basename(f)
+            nodeid = self.clients[1].nodeid
+            nodeid_prefix = idlib.shortnodeid_b2a(nodeid)
+            self.corrupt_shareid = "%s-sh%s" % (nodeid_prefix, shnum)
+            os.unlink(files[0])
+        d.addCallback(_created)
+        # now make sure the webapi checker notices it
+        def _do_check(res):
+            url = (self.webish_url +
+                   "uri/%s" % urllib.quote(self.node.get_uri()) +
+                   "?t=check&verify=false")
+            return getPage(url, method="POST")
+        d.addCallback(_do_check)
+        def _got_results(out):
+            self.failUnless("Not Healthy!" in out, out)
+            self.failUnless("Unhealthy: best recoverable version has only 9 shares (encoding is 3-of-10)" in out, out)
+            self.failIf("Corrupt Shares" in out, out)
         d.addCallback(_got_results)
+
+        # now make sure the webapi repairer can fix it
+        def _do_repair(res):
+            url = (self.webish_url +
+                   "uri/%s" % urllib.quote(self.node.get_uri()) +
+                   "?t=check&verify=false&repair=true")
+            return getPage(url, method="POST")
+        d.addCallback(_do_repair)
+        def _got_repair_results(out):
+            self.failUnless("Repair attempted and successful" in out)
+        d.addCallback(_got_repair_results)
+        d.addCallback(_do_check)
+        def _got_postrepair_results(out):
+            self.failIf("Not Healthy!" in out, out)
+            self.failUnless("Recoverable Versions: 10*seq" in out)
+        d.addCallback(_got_postrepair_results)
+
         return d