diff --git a/src/allmydata/immutable/checker.py b/src/allmydata/immutable/checker.py
index 290d8cfdc582ff19a168c97af125bba3d933b6e0..41000f7e0d14df68ae0a5bf0faedefeb017867aa 100644
--- a/src/allmydata/immutable/checker.py
+++ b/src/allmydata/immutable/checker.py
-from foolscap import DeadReferenceError
-from allmydata import hashtree
-from allmydata.check_results import CheckerResults
-from allmydata.immutable import download
+from zope.interface import implements
+from twisted.internet import defer
+from foolscap.api import DeadReferenceError, RemoteException
+from allmydata import hashtree, codec, uri
+from allmydata.interfaces import IValidatedThingProxy, IVerifierURI
+from allmydata.hashtree import IncompleteHashTree
+from allmydata.check_results import CheckResults
 from allmydata.uri import CHKFileVerifierURI
 from allmydata.util.assertutil import precondition
-from allmydata.util import base32, deferredutil, log, rrefutil
+from allmydata.util import base32, deferredutil, dictutil, log, mathutil
+from allmydata.util.hashutil import file_renewal_secret_hash, \
+     file_cancel_secret_hash, bucket_renewal_secret_hash, \
+     bucket_cancel_secret_hash, uri_extension_hash, CRYPTO_VAL_SIZE, \
+     block_hash
 
 from allmydata.immutable import layout
 
+class IntegrityCheckReject(Exception):
+    pass
+class BadURIExtension(IntegrityCheckReject):
+    pass
+class BadURIExtensionHashValue(IntegrityCheckReject):
+    pass
+class BadOrMissingHash(IntegrityCheckReject):
+    pass
+class UnsupportedErasureCodec(BadURIExtension):
+    pass
+
+class ValidatedExtendedURIProxy:
+    """ I am a front-end for a remote UEB (using a local ReadBucketProxy),
+    responsible for retrieving and validating the elements from the UEB."""
+    implements(IValidatedThingProxy)
+
+    def __init__(self, readbucketproxy, verifycap, fetch_failures=None):
+        # fetch_failures is for debugging -- see test_encode.py
+        self._fetch_failures = fetch_failures
+        self._readbucketproxy = readbucketproxy
+        precondition(IVerifierURI.providedBy(verifycap), verifycap)
+        self._verifycap = verifycap
+
+        # required
+        self.segment_size = None
+        self.crypttext_root_hash = None
+        self.share_root_hash = None
+
+        # computed
+        self.block_size = None
+        self.share_size = None
+        self.num_segments = None
+        self.tail_data_size = None
+        self.tail_segment_size = None
+
+        # optional
+        self.crypttext_hash = None
+
+    def __str__(self):
+        return "<%s %s>" % (self.__class__.__name__, self._verifycap.to_string())
+
+    def _check_integrity(self, data):
+        h = uri_extension_hash(data)
+        if h != self._verifycap.uri_extension_hash:
+            msg = ("The copy of uri_extension we received from %s was bad: wanted %s, got %s" %
+                   (self._readbucketproxy,
+                    base32.b2a(self._verifycap.uri_extension_hash),
+                    base32.b2a(h)))
+            if self._fetch_failures is not None:
+                self._fetch_failures["uri_extension"] += 1
+            raise BadURIExtensionHashValue(msg)
+        else:
+            return data
+
+    def _parse_and_validate(self, data):
+        self.share_size = mathutil.div_ceil(self._verifycap.size,
+                                            self._verifycap.needed_shares)
+
+        d = uri.unpack_extension(data)
+
+        # There are several kinds of things that can be found in a UEB.
+        # First, things that we really need to learn from the UEB in order to
+        # do this download. Next: things which are optional but not redundant
+        # -- if they are present in the UEB they will get used. Next, things
+        # that are optional and redundant. These things are required to be
+        # consistent: they don't have to be in the UEB, but if they are in
+        # the UEB then they will be checked for consistency with the
+        # already-known facts, and if they are inconsistent then an exception
+        # will be raised. These things aren't actually used -- they are just
+        # tested for consistency and ignored. Finally: things which are
+        # deprecated -- they ought not be in the UEB at all, and if they are
+        # present then a warning will be logged but they are otherwise
+        # ignored.
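+        #
+        # Illustrative sketch (not part of this module): the four
+        # categories above, expressed as a minimal pre-check over the dict
+        # returned by uri.unpack_extension(); this method performs the same
+        # checks inline below.
+        #
+        #     REQUIRED   = ('segment_size', 'crypttext_root_hash',
+        #                   'share_root_hash')
+        #     OPTIONAL   = ('crypttext_hash',)
+        #     CONSISTENT = ('codec_name', 'codec_params', 'tail_codec_params',
+        #                   'num_segments', 'size', 'needed_shares',
+        #                   'total_shares')
+        #     DEPRECATED = ('plaintext_hash', 'plaintext_root_hash')
+        #     for k in REQUIRED:
+        #         if k not in d:
+        #             raise BadURIExtension("missing required field: %s" % k)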
+
+        # First, things that we really need to learn from the UEB:
+        # segment_size, crypttext_root_hash, and share_root_hash.
+        self.segment_size = d['segment_size']
+
+        self.block_size = mathutil.div_ceil(self.segment_size,
+                                            self._verifycap.needed_shares)
+        self.num_segments = mathutil.div_ceil(self._verifycap.size,
+                                              self.segment_size)
+
+        self.tail_data_size = self._verifycap.size % self.segment_size
+        if not self.tail_data_size:
+            self.tail_data_size = self.segment_size
+        # padding for erasure code
+        self.tail_segment_size = mathutil.next_multiple(self.tail_data_size,
+                                                        self._verifycap.needed_shares)
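+
+        # Illustrative sketch: the arithmetic above, worked with
+        # hypothetical values (a 1,000,000-byte file, needed_shares=3, and
+        # a 131072-byte segment size):
+        #
+        #     block_size        = mathutil.div_ceil(131072, 3)        # = 43691
+        #     num_segments      = mathutil.div_ceil(1000000, 131072)  # = 8
+        #     tail_data_size    = 1000000 % 131072                    # = 82496
+        #     tail_segment_size = mathutil.next_multiple(82496, 3)    # = 82497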
+
+        # Ciphertext hash tree root is mandatory, so that there is at most
+        # one ciphertext that matches this read-cap or verify-cap. The
+        # integrity check on the shares is not sufficient to prevent the
+        # original encoder from creating some shares of file A and other
+        # shares of file B.
+        self.crypttext_root_hash = d['crypttext_root_hash']
+
+        self.share_root_hash = d['share_root_hash']
+
+
+        # Next: things that are optional and not redundant: crypttext_hash
+        if d.has_key('crypttext_hash'):
+            self.crypttext_hash = d['crypttext_hash']
+            if len(self.crypttext_hash) != CRYPTO_VAL_SIZE:
+                raise BadURIExtension('crypttext_hash is required to be hashutil.CRYPTO_VAL_SIZE bytes, not %s bytes' % (len(self.crypttext_hash),))
+
+
+        # Next: things that are optional, redundant, and required to be
+        # consistent: codec_name, codec_params, tail_codec_params,
+        # num_segments, size, needed_shares, total_shares
+        if d.has_key('codec_name'):
+            if d['codec_name'] != "crs":
+                raise UnsupportedErasureCodec(d['codec_name'])
+
+        if d.has_key('codec_params'):
+            ucpss, ucpns, ucpts = codec.parse_params(d['codec_params'])
+            if ucpss != self.segment_size:
+                raise BadURIExtension("inconsistent erasure code params: "
+                                      "ucpss: %s != self.segment_size: %s" %
+                                      (ucpss, self.segment_size))
+            if ucpns != self._verifycap.needed_shares:
+                raise BadURIExtension("inconsistent erasure code params: ucpns: %s != "
+                                      "self._verifycap.needed_shares: %s" %
+                                      (ucpns, self._verifycap.needed_shares))
+            if ucpts != self._verifycap.total_shares:
+                raise BadURIExtension("inconsistent erasure code params: ucpts: %s != "
+                                      "self._verifycap.total_shares: %s" %
+                                      (ucpts, self._verifycap.total_shares))
+
+        if d.has_key('tail_codec_params'):
+            utcpss, utcpns, utcpts = codec.parse_params(d['tail_codec_params'])
+            if utcpss != self.tail_segment_size:
+                raise BadURIExtension("inconsistent erasure code params: utcpss: %s != "
+                                      "self.tail_segment_size: %s, self._verifycap.size: %s, "
+                                      "self.segment_size: %s, self._verifycap.needed_shares: %s"
+                                      % (utcpss, self.tail_segment_size, self._verifycap.size,
+                                         self.segment_size, self._verifycap.needed_shares))
+            if utcpns != self._verifycap.needed_shares:
+                raise BadURIExtension("inconsistent erasure code params: utcpns: %s != "
+                                      "self._verifycap.needed_shares: %s" % (utcpns,
+                                                                             self._verifycap.needed_shares))
+            if utcpts != self._verifycap.total_shares:
+                raise BadURIExtension("inconsistent erasure code params: utcpts: %s != "
+                                      "self._verifycap.total_shares: %s" % (utcpts,
+                                                                            self._verifycap.total_shares))
+
+        if d.has_key('num_segments'):
+            if d['num_segments'] != self.num_segments:
+                raise BadURIExtension("inconsistent num_segments: size: %s, "
+                                      "segment_size: %s, computed_num_segments: %s, "
+                                      "ueb_num_segments: %s" % (self._verifycap.size,
+                                                                self.segment_size,
+                                                                self.num_segments, d['num_segments']))
+
+        if d.has_key('size'):
+            if d['size'] != self._verifycap.size:
+                raise BadURIExtension("inconsistent size: URI size: %s, UEB size: %s" %
+                                      (self._verifycap.size, d['size']))
+
+        if d.has_key('needed_shares'):
+            if d['needed_shares'] != self._verifycap.needed_shares:
+                raise BadURIExtension("inconsistent needed shares: URI needed shares: %s, UEB "
+                                      "needed shares: %s" % (self._verifycap.needed_shares,
+                                                             d['needed_shares']))
+
+        if d.has_key('total_shares'):
+            if d['total_shares'] != self._verifycap.total_shares:
+                raise BadURIExtension("inconsistent total shares: URI total shares: %s, UEB "
+                                      "total shares: %s" % (self._verifycap.total_shares,
+                                                            d['total_shares']))
+
+        # Finally, things that are deprecated and ignored: plaintext_hash,
+        # plaintext_root_hash
+        if d.get('plaintext_hash'):
+            log.msg("Found plaintext_hash in UEB. This field is deprecated for security reasons "
+                    "and is no longer used.  Ignoring.  %s" % (self,))
+        if d.get('plaintext_root_hash'):
+            log.msg("Found plaintext_root_hash in UEB. This field is deprecated for security "
+                    "reasons and is no longer used.  Ignoring.  %s" % (self,))
+
+        return self
+
+    def start(self):
+        """Fetch the UEB from bucket, compare its hash to the hash from
+        verifycap, then parse it. Returns a deferred which is called back
+        with self once the fetch is successful, or is erred back if it
+        fails."""
+        d = self._readbucketproxy.get_uri_extension()
+        d.addCallback(self._check_integrity)
+        d.addCallback(self._parse_and_validate)
+        return d
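+
+# Illustrative sketch (not part of this module): how a caller might drive
+# the proxy above; `rbp` and `vcap` are hypothetical stand-ins for a
+# layout.ReadBucketProxy and an IVerifierURI. Checker._download_and_verify
+# below does the real thing.
+#
+#     veup = ValidatedExtendedURIProxy(rbp, vcap)
+#     d = veup.start()
+#     # fires with the proxy itself once the UEB hash and contents validate
+#     d.addCallback(lambda vup: vup.num_segments)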
+
+class ValidatedReadBucketProxy(log.PrefixingLogMixin):
+    """I am a front-end for a remote storage bucket, responsible for
+    retrieving and validating data from that bucket.
+
+    My get_block() method is used by BlockDownloaders.
+    """
+
+    def __init__(self, sharenum, bucket, share_hash_tree, num_blocks,
+                 block_size, share_size):
+        """ share_hash_tree is required to have already been initialized with
+        the root hash (the number-0 hash), using the share_root_hash from the
+        UEB"""
+        precondition(share_hash_tree[0] is not None, share_hash_tree)
+        prefix = "%d-%s-%s" % (sharenum, bucket,
+                               base32.b2a_l(share_hash_tree[0][:8], 60))
+        log.PrefixingLogMixin.__init__(self,
+                                       facility="tahoe.immutable.download",
+                                       prefix=prefix)
+        self.sharenum = sharenum
+        self.bucket = bucket
+        self.share_hash_tree = share_hash_tree
+        self.num_blocks = num_blocks
+        self.block_size = block_size
+        self.share_size = share_size
+        self.block_hash_tree = hashtree.IncompleteHashTree(self.num_blocks)
+
+    def get_all_sharehashes(self):
+        """Retrieve and validate all the share-hash-tree nodes that are
+        included in this share, regardless of whether we need them to
+        validate the share or not. Each share contains a minimal Merkle tree
+        chain, but there is lots of overlap, so usually we'll be using hashes
+        from other shares and not reading every single hash from this share.
+        The Verifier uses this function to read and validate every single
+        hash from this share.
+
+        Call this (and wait for the Deferred it returns to fire) before
+        calling get_block() for the first time: this lets us check that the
+        share contains enough hashes to validate its own data, and
+        avoids downloading any share hash twice.
+
+        I return a Deferred which errbacks upon failure, probably with
+        BadOrMissingHash."""
+
+        d = self.bucket.get_share_hashes()
+        def _got_share_hashes(sh):
+            sharehashes = dict(sh)
+            try:
+                self.share_hash_tree.set_hashes(sharehashes)
+            except IndexError, le:
+                raise BadOrMissingHash(le)
+            except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
+                raise BadOrMissingHash(le)
+        d.addCallback(_got_share_hashes)
+        return d
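+
+    # Illustrative sketch: the wire format consumed above --
+    # get_share_hashes() returns a list of (index, hash) pairs, which
+    # set_hashes() checks against the root installed from the UEB. The
+    # values here are hypothetical:
+    #
+    #     tree = hashtree.IncompleteHashTree(4)      # 4 leaves, 7 nodes
+    #     tree.set_hashes({0: root_hash_from_ueb})
+    #     tree.set_hashes(dict([(1, h1), (2, h2)]))  # nodes 1+2 must hash
+    #                                                # to node 0, else
+    #                                                # BadHashError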
+
+    def get_all_blockhashes(self):
+        """Retrieve and validate all the block-hash-tree nodes that are
+        included in this share. Each share contains a full Merkle tree, but
+        we usually only fetch the minimal subset necessary for any particular
+        block. This function fetches everything at once. The Verifier uses
+        this function to validate the block hash tree.
+
+        Call this (and wait for the Deferred it returns to fire) after
+        calling get_all_sharehashes() and before calling get_block() for the
+        first time: this lets us check that the share contains all block
+        hashes and avoids downloading them multiple times.
+
+        I return a Deferred which errbacks upon failure, probably with
+        BadOrMissingHash.
+        """
+
+        # get_block_hashes(anything) currently always returns everything
+        needed = list(range(len(self.block_hash_tree)))
+        d = self.bucket.get_block_hashes(needed)
+        def _got_block_hashes(blockhashes):
+            if len(blockhashes) < len(self.block_hash_tree):
+                raise BadOrMissingHash()
+            bh = dict(enumerate(blockhashes))
+
+            try:
+                self.block_hash_tree.set_hashes(bh)
+            except IndexError, le:
+                raise BadOrMissingHash(le)
+            except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
+                raise BadOrMissingHash(le)
+        d.addCallback(_got_block_hashes)
+        return d
+
+    def get_all_crypttext_hashes(self, crypttext_hash_tree):
+        """Retrieve and validate all the crypttext-hash-tree nodes that are
+        in this share. Normally we don't look at these at all: the download
+        process fetches them incrementally as needed to validate each segment
+        of ciphertext. But this is a convenient place to give the Verifier a
+        function to validate all of these at once.
+
+        Call this with a new hashtree object for each share, initialized with
+        the crypttext hash tree root. I return a Deferred which errbacks upon
+        failure, probably with BadOrMissingHash.
+        """
+
+        # get_crypttext_hashes() always returns everything
+        d = self.bucket.get_crypttext_hashes()
+        def _got_crypttext_hashes(hashes):
+            if len(hashes) < len(crypttext_hash_tree):
+                raise BadOrMissingHash()
+            ct_hashes = dict(enumerate(hashes))
+            try:
+                crypttext_hash_tree.set_hashes(ct_hashes)
+            except IndexError, le:
+                raise BadOrMissingHash(le)
+            except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
+                raise BadOrMissingHash(le)
+        d.addCallback(_got_crypttext_hashes)
+        return d
+
+    def get_block(self, blocknum):
+        # the first time we use this bucket, we need to fetch enough elements
+        # of the share hash tree to validate it from our share hash up to the
+        # hashroot.
+        if self.share_hash_tree.needed_hashes(self.sharenum):
+            d1 = self.bucket.get_share_hashes()
+        else:
+            d1 = defer.succeed([])
+
+        # We might need to grab some elements of our block hash tree, to
+        # validate the requested block up to the share hash.
+        blockhashesneeded = self.block_hash_tree.needed_hashes(blocknum, include_leaf=True)
+        # We don't need the root of the block hash tree, as that comes in the
+        # share tree.
+        blockhashesneeded.discard(0)
+        d2 = self.bucket.get_block_hashes(blockhashesneeded)
+
+        if blocknum < self.num_blocks-1:
+            thisblocksize = self.block_size
+        else:
+            thisblocksize = self.share_size % self.block_size
+            if thisblocksize == 0:
+                thisblocksize = self.block_size
+        d3 = self.bucket.get_block_data(blocknum,
+                                        self.block_size, thisblocksize)
+
+        dl = deferredutil.gatherResults([d1, d2, d3])
+        dl.addCallback(self._got_data, blocknum)
+        return dl
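+
+    # Illustrative sketch: the tail-block rule used above, as a one-liner
+    # with hypothetical numbers (share_size=100000, block_size=43691):
+    #
+    #     def last_block_size(share_size, block_size):
+    #         return (share_size % block_size) or block_size
+    #
+    #     last_block_size(100000, 43691)  # = 12618 (partial final block)
+    #     last_block_size(87382, 43691)   # = 43691 (exact multiple)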
+
+    def _got_data(self, results, blocknum):
+        precondition(blocknum < self.num_blocks,
+                     self, blocknum, self.num_blocks)
+        sharehashes, blockhashes, blockdata = results
+        try:
+            sharehashes = dict(sharehashes)
+        except ValueError, le:
+            le.args = tuple(le.args + (sharehashes,))
+            raise
+        blockhashes = dict(enumerate(blockhashes))
+
+        candidate_share_hash = None # in case we log it in the except block below
+        blockhash = None # in case we log it in the except block below
+
+        try:
+            if self.share_hash_tree.needed_hashes(self.sharenum):
+                # This will raise exception if the values being passed do not
+                # match the root node of self.share_hash_tree.
+                try:
+                    self.share_hash_tree.set_hashes(sharehashes)
+                except IndexError, le:
+                    # Weird -- sharehashes contained index numbers outside of
+                    # the range that fit into this hash tree.
+                    raise BadOrMissingHash(le)
+
+            # To validate a block we need the root of the block hash tree,
+            # which is also one of the leafs of the share hash tree, and is
+            # called "the share hash".
+            if not self.block_hash_tree[0]: # empty -- no root node yet
+                # Get the share hash from the share hash tree.
+                share_hash = self.share_hash_tree.get_leaf(self.sharenum)
+                if not share_hash:
+                    # No root node in block_hash_tree and also the share hash
+                    # wasn't sent by the server.
+                    raise hashtree.NotEnoughHashesError
+                self.block_hash_tree.set_hashes({0: share_hash})
+
+            if self.block_hash_tree.needed_hashes(blocknum):
+                self.block_hash_tree.set_hashes(blockhashes)
+
+            blockhash = block_hash(blockdata)
+            self.block_hash_tree.set_hashes(leaves={blocknum: blockhash})
+            #self.log("checking block_hash(shareid=%d, blocknum=%d) len=%d "
+            #        "%r .. %r: %s" %
+            #        (self.sharenum, blocknum, len(blockdata),
+            #         blockdata[:50], blockdata[-50:], base32.b2a(blockhash)))
+
+        except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
+            # log.WEIRD: indicates undetected disk/network error, or more
+            # likely a programming error
+            self.log("hash failure in block=%d, shnum=%d on %s" %
+                    (blocknum, self.sharenum, self.bucket))
+            if self.block_hash_tree.needed_hashes(blocknum):
+                self.log(""" failure occurred when checking the block_hash_tree.
+                This suggests that either the block data was bad, or that the
+                block hashes we received along with it were bad.""")
+            else:
+                self.log(""" the failure probably occurred when checking the
+                share_hash_tree, which suggests that the share hashes we
+                received from the remote peer were bad.""")
+            self.log(" have candidate_share_hash: %s" % bool(candidate_share_hash))
+            self.log(" block length: %d" % len(blockdata))
+            self.log(" block hash: %s" % base32.b2a_or_none(blockhash))
+            if len(blockdata) < 100:
+                self.log(" block data: %r" % (blockdata,))
+            else:
+                self.log(" block data start/end: %r .. %r" %
+                        (blockdata[:50], blockdata[-50:]))
+            self.log(" share hash tree:\n" + self.share_hash_tree.dump())
+            self.log(" block hash tree:\n" + self.block_hash_tree.dump())
+            lines = []
+            for i,h in sorted(sharehashes.items()):
+                lines.append("%3d: %s" % (i, base32.b2a_or_none(h)))
+            self.log(" sharehashes:\n" + "\n".join(lines) + "\n")
+            lines = []
+            for i,h in blockhashes.items():
+                lines.append("%3d: %s" % (i, base32.b2a_or_none(h)))
+            log.msg(" blockhashes:\n" + "\n".join(lines) + "\n")
+            raise BadOrMissingHash(le)
+
+        # If we made it here, the block is good. If the hash trees didn't
+        # like what they saw, they would have raised a BadHashError, causing
+        # our caller to see a Failure and thus ignore this block (as well as
+        # dropping this bucket).
+        return blockdata
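+
+# Illustrative sketch (not part of this module): driving
+# ValidatedReadBucketProxy by hand; `bucket`, `sharenum`, `total_shares`,
+# and `vup` (a started ValidatedExtendedURIProxy) are hypothetical
+# stand-ins, mirroring Checker._download_and_verify below:
+#
+#     sht = hashtree.IncompleteHashTree(total_shares)
+#     sht.set_hashes({0: vup.share_root_hash})
+#     vrbp = ValidatedReadBucketProxy(sharenum, bucket, sht,
+#                                     vup.num_segments, vup.block_size,
+#                                     vup.share_size)
+#     d = vrbp.get_block(0)  # fires with validated block 0, or errbacks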
+
+
 class Checker(log.PrefixingLogMixin):
-    """ I query all servers to see if M uniquely-numbered shares are available.
-
-    If the verify flag was passed to my constructor, then for each share I download every data
-    block and all metadata from each server and perform a cryptographic integrity check on all
-    of it.  If not, I just ask each server "Which shares do you have?" and believe its answer.
-
-    In either case, I wait until I have gotten responses from all servers.  This fact -- that I
-    wait -- means that an ill-behaved server which fails to answer my questions will make me
-    wait indefinitely.  If it is ill-behaved in a way that triggers the underlying foolscap
-    timeouts, then I will wait only as long as those foolscap timeouts, but if it is ill-behaved
-    in a way which placates the foolscap timeouts but still doesn't answer my question then I
-    will wait indefinitely.
-
-    Before I send any new request to a server, I always ask the "monitor" object that was passed
-    into my constructor whether this task has been cancelled (by invoking its
-    raise_if_cancelled() method).
+    """I query all servers to see if M uniquely-numbered shares are
+    available.
+
+    If the verify flag was passed to my constructor, then for each share I
+    download every data block and all metadata from each server and perform a
+    cryptographic integrity check on all of it. If not, I just ask each
+    server 'Which shares do you have?' and believe its answer.
+
+    In either case, I wait until I have gotten responses from all servers.
+    This fact -- that I wait -- means that an ill-behaved server which fails
+    to answer my questions will make me wait indefinitely. If it is
+    ill-behaved in a way that triggers the underlying foolscap timeouts, then
+    I will wait only as long as those foolscap timeouts, but if it is
+    ill-behaved in a way which placates the foolscap timeouts but still
+    doesn't answer my question then I will wait indefinitely.
+
+    Before I send any new request to a server, I always ask the 'monitor'
+    object that was passed into my constructor whether this task has been
+    cancelled (by invoking its raise_if_cancelled() method).
     """
-    def __init__(self, client, verifycap, servers, verify, monitor):
+    def __init__(self, verifycap, servers, verify, add_lease, secret_holder,
+                 monitor):
         assert precondition(isinstance(verifycap, CHKFileVerifierURI), verifycap, type(verifycap))
-        assert precondition(isinstance(servers, (set, frozenset)), servers)
-        for (serverid, serverrref) in servers:
-            assert precondition(isinstance(serverid, str))
-            assert precondition(isinstance(serverrref, rrefutil.WrappedRemoteReference), serverrref)
 
-        prefix = "%s" % base32.b2a_l(verifycap.storage_index[:8], 60)
+        prefix = "%s" % base32.b2a_l(verifycap.get_storage_index()[:8], 60)
         log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.checker", prefix=prefix)
 
-        self._client = client
         self._verifycap = verifycap
 
         self._monitor = monitor
         self._servers = servers
         self._verify = verify # bool: verify what the servers claim, or not?
-
-        self._share_hash_tree = None
-        self._crypttext_hash_tree = None
-
-    def _get_buckets(self, server, storageindex, serverid):
-        """ Return a deferred that eventually fires with ({sharenum: bucket}, serverid,
-        success).  In case the server is disconnected or returns a Failure then it fires with
-        ({}, serverid, False) (A server disconnecting or returning a Failure when we ask it for
-        buckets is the same, for our purposes, as a server that says it has none, except that we
-        want to track and report whether or not each server responded.)"""
-
-        d = server.callRemote("get_buckets", storageindex)
-
+        self._add_lease = add_lease
+
+        frs = file_renewal_secret_hash(secret_holder.get_renewal_secret(),
+                                       self._verifycap.get_storage_index())
+        self.file_renewal_secret = frs
+        fcs = file_cancel_secret_hash(secret_holder.get_cancel_secret(),
+                                      self._verifycap.get_storage_index())
+        self.file_cancel_secret = fcs
+
+    def _get_renewal_secret(self, seed):
+        return bucket_renewal_secret_hash(self.file_renewal_secret, seed)
+    def _get_cancel_secret(self, seed):
+        return bucket_cancel_secret_hash(self.file_cancel_secret, seed)
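+
+    # Illustrative sketch: the two-level derivation used above -- the
+    # per-file secret (computed in __init__) is bound to one server's lease
+    # seed to get the per-bucket secret sent in add_lease.
+    # `client_renewal_secret` and `lease_seed` are hypothetical
+    # placeholders:
+    #
+    #     frs = file_renewal_secret_hash(client_renewal_secret, storage_index)
+    #     brs = bucket_renewal_secret_hash(frs, lease_seed)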
+
+    def _get_buckets(self, s, storageindex):
+        """Return a deferred that eventually fires with ({sharenum: bucket},
+        serverid, success). In case the server is disconnected or returns a
+        Failure then it fires with ({}, serverid, False) (A server
+        disconnecting or returning a Failure when we ask it for buckets is
+        the same, for our purposes, as a server that says it has none, except
+        that we want to track and report whether or not each server
+        responded.)"""
+
+        rref = s.get_rref()
+        lease_seed = s.get_lease_seed()
+        if self._add_lease:
+            renew_secret = self._get_renewal_secret(lease_seed)
+            cancel_secret = self._get_cancel_secret(lease_seed)
+            d2 = rref.callRemote("add_lease", storageindex,
+                                 renew_secret, cancel_secret)
+            d2.addErrback(self._add_lease_failed, s.get_name(), storageindex)
+
+        d = rref.callRemote("get_buckets", storageindex)
         def _wrap_results(res):
-            for k in res:
-                res[k] = rrefutil.WrappedRemoteReference(res[k])
-            return (res, serverid, True)
+            return (res, True)
 
         def _trap_errs(f):
             level = log.WEIRD
             if f.check(DeadReferenceError):
                 level = log.UNUSUAL
-            self.log("failure from server on 'get_buckets' the REMOTE failure was:", facility="tahoe.immutable.checker", failure=f, level=level, umid="3uuBUQ")
-            return ({}, serverid, False)
+            self.log("failure from server on 'get_buckets' the REMOTE failure was:",
+                     facility="tahoe.immutable.checker",
+                     failure=f, level=level, umid="AX7wZQ")
+            return ({}, False)
 
         d.addCallbacks(_wrap_results, _trap_errs)
         return d
 
-    def _download_and_verify(self, serverid, sharenum, bucket):
-        """ Start an attempt to download and verify every block in this bucket and return a
-        deferred that will eventually fire once the attempt completes.
+    def _add_lease_failed(self, f, server_name, storage_index):
+        # Older versions of Tahoe didn't handle the add-lease message very
+        # well: <=1.1.0 throws a NameError because it doesn't implement
+        # remote_add_lease(), 1.2.0/1.3.0 throw IndexError on unknown buckets
+        # (which is most of them, since we send add-lease to everybody,
+        # before we know whether or not they have any shares for us), and
+        # 1.2.0 throws KeyError even on known buckets due to an internal bug
+        # in the latency-measuring code.
+
+        # we want to ignore the known-harmless errors and log the others. In
+        # particular we want to log any local errors caused by coding
+        # problems.
+
+        if f.check(DeadReferenceError):
+            return
+        if f.check(RemoteException):
+            if f.value.failure.check(KeyError, IndexError, NameError):
+                # this may ignore a bit too much, but that only hurts us
+                # during debugging
+                return
+            self.log(format="error in add_lease from [%(name)s]: %(f_value)s",
+                     name=server_name,
+                     f_value=str(f.value),
+                     failure=f,
+                     level=log.WEIRD, umid="atbAxw")
+            return
+        # local errors are cause for alarm
+        log.err(f,
+                format="local error in add_lease to [%(name)s]: %(f_value)s",
+                name=server_name,
+                f_value=str(f.value),
+                level=log.WEIRD, umid="hEGuQg")
+
+
+    def _download_and_verify(self, server, sharenum, bucket):
+        """Start an attempt to download and verify every block in this bucket
+        and return a deferred that will eventually fire once the attempt
+        completes.
+
+        If you download and verify every block then fire with (True,
+        sharenum, None), else if the share data couldn't be parsed because it
+        was of an unknown version number fire with (False, sharenum,
+        'incompatible'), else if any of the blocks were invalid, fire with
+        (False, sharenum, 'corrupt'), else if the server disconnected (False,
+        sharenum, 'disconnect'), else if the server returned a Failure during
+        the process fire with (False, sharenum, 'failure').
+
+        If there is an internal error such as an uncaught exception in this
+        code, then the deferred will errback, but if there is a remote error
+        such as the server failing or the returned data being incorrect then
+        it will not errback -- it will fire normally with the indicated
+        results."""
+
+        vcap = self._verifycap
+        b = layout.ReadBucketProxy(bucket, server, vcap.get_storage_index())
+        veup = ValidatedExtendedURIProxy(b, vcap)
+        d = veup.start()
+
+        def _got_ueb(vup):
+            share_hash_tree = IncompleteHashTree(vcap.total_shares)
+            share_hash_tree.set_hashes({0: vup.share_root_hash})
+
+            vrbp = ValidatedReadBucketProxy(sharenum, b,
+                                            share_hash_tree,
+                                            vup.num_segments,
+                                            vup.block_size,
+                                            vup.share_size)
+
+            # note: normal download doesn't use get_all_sharehashes(),
+            # because it gets more data than necessary. We've discussed the
+            # security properties of having verification and download look
+            # identical (so the server couldn't, say, provide good responses
+            # for one and not the other), but I think that full verification
+            # is more important than defending against inconsistent server
+            # behavior. Besides, they can't pass the verifier without storing
+            # all the data, so there's not so much to be gained by behaving
+            # inconsistently.
+            d = vrbp.get_all_sharehashes()
+            # we fill share_hash_tree before fetching any blocks, so the
+            # block fetches won't send redundant share-hash-tree requests, to
+            # speed things up. Then we fetch+validate all the blockhashes.
+            d.addCallback(lambda ign: vrbp.get_all_blockhashes())
+
+            cht = IncompleteHashTree(vup.num_segments)
+            cht.set_hashes({0: vup.crypttext_root_hash})
+            d.addCallback(lambda ign: vrbp.get_all_crypttext_hashes(cht))
+
+            d.addCallback(lambda ign: vrbp)
+            return d
+        d.addCallback(_got_ueb)
 
-        If you download and verify every block then fire with (True, sharenum, None), else if
-        the share data couldn't be parsed because it was of an unknown version number fire with
-        (False, sharenum, 'incompatible'), else if any of the blocks were invalid, fire with
-        (False, sharenum, 'corrupt'), else if the server disconnected (False, sharenum,
-        'disconnect'), else if the server returned a Failure during the process fire with
-        (False, sharenum, 'failure').
+        def _discard_result(r):
+            assert isinstance(r, str), r
+            # to free up the RAM
+            return None
 
-        If there is an internal error such as an uncaught exception in this code, then the
-        deferred will errback, but if there is a remote error such as the server failing or the
-        returned data being incorrect then it will not errback -- it will fire normally with the
-        indicated results. """
+        def _get_blocks(vrbp):
+            def _get_block(ign, blocknum):
+                db = vrbp.get_block(blocknum)
+                db.addCallback(_discard_result)
+                return db
 
-        b = layout.ReadBucketProxy(bucket, serverid, self._verifycap.storage_index)
-        veup = download.ValidatedExtendedURIProxy(b, self._verifycap)
-        d = veup.start()
+            dbs = defer.succeed(None)
+            for blocknum in range(veup.num_segments):
+                dbs.addCallback(_get_block, blocknum)
+
+            # The Deferred we return will fire after every block of this
+            # share has been downloaded and verified successfully, or else it
+            # will errback as soon as the first error is observed.
+            return dbs
 
+        d.addCallback(_get_blocks)
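+
+        # Illustrative sketch: the serial chaining used by _get_blocks,
+        # reduced to its core -- each callback starts the next fetch, so
+        # blocks are verified one at a time and the first failure
+        # short-circuits the rest (unlike a parallel gatherResults).
+        # `num_blocks` and `fetch_and_verify` are hypothetical stand-ins:
+        #
+        #     d = defer.succeed(None)
+        #     for i in range(num_blocks):
+        #         d.addCallback(lambda ign, i=i: fetch_and_verify(i))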
+
+        # if none of those errbacked, the blocks (and the hashes above them)
+        # are good
+        def _all_good(ign):
+            return (True, sharenum, None)
+        d.addCallback(_all_good)
+
+        # but if anything fails, we'll land here
         def _errb(f):
-            # Okay, we didn't succeed at fetching and verifying all the blocks of this
-            # share.  Now we need to handle different reasons for failure differently.  If
-            # the failure isn't one of the following four classes then it will get
-            # re-raised.
-            failtype = f.trap(DeadReferenceError, rrefutil.ServerFailure, layout.LayoutInvalid, layout.RidiculouslyLargeURIExtensionBlock, download.BadOrMissingHash, download.BadURIExtensionHashValue)
+            # We didn't succeed at fetching and verifying all the blocks of
+            # this share. Handle each reason for failure differently.
 
-            if failtype is DeadReferenceError:
+            if f.check(DeadReferenceError):
                 return (False, sharenum, 'disconnect')
-            elif failtype is rrefutil.ServerFailure:
+            elif f.check(RemoteException):
                 return (False, sharenum, 'failure')
-            elif failtype is layout.ShareVersionIncompatible:
+            elif f.check(layout.ShareVersionIncompatible):
                 return (False, sharenum, 'incompatible')
-            else:
+            elif f.check(layout.LayoutInvalid,
+                         layout.RidiculouslyLargeURIExtensionBlock,
+                         BadOrMissingHash,
+                         BadURIExtensionHashValue):
                 return (False, sharenum, 'corrupt')
 
-        def _got_ueb(vup):
-            self._share_hash_tree = hashtree.IncompleteHashTree(self._verifycap.total_shares)
-            self._share_hash_tree.set_hashes({0: vup.share_root_hash})
-
-            vrbp = download.ValidatedReadBucketProxy(sharenum, b, self._share_hash_tree, vup.num_segments, vup.block_size, vup.share_size)
-
-            ds = []
-            for blocknum in range(vup.num_segments):
-                def _discard_result(r):
-                    assert isinstance(r, str), r
-                    # to free up the RAM
-                    return None
-                d2 = vrbp.get_block(blocknum)
-                d2.addCallback(_discard_result)
-                ds.append(d2)
-
-            dl = deferredutil.gatherResults(ds)
-            # dl will fire once every block of this share has been downloaded and verified, or else it will errback.
-
-            def _cb(result):
-                return (True, sharenum, None)
-
-            dl.addCallback(_cb)
-            return dl
-
-        d.addCallback(_got_ueb)
+            # if it wasn't one of those reasons, re-raise the error
+            return f
         d.addErrback(_errb)
 
         return d
 
-    def _verify_server_shares(self, serverid, ss):
-        """ Return a deferred which eventually fires with a tuple of (set(sharenum), serverid,
-        set(corruptsharenum), set(incompatiblesharenum), success) showing all the shares
-        verified to be served by this server, and all the corrupt shares served by the server,
-        and all the incompatible shares served by the server.  In case the server is
-        disconnected or returns a Failure then it fires with the last element False.  A server
-        disconnecting or returning a failure when we ask it for shares is the same, for our
-        purposes, as a server that says it has none or offers invalid ones, except that we want
-        to track and report the server's behavior.  Similarly, the presence of corrupt shares is
-        mainly of use for diagnostics -- you can typically treat it as just like being no share
-        at all by just observing its absence from the verified shares dict and ignoring its
-        presence in the corrupt shares dict.  The 'success' argument means whether the server
-        responded to *any* queries during this process, so if it responded to some queries and
-        then disconnected and ceased responding, or returned a failure, it is still marked with
-        the True flag for 'success'.
+    def _verify_server_shares(self, s):
+        """ Return a deferred which eventually fires with a tuple of
+        (set(sharenum), server, set(corruptsharenum),
+        set(incompatiblesharenum), success) showing all the shares verified
+        to be served by this server, and all the corrupt shares served by the
+        server, and all the incompatible shares served by the server. If the
+        server is disconnected or returns a Failure, it fires with the last
+        element set to False.
+
+        A server disconnecting or returning a failure when we ask it for
+        shares is the same, for our purposes, as a server that says it has
+        none or offers invalid ones, except that we want to track and report
+        the server's behavior. Similarly, the presence of corrupt shares is
+        mainly of use for diagnostics -- you can typically treat it as just
+        like being no share at all by just observing its absence from the
+        verified shares dict and ignoring its presence in the corrupt shares
+        dict.
+
+        The 'success' argument means whether the server responded to *any*
+        queries during this process, so if it responded to some queries and
+        then disconnected and ceased responding, or returned a failure, it is
+        still marked with the True flag for 'success'.
         """
-        d = self._get_buckets(ss, self._verifycap.storage_index, serverid)
+        d = self._get_buckets(s, self._verifycap.get_storage_index())
 
         def _got_buckets(result):
-            bucketdict, serverid, success = result
+            bucketdict, success = result
 
             shareverds = []
             for (sharenum, bucket) in bucketdict.items():
-                d = self._download_and_verify(serverid, sharenum, bucket)
+                d = self._download_and_verify(s, sharenum, bucket)
                 shareverds.append(d)
 
             dl = deferredutil.gatherResults(shareverds)
@@ -176,93 +709,108 @@ class Checker(log.PrefixingLogMixin):
                             corrupt.add(sharenum)
                         elif whynot == 'incompatible':
                             incompatible.add(sharenum)
-                return (verified, serverid, corrupt, incompatible, success)
+                return (verified, s, corrupt, incompatible, success)
 
             dl.addCallback(collect)
             return dl
 
         def _err(f):
-            f.trap(rrefutil.ServerFailure)
-            return (set(), serverid, set(), set(), False)
+            f.trap(RemoteException, DeadReferenceError)
+            return (set(), s, set(), set(), False)
 
         d.addCallbacks(_got_buckets, _err)
         return d
 
-    def _check_server_shares(self, serverid, ss):
-        """ Return a deferred which eventually fires with a tuple of (set(sharenum), serverid,
-        set(), set(), responded) showing all the shares claimed to be served by this server.  In
-        case the server is disconnected then it fires with (set() serverid, set(), set(), False)
-        (a server disconnecting when we ask it for buckets is the same, for our purposes, as a
-        server that says it has none, except that we want to track and report whether or not
-        each server responded.)"""
+    def _check_server_shares(self, s):
+        """Return a deferred which eventually fires with a tuple of
+        (set(sharenum), server, set(), set(), responded) showing all the
+        shares claimed to be served by this server. If the server is
+        disconnected, it fires with (set(), server, set(), set(), False)
+        (a server disconnecting when we ask it for buckets is the same, for
+        our purposes, as a server that says it has none, except that we want
+        to track and report whether or not each server responded.)"""
         def _curry_empty_corrupted(res):
-            buckets, serverid, responded = res
-            return (set(buckets), serverid, set(), set(), responded)
-        d = self._get_buckets(ss, self._verifycap.storage_index, serverid)
+            buckets, responded = res
+            return (set(buckets), s, set(), set(), responded)
+        d = self._get_buckets(s, self._verifycap.get_storage_index())
         d.addCallback(_curry_empty_corrupted)
         return d
 
     def _format_results(self, results):
-        cr = CheckerResults(self._verifycap, self._verifycap.storage_index)
-        d = {}
-        d['count-shares-needed'] = self._verifycap.needed_shares
-        d['count-shares-expected'] = self._verifycap.total_shares
-
-        verifiedshares = {} # {sharenum: set(serverid)}
-        servers = {} # {serverid: set(sharenums)}
-        corruptsharelocators = [] # (serverid, storageindex, sharenum)
-        incompatiblesharelocators = [] # (serverid, storageindex, sharenum)
-
-        for theseverifiedshares, thisserverid, thesecorruptshares, theseincompatibleshares, thisresponded in results:
-            servers.setdefault(thisserverid, set()).update(theseverifiedshares)
-            for sharenum in theseverifiedshares:
-                verifiedshares.setdefault(sharenum, set()).add(thisserverid)
-            for sharenum in thesecorruptshares:
-                corruptsharelocators.append((thisserverid, self._verifycap.storage_index, sharenum))
-            for sharenum in theseincompatibleshares:
-                incompatiblesharelocators.append((thisserverid, self._verifycap.storage_index, sharenum))
-
-        d['count-shares-good'] = len(verifiedshares)
-        d['count-good-share-hosts'] = len([s for s in servers.keys() if servers[s]])
+        SI = self._verifycap.get_storage_index()
+
+        verifiedshares = dictutil.DictOfSets() # {sharenum: set(server)}
+        servers = {} # {server: set(sharenums)}
+        corruptshare_locators = [] # (server, storageindex, sharenum)
+        incompatibleshare_locators = [] # (server, storageindex, sharenum)
+        servers_responding = set() # server
+
+        for verified, server, corrupt, incompatible, responded in results:
+            servers.setdefault(server, set()).update(verified)
+            for sharenum in verified:
+                verifiedshares.setdefault(sharenum, set()).add(server)
+            for sharenum in corrupt:
+                corruptshare_locators.append((server, SI, sharenum))
+            for sharenum in incompatible:
+                incompatibleshare_locators.append((server, SI, sharenum))
+            if responded:
+                servers_responding.add(server)
+
+        good_share_hosts = len([s for s in servers.keys() if servers[s]])
 
         assert len(verifiedshares) <= self._verifycap.total_shares, (verifiedshares.keys(), self._verifycap.total_shares)
         if len(verifiedshares) == self._verifycap.total_shares:
-            cr.set_healthy(True)
+            healthy = True
+            summary = "Healthy"
         else:
-            cr.set_healthy(False)
+            healthy = False
+            summary = ("Not Healthy: %d shares (enc %d-of-%d)" %
+                       (len(verifiedshares),
+                        self._verifycap.needed_shares,
+                        self._verifycap.total_shares))
         if len(verifiedshares) >= self._verifycap.needed_shares:
-            cr.set_recoverable(True)
-            d['count-recoverable-versions'] = 1
-            d['count-unrecoverable-versions'] = 0
+            recoverable = 1
+            unrecoverable = 0
         else:
-            cr.set_recoverable(False)
-            d['count-recoverable-versions'] = 0
-            d['count-unrecoverable-versions'] = 1
-
-        d['servers-responding'] = list(servers)
-        d['sharemap'] = verifiedshares
-        d['count-wrong-shares'] = 0 # no such thing as wrong shares of an immutable file
-        d['list-corrupt-shares'] = corruptsharelocators
-        d['count-corrupt-shares'] = len(corruptsharelocators)
-        d['list-incompatible-shares'] = incompatiblesharelocators
-        d['count-incompatible-shares'] = len(incompatiblesharelocators)
-
-
-        # The file needs rebalancing if the set of servers that have at least one share is less
-        # than the number of uniquely-numbered shares available.
-        cr.set_needs_rebalancing(d['count-good-share-hosts'] < d['count-shares-good'])
-
-        cr.set_data(d)
+            recoverable = 0
+            unrecoverable = 1
+
+        # The file needs rebalancing if the set of servers that have at least
+        # one share is less than the number of uniquely-numbered shares
+        # available.
+        # TODO: this may be wrong, see ticket #1115 comment:27 and ticket #1784.
+        needs_rebalancing = bool(good_share_hosts < len(verifiedshares))
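+
+        # Illustrative sketch: the heuristic above with hypothetical
+        # numbers -- 10 distinct share numbers verified but only 7 servers
+        # holding at least one share means some server holds several
+        # shares:
+        #
+        #     needs_rebalancing = bool(7 < 10)  # True: spread shares out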
+
+        cr = CheckResults(self._verifycap, SI,
+                          healthy=healthy, recoverable=bool(recoverable),
+                          needs_rebalancing=needs_rebalancing,
+                          count_shares_needed=self._verifycap.needed_shares,
+                          count_shares_expected=self._verifycap.total_shares,
+                          count_shares_good=len(verifiedshares),
+                          count_good_share_hosts=good_share_hosts,
+                          count_recoverable_versions=recoverable,
+                          count_unrecoverable_versions=unrecoverable,
+                          servers_responding=list(servers_responding),
+                          sharemap=verifiedshares,
+                          count_wrong_shares=0, # no such thing, for immutable
+                          list_corrupt_shares=corruptshare_locators,
+                          count_corrupt_shares=len(corruptshare_locators),
+                          list_incompatible_shares=incompatibleshare_locators,
+                          count_incompatible_shares=len(incompatibleshare_locators),
+                          summary=summary,
+                          report=[],
+                          share_problems=[],
+                          servermap=None)
 
         return cr
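+
+    # Illustrative sketch: the classification computed above, for a
+    # hypothetical 3-of-10 file (needed_shares=3, total_shares=10):
+    #
+    #     len(verifiedshares) == 10      ->  healthy and recoverable
+    #     3 <= len(verifiedshares) < 10  ->  not healthy, still recoverable
+    #     len(verifiedshares) < 3        ->  not healthy, unrecoverable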
 
     def start(self):
         ds = []
         if self._verify:
-            for (serverid, ss) in self._servers:
-                ds.append(self._verify_server_shares(serverid, ss))
+            for s in self._servers:
+                ds.append(self._verify_server_shares(s))
         else:
-            for (serverid, ss) in self._servers:
-                ds.append(self._check_server_shares(serverid, ss))
+            for s in self._servers:
+                ds.append(self._check_server_shares(s))
 
         return deferredutil.gatherResults(ds).addCallback(self._format_results)
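+
+# Illustrative sketch (not part of this module): how a caller might run the
+# Checker; `verifycap`, `servers`, `secret_holder`, and `monitor` are
+# hypothetical stand-ins for the objects the caller supplies:
+#
+#     c = Checker(verifycap, servers, verify=True, add_lease=False,
+#                 secret_holder=secret_holder, monitor=monitor)
+#     d = c.start()
+#     d.addCallback(lambda cr: cr.is_healthy())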