mutable: train checker and repairer to work with MDMF mutable files
author     Kevan Carstensen <kevan@isnotajoke.com>
           Tue, 2 Aug 2011 01:51:40 +0000 (18:51 -0700)
committer  Kevan Carstensen <kevan@isnotajoke.com>
           Tue, 2 Aug 2011 01:51:40 +0000 (18:51 -0700)
src/allmydata/mutable/checker.py
src/allmydata/mutable/repairer.py

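In outline: MutableChecker drops its hand-rolled share verification
(prefix comparison, block hash tree, share hash chain, and privkey
checks) and instead drives the downloader in a new verify mode, and
Repairer wraps the contents it downloads in a MutableData uploadable
before re-uploading them. A condensed sketch of the two new paths,
using the names from the diffs below:

    # Checker: piggyback verification on the downloader.
    r = Retrieve(node, servermap, best_version, verify=True)
    d = r.download()    # fires with the list of bad shares found

    # Repairer: wrap the downloaded contents for upload().
    d2 = node.download_version(smap, best_version, fetch_privkey=True)
    d2.addCallback(lambda data: MutableData(data))
    d2.addCallback(node.upload, smap)
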
diff --git a/src/allmydata/mutable/checker.py b/src/allmydata/mutable/checker.py
index e5d3c3012fa268147f33830d210309f97c56d548..3063b178559f0359fc228dff78817650ada3b38c 100644
@@ -1,14 +1,11 @@
 
-from twisted.internet import defer
-from twisted.python import failure
-from allmydata import hashtree
 from allmydata.uri import from_string
-from allmydata.util import hashutil, base32, idlib, log
+from allmydata.util import base32, idlib, log
 from allmydata.check_results import CheckAndRepairResults, CheckResults
 
 from allmydata.mutable.common import MODE_CHECK, CorruptShareError
 from allmydata.mutable.servermap import ServerMap, ServermapUpdater
-from allmydata.mutable.layout import unpack_share, SIGNED_PREFIX_LENGTH
+from allmydata.mutable.retrieve import Retrieve # for verifying
 
 class MutableChecker:
 
@@ -25,6 +22,9 @@ class MutableChecker:
 
     def check(self, verify=False, add_lease=False):
         servermap = ServerMap()
+        # Updating the servermap in MODE_CHECK will stand a good chance
+        # of finding all of the shares and getting a good idea of
+        # recoverability, etc., without verifying.
         u = ServermapUpdater(self._node, self._storage_broker, self._monitor,
                              servermap, MODE_CHECK, add_lease=add_lease)
         if self._history:
@@ -48,10 +48,14 @@ class MutableChecker:
         if num_recoverable:
             self.best_version = servermap.best_recoverable_version()
 
+        # The file is unhealthy and needs to be repaired if:
+        # - There are unrecoverable versions.
         if servermap.unrecoverable_versions():
             self.need_repair = True
+        # - There isn't exactly one recoverable version.
         if num_recoverable != 1:
             self.need_repair = True
+        # - The best recoverable version is missing some shares.
         if self.best_version:
             available_shares = servermap.shares_available()
             (num_distinct_shares, k, N) = available_shares[self.best_version]
@@ -62,89 +66,42 @@ class MutableChecker:
 
     def _verify_all_shares(self, servermap):
         # read every byte of each share
+        #
+        # This logic is very nearly the same as the downloader's, so
+        # rather than duplicating a bunch of code we pass the
+        # downloader a flag that makes it verify shares, and piggyback
+        # on that instead:
+        #
+        #  r = Retrieve(blah, blah, blah, verify=True)
+        #  d = r.download()
+        #  (wait, wait, wait, d.callback)
+        #
+        #  Then, when it has finished, we can check the servermap
+        #  (which we provided to Retrieve) to figure out which shares
+        #  are bad, since the Retrieve process will have updated the
+        #  servermap as it went along.
+        #
+        #  By passing the verify=True flag to the constructor, we are
+        #  telling the downloader two things:
+        #
+        #  1. It needs to download all N shares, not just K shares.
+        #  2. It doesn't need to decrypt or decode the shares, only
+        #     verify them.
         if not self.best_version:
             return
-        versionmap = servermap.make_versionmap()
-        shares = versionmap[self.best_version]
-        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
-         offsets_tuple) = self.best_version
-        offsets = dict(offsets_tuple)
-        readv = [ (0, offsets["EOF"]) ]
-        dl = []
-        for (shnum, peerid, timestamp) in shares:
-            ss = servermap.connections[peerid]
-            d = self._do_read(ss, peerid, self._storage_index, [shnum], readv)
-            d.addCallback(self._got_answer, peerid, servermap)
-            dl.append(d)
-        return defer.DeferredList(dl, fireOnOneErrback=True, consumeErrors=True)
 
-    def _do_read(self, ss, peerid, storage_index, shnums, readv):
-        # isolate the callRemote to a separate method, so tests can subclass
-        # Publish and override it
-        d = ss.callRemote("slot_readv", storage_index, shnums, readv)
+        r = Retrieve(self._node, servermap, self.best_version, verify=True)
+        d = r.download()
+        d.addCallback(self._process_bad_shares)
         return d
 
-    def _got_answer(self, datavs, peerid, servermap):
-        for shnum,datav in datavs.items():
-            data = datav[0]
-            try:
-                self._got_results_one_share(shnum, peerid, data)
-            except CorruptShareError:
-                f = failure.Failure()
-                self.need_repair = True
-                self.bad_shares.append( (peerid, shnum, f) )
-                prefix = data[:SIGNED_PREFIX_LENGTH]
-                servermap.mark_bad_share(peerid, shnum, prefix)
-                ss = servermap.connections[peerid]
-                self.notify_server_corruption(ss, shnum, str(f.value))
-
-    def check_prefix(self, peerid, shnum, data):
-        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
-         offsets_tuple) = self.best_version
-        got_prefix = data[:SIGNED_PREFIX_LENGTH]
-        if got_prefix != prefix:
-            raise CorruptShareError(peerid, shnum,
-                                    "prefix mismatch: share changed while we were reading it")
-
-    def _got_results_one_share(self, shnum, peerid, data):
-        self.check_prefix(peerid, shnum, data)
-
-        # the [seqnum:signature] pieces are validated by _compare_prefix,
-        # which checks their signature against the pubkey known to be
-        # associated with this file.
 
-        (seqnum, root_hash, IV, k, N, segsize, datalen, pubkey, signature,
-         share_hash_chain, block_hash_tree, share_data,
-         enc_privkey) = unpack_share(data)
-
-        # validate [share_hash_chain,block_hash_tree,share_data]
-
-        leaves = [hashutil.block_hash(share_data)]
-        t = hashtree.HashTree(leaves)
-        if list(t) != block_hash_tree:
-            raise CorruptShareError(peerid, shnum, "block hash tree failure")
-        share_hash_leaf = t[0]
-        t2 = hashtree.IncompleteHashTree(N)
-        # root_hash was checked by the signature
-        t2.set_hashes({0: root_hash})
-        try:
-            t2.set_hashes(hashes=share_hash_chain,
-                          leaves={shnum: share_hash_leaf})
-        except (hashtree.BadHashError, hashtree.NotEnoughHashesError,
-                IndexError), e:
-            msg = "corrupt hashes: %s" % (e,)
-            raise CorruptShareError(peerid, shnum, msg)
-
-        # validate enc_privkey: only possible if we have a write-cap
-        if not self._node.is_readonly():
-            alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
-            alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
-            if alleged_writekey != self._node.get_writekey():
-                raise CorruptShareError(peerid, shnum, "invalid privkey")
+    def _process_bad_shares(self, bad_shares):
+        if bad_shares:
+            self.need_repair = True
+        self.bad_shares = bad_shares
 
-    def notify_server_corruption(self, ss, shnum, reason):
-        ss.callRemoteOnly("advise_corrupt_share",
-                          "mutable", self._storage_index, shnum, reason)
 
     def _count_shares(self, smap, version):
         available_shares = smap.shares_available()
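
Taken on its own, the new verification contract looks like this (a
sketch, not code from the commit; it assumes, as _process_bad_shares
above does, that the verify-mode deferred fires with the list of bad
shares, presumably the same (peerid, shnum, failure) triples the
removed _got_answer collected):

    from allmydata.mutable.retrieve import Retrieve

    def verify_version(node, servermap, version):
        # verify=True makes the downloader fetch all N shares (not
        # just K) and check them without decrypting or decoding.
        # Bad shares are also recorded in the servermap as it goes.
        r = Retrieve(node, servermap, version, verify=True)
        d = r.download()
        d.addCallback(lambda bad_shares: (servermap, bad_shares))
        return d
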
diff --git a/src/allmydata/mutable/repairer.py b/src/allmydata/mutable/repairer.py
index 5cef5e7dc961cbb102af993408f1860d2950e4dc..d0bfeffeaee1e0096896c34885fa7bd0f2393873 100644
@@ -2,6 +2,7 @@
 from zope.interface import implements
 from twisted.internet import defer
 from allmydata.interfaces import IRepairResults, ICheckResults
+from allmydata.mutable.publish import MutableData
 
 class RepairResults:
     implements(IRepairResults)
@@ -104,6 +105,8 @@ class Repairer:
             raise RepairRequiresWritecapError("Sorry, repair currently requires a writecap, to set the write-enabler properly.")
 
         d = self.node.download_version(smap, best_version, fetch_privkey=True)
+        d.addCallback(lambda data: MutableData(data))
         d.addCallback(self.node.upload, smap)
         d.addCallback(self.get_results, smap)
         return d
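
The repairer half is the counterpart on the write side. A sketch of
the resulting repair pipeline (same names as the hunk above;
presumably node.upload now consumes an uploadable object rather than
a raw string, which is why the wrapping step is needed):

    from allmydata.mutable.publish import MutableData

    d = self.node.download_version(smap, best_version, fetch_privkey=True)
    # Wrap the downloaded contents in a MutableData uploadable, the
    # form upload() expects after the MDMF changes.
    d.addCallback(lambda data: MutableData(data))
    d.addCallback(self.node.upload, smap)
    d.addCallback(self.get_results, smap)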