-
-"""
-Given a StorageIndex, count how many shares we can find.
-
-This does no verification of the shares whatsoever. If the peer claims to
-have the share, we believe them.
-"""
-
+from zope.interface import implements
from twisted.internet import defer
-from twisted.python import log
-from allmydata import storage
-from allmydata.checker_results import CheckerResults
-from allmydata.immutable import download
-from allmydata.uri import CHKFileURI
-from allmydata.util import hashutil
+from foolscap.api import DeadReferenceError, RemoteException
+from allmydata import hashtree, codec, uri
+from allmydata.interfaces import IValidatedThingProxy, IVerifierURI
+from allmydata.hashtree import IncompleteHashTree
+from allmydata.check_results import CheckResults
+from allmydata.uri import CHKFileVerifierURI
from allmydata.util.assertutil import precondition
-
-class SimpleCHKFileChecker:
- """Return a list of (needed, total, found, sharemap), where sharemap maps
- share number to a list of (binary) nodeids of the shareholders."""
-
- def __init__(self, client, uri, storage_index, needed_shares, total_shares):
- self.peer_getter = client.get_permuted_peers
- self.needed_shares = needed_shares
- self.total_shares = total_shares
- self.found_shares = set()
- self.uri = uri
- self.storage_index = storage_index
- self.sharemap = {}
- self.responded = set()
-
- '''
- def check_synchronously(self, si):
- # this is how we would write this class if we were using synchronous
- # messages (or if we used promises).
- found = set()
- for (pmpeerid, peerid, connection) in self.peer_getter(storage_index):
- buckets = connection.get_buckets(si)
- found.update(buckets.keys())
- return len(found)
- '''
+from allmydata.util import base32, deferredutil, dictutil, log, mathutil
+from allmydata.util.hashutil import file_renewal_secret_hash, \
+ file_cancel_secret_hash, bucket_renewal_secret_hash, \
+ bucket_cancel_secret_hash, uri_extension_hash, CRYPTO_VAL_SIZE, \
+ block_hash
+
+from allmydata.immutable import layout
+
+class IntegrityCheckReject(Exception):
+ pass
+class BadURIExtension(IntegrityCheckReject):
+ pass
+class BadURIExtensionHashValue(IntegrityCheckReject):
+ pass
+class BadOrMissingHash(IntegrityCheckReject):
+ pass
+class UnsupportedErasureCodec(BadURIExtension):
+ pass
+
+class ValidatedExtendedURIProxy:
+    """ I am a front-end for a remote UEB (using a local ReadBucketProxy),
+    responsible for retrieving and validating the elements from the UEB."""
+    implements(IValidatedThingProxy)
+
+ def __init__(self, readbucketproxy, verifycap, fetch_failures=None):
+ # fetch_failures is for debugging -- see test_encode.py
+ self._fetch_failures = fetch_failures
+ self._readbucketproxy = readbucketproxy
+ precondition(IVerifierURI.providedBy(verifycap), verifycap)
+ self._verifycap = verifycap
+
+ # required
+ self.segment_size = None
+ self.crypttext_root_hash = None
+ self.share_root_hash = None
+
+ # computed
+ self.block_size = None
+ self.share_size = None
+ self.num_segments = None
+ self.tail_data_size = None
+ self.tail_segment_size = None
+
+ # optional
+ self.crypttext_hash = None
+
+ def __str__(self):
+ return "<%s %s>" % (self.__class__.__name__, self._verifycap.to_string())
+
+ def _check_integrity(self, data):
+ h = uri_extension_hash(data)
+ if h != self._verifycap.uri_extension_hash:
+ msg = ("The copy of uri_extension we received from %s was bad: wanted %s, got %s" %
+ (self._readbucketproxy,
+ base32.b2a(self._verifycap.uri_extension_hash),
+ base32.b2a(h)))
+ if self._fetch_failures is not None:
+ self._fetch_failures["uri_extension"] += 1
+ raise BadURIExtensionHashValue(msg)
+ else:
+ return data
+
+ def _parse_and_validate(self, data):
+ self.share_size = mathutil.div_ceil(self._verifycap.size,
+ self._verifycap.needed_shares)
+
+ d = uri.unpack_extension(data)
+
+        # A UEB can contain several kinds of things. First, things that we
+        # really need to learn from the UEB in order to do this download.
+        # Next, things which are optional but not redundant -- if they are
+        # present in the UEB they will get used. Next, things that are
+        # optional and redundant: they don't have to be in the UEB, but if
+        # they are present then they will be checked for consistency with
+        # the already-known facts, and an exception will be raised if they
+        # are inconsistent. These things aren't actually used -- they are
+        # just tested for consistency and ignored. Finally, things which are
+        # deprecated -- they ought not be in the UEB at all, and if they are
+        # present then a warning will be logged but they are otherwise
+        # ignored.
+
+ # First, things that we really need to learn from the UEB:
+ # segment_size, crypttext_root_hash, and share_root_hash.
+ self.segment_size = d['segment_size']
+
+ self.block_size = mathutil.div_ceil(self.segment_size,
+ self._verifycap.needed_shares)
+ self.num_segments = mathutil.div_ceil(self._verifycap.size,
+ self.segment_size)
+
+ self.tail_data_size = self._verifycap.size % self.segment_size
+ if not self.tail_data_size:
+ self.tail_data_size = self.segment_size
+ # padding for erasure code
+ self.tail_segment_size = mathutil.next_multiple(self.tail_data_size,
+ self._verifycap.needed_shares)
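+        # Worked example (illustration only, not taken from any real UEB):
+        # size=10000, segment_size=4096, needed_shares=4 gives:
+        #   share_size        = div_ceil(10000, 4)    = 2500
+        #   block_size        = div_ceil(4096, 4)     = 1024
+        #   num_segments      = div_ceil(10000, 4096) = 3
+        #   tail_data_size    = 10000 % 4096          = 1808
+        #   tail_segment_size = next_multiple(1808, 4) = 1808 (already a
+        #                       multiple of needed_shares, so no padding)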
+
+ # Ciphertext hash tree root is mandatory, so that there is at most
+ # one ciphertext that matches this read-cap or verify-cap. The
+ # integrity check on the shares is not sufficient to prevent the
+ # original encoder from creating some shares of file A and other
+ # shares of file B.
+ self.crypttext_root_hash = d['crypttext_root_hash']
+
+ self.share_root_hash = d['share_root_hash']
+
+
+ # Next: things that are optional and not redundant: crypttext_hash
+ if d.has_key('crypttext_hash'):
+ self.crypttext_hash = d['crypttext_hash']
+ if len(self.crypttext_hash) != CRYPTO_VAL_SIZE:
+ raise BadURIExtension('crypttext_hash is required to be hashutil.CRYPTO_VAL_SIZE bytes, not %s bytes' % (len(self.crypttext_hash),))
+
+
+ # Next: things that are optional, redundant, and required to be
+ # consistent: codec_name, codec_params, tail_codec_params,
+ # num_segments, size, needed_shares, total_shares
+ if d.has_key('codec_name'):
+ if d['codec_name'] != "crs":
+ raise UnsupportedErasureCodec(d['codec_name'])
+
+ if d.has_key('codec_params'):
+ ucpss, ucpns, ucpts = codec.parse_params(d['codec_params'])
+ if ucpss != self.segment_size:
+ raise BadURIExtension("inconsistent erasure code params: "
+ "ucpss: %s != self.segment_size: %s" %
+ (ucpss, self.segment_size))
+ if ucpns != self._verifycap.needed_shares:
+ raise BadURIExtension("inconsistent erasure code params: ucpns: %s != "
+ "self._verifycap.needed_shares: %s" %
+ (ucpns, self._verifycap.needed_shares))
+ if ucpts != self._verifycap.total_shares:
+ raise BadURIExtension("inconsistent erasure code params: ucpts: %s != "
+ "self._verifycap.total_shares: %s" %
+ (ucpts, self._verifycap.total_shares))
+
+ if d.has_key('tail_codec_params'):
+ utcpss, utcpns, utcpts = codec.parse_params(d['tail_codec_params'])
+ if utcpss != self.tail_segment_size:
+ raise BadURIExtension("inconsistent erasure code params: utcpss: %s != "
+ "self.tail_segment_size: %s, self._verifycap.size: %s, "
+ "self.segment_size: %s, self._verifycap.needed_shares: %s"
+ % (utcpss, self.tail_segment_size, self._verifycap.size,
+ self.segment_size, self._verifycap.needed_shares))
+ if utcpns != self._verifycap.needed_shares:
+ raise BadURIExtension("inconsistent erasure code params: utcpns: %s != "
+ "self._verifycap.needed_shares: %s" % (utcpns,
+ self._verifycap.needed_shares))
+ if utcpts != self._verifycap.total_shares:
+ raise BadURIExtension("inconsistent erasure code params: utcpts: %s != "
+ "self._verifycap.total_shares: %s" % (utcpts,
+ self._verifycap.total_shares))
+
+ if d.has_key('num_segments'):
+ if d['num_segments'] != self.num_segments:
+ raise BadURIExtension("inconsistent num_segments: size: %s, "
+ "segment_size: %s, computed_num_segments: %s, "
+ "ueb_num_segments: %s" % (self._verifycap.size,
+ self.segment_size,
+ self.num_segments, d['num_segments']))
+
+ if d.has_key('size'):
+ if d['size'] != self._verifycap.size:
+ raise BadURIExtension("inconsistent size: URI size: %s, UEB size: %s" %
+ (self._verifycap.size, d['size']))
+
+        if d.has_key('needed_shares'):
+            if d['needed_shares'] != self._verifycap.needed_shares:
+                raise BadURIExtension("inconsistent needed shares: URI needed shares: %s, UEB "
+                                      "needed shares: %s" % (self._verifycap.needed_shares,
+                                                             d['needed_shares']))
+
+ if d.has_key('total_shares'):
+ if d['total_shares'] != self._verifycap.total_shares:
+ raise BadURIExtension("inconsistent total shares: URI total shares: %s, UEB "
+ "total shares: %s" % (self._verifycap.total_shares,
+ d['total_shares']))
+
+ # Finally, things that are deprecated and ignored: plaintext_hash,
+ # plaintext_root_hash
+ if d.get('plaintext_hash'):
+ log.msg("Found plaintext_hash in UEB. This field is deprecated for security reasons "
+ "and is no longer used. Ignoring. %s" % (self,))
+ if d.get('plaintext_root_hash'):
+ log.msg("Found plaintext_root_hash in UEB. This field is deprecated for security "
+ "reasons and is no longer used. Ignoring. %s" % (self,))
+
+ return self
def start(self):
- d = self._get_all_shareholders(self.storage_index)
- d.addCallback(self._done)
+ """Fetch the UEB from bucket, compare its hash to the hash from
+ verifycap, then parse it. Returns a deferred which is called back
+ with self once the fetch is successful, or is erred back if it
+ fails."""
+ d = self._readbucketproxy.get_uri_extension()
+ d.addCallback(self._check_integrity)
+ d.addCallback(self._parse_and_validate)
+ return d
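+
+    # A hedged usage sketch ('rbp' stands in for a layout.ReadBucketProxy
+    # and 'vcap' for an IVerifierURI; illustration only):
+    #
+    #   veup = ValidatedExtendedURIProxy(rbp, vcap)
+    #   d = veup.start()
+    #   d.addCallback(lambda vup: (vup.num_segments, vup.block_size))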
+
+class ValidatedReadBucketProxy(log.PrefixingLogMixin):
+ """I am a front-end for a remote storage bucket, responsible for
+ retrieving and validating data from that bucket.
+
+ My get_block() method is used by BlockDownloaders.
+ """
+
+ def __init__(self, sharenum, bucket, share_hash_tree, num_blocks,
+ block_size, share_size):
+ """ share_hash_tree is required to have already been initialized with
+ the root hash (the number-0 hash), using the share_root_hash from the
+ UEB"""
+ precondition(share_hash_tree[0] is not None, share_hash_tree)
+ prefix = "%d-%s-%s" % (sharenum, bucket,
+ base32.b2a_l(share_hash_tree[0][:8], 60))
+ log.PrefixingLogMixin.__init__(self,
+ facility="tahoe.immutable.download",
+ prefix=prefix)
+ self.sharenum = sharenum
+ self.bucket = bucket
+ self.share_hash_tree = share_hash_tree
+ self.num_blocks = num_blocks
+ self.block_size = block_size
+ self.share_size = share_size
+ self.block_hash_tree = hashtree.IncompleteHashTree(self.num_blocks)
+
+ def get_all_sharehashes(self):
+ """Retrieve and validate all the share-hash-tree nodes that are
+ included in this share, regardless of whether we need them to
+ validate the share or not. Each share contains a minimal Merkle tree
+ chain, but there is lots of overlap, so usually we'll be using hashes
+ from other shares and not reading every single hash from this share.
+ The Verifier uses this function to read and validate every single
+ hash from this share.
+
+ Call this (and wait for the Deferred it returns to fire) before
+ calling get_block() for the first time: this lets us check that the
+        share contains enough hashes to validate its own data, and
+ avoids downloading any share hash twice.
+
+ I return a Deferred which errbacks upon failure, probably with
+ BadOrMissingHash."""
+
+ d = self.bucket.get_share_hashes()
+ def _got_share_hashes(sh):
+ sharehashes = dict(sh)
+ try:
+ self.share_hash_tree.set_hashes(sharehashes)
+ except IndexError, le:
+ raise BadOrMissingHash(le)
+ except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
+ raise BadOrMissingHash(le)
+ d.addCallback(_got_share_hashes)
+ return d
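+
+    # A hedged sketch of the call order the Verifier is expected to use
+    # ('vrbp' is an instance of this class, 'cht' a crypttext hash tree
+    # whose root is already set; illustration only):
+    #
+    #   d = vrbp.get_all_sharehashes()
+    #   d.addCallback(lambda ign: vrbp.get_all_blockhashes())
+    #   d.addCallback(lambda ign: vrbp.get_all_crypttext_hashes(cht))
+    #   d.addCallback(lambda ign: vrbp.get_block(0))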
+
+ def get_all_blockhashes(self):
+ """Retrieve and validate all the block-hash-tree nodes that are
+ included in this share. Each share contains a full Merkle tree, but
+ we usually only fetch the minimal subset necessary for any particular
+ block. This function fetches everything at once. The Verifier uses
+ this function to validate the block hash tree.
+
+ Call this (and wait for the Deferred it returns to fire) after
+ calling get_all_sharehashes() and before calling get_block() for the
+ first time: this lets us check that the share contains all block
+ hashes and avoids downloading them multiple times.
+
+ I return a Deferred which errbacks upon failure, probably with
+ BadOrMissingHash.
+ """
+
+ # get_block_hashes(anything) currently always returns everything
+ needed = list(range(len(self.block_hash_tree)))
+ d = self.bucket.get_block_hashes(needed)
+ def _got_block_hashes(blockhashes):
+ if len(blockhashes) < len(self.block_hash_tree):
+ raise BadOrMissingHash()
+ bh = dict(enumerate(blockhashes))
+
+ try:
+ self.block_hash_tree.set_hashes(bh)
+ except IndexError, le:
+ raise BadOrMissingHash(le)
+ except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
+ raise BadOrMissingHash(le)
+ d.addCallback(_got_block_hashes)
return d
- def _get_all_shareholders(self, storage_index):
- dl = []
- for (peerid, ss) in self.peer_getter("storage", storage_index):
- d = ss.callRemote("get_buckets", storage_index)
- d.addCallbacks(self._got_response, self._got_error,
- callbackArgs=(peerid,))
- dl.append(d)
- return defer.DeferredList(dl)
-
- def _got_response(self, buckets, peerid):
- # buckets is a dict: maps shum to an rref of the server who holds it
- self.found_shares.update(buckets.keys())
- for k in buckets:
- if k not in self.sharemap:
- self.sharemap[k] = []
- self.sharemap[k].append(peerid)
- self.responded.add(peerid)
-
- def _got_error(self, f):
- if f.check(KeyError):
- pass
- log.err(f)
- pass
-
- def _done(self, res):
- r = CheckerResults(self.uri.to_string(), self.storage_index)
- report = []
- healthy = bool(len(self.found_shares) >= self.total_shares)
- r.set_healthy(healthy)
- recoverable = bool(len(self.found_shares) >= self.needed_shares)
- r.set_recoverable(recoverable)
- data = {"count-shares-good": len(self.found_shares),
- "count-shares-needed": self.needed_shares,
- "count-shares-expected": self.total_shares,
- "count-wrong-shares": 0,
- }
- if recoverable:
- data["count-recoverable-versions"] = 1
- data["count-unrecoverable-versions"] = 0
+ def get_all_crypttext_hashes(self, crypttext_hash_tree):
+ """Retrieve and validate all the crypttext-hash-tree nodes that are
+ in this share. Normally we don't look at these at all: the download
+ process fetches them incrementally as needed to validate each segment
+ of ciphertext. But this is a convenient place to give the Verifier a
+ function to validate all of these at once.
+
+ Call this with a new hashtree object for each share, initialized with
+ the crypttext hash tree root. I return a Deferred which errbacks upon
+ failure, probably with BadOrMissingHash.
+ """
+
+ # get_crypttext_hashes() always returns everything
+ d = self.bucket.get_crypttext_hashes()
+ def _got_crypttext_hashes(hashes):
+ if len(hashes) < len(crypttext_hash_tree):
+ raise BadOrMissingHash()
+ ct_hashes = dict(enumerate(hashes))
+ try:
+ crypttext_hash_tree.set_hashes(ct_hashes)
+ except IndexError, le:
+ raise BadOrMissingHash(le)
+ except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
+ raise BadOrMissingHash(le)
+ d.addCallback(_got_crypttext_hashes)
+ return d
+
+ def get_block(self, blocknum):
+ # the first time we use this bucket, we need to fetch enough elements
+ # of the share hash tree to validate it from our share hash up to the
+ # hashroot.
+ if self.share_hash_tree.needed_hashes(self.sharenum):
+ d1 = self.bucket.get_share_hashes()
else:
- data["count-recoverable-versions"] = 0
- data["count-unrecoverable-versions"] = 1
-
- data["count-corrupt-shares"] = 0 # non-verifier doesn't see corruption
- data["list-corrupt-shares"] = []
- hosts = set()
- sharemap = {}
- for (shnum,nodeids) in self.sharemap.items():
- hosts.update(nodeids)
- sharemap[shnum] = nodeids
- data["count-good-share-hosts"] = len(hosts)
- data["servers-responding"] = list(self.responded)
- data["sharemap"] = sharemap
-
- r.set_data(data)
- r.set_needs_rebalancing(bool( len(self.found_shares) > len(hosts) ))
-
- #r.stuff = (self.needed_shares, self.total_shares,
- # len(self.found_shares), self.sharemap)
- if len(self.found_shares) < self.total_shares:
- wanted = set(range(self.total_shares))
- missing = wanted - self.found_shares
- report.append("Missing shares: %s" %
- ",".join(["sh%d" % shnum
- for shnum in sorted(missing)]))
- r.set_report(report)
- if healthy:
- r.set_summary("Healthy")
+ d1 = defer.succeed([])
+
+ # We might need to grab some elements of our block hash tree, to
+ # validate the requested block up to the share hash.
+ blockhashesneeded = self.block_hash_tree.needed_hashes(blocknum, include_leaf=True)
+ # We don't need the root of the block hash tree, as that comes in the
+ # share tree.
+ blockhashesneeded.discard(0)
+ d2 = self.bucket.get_block_hashes(blockhashesneeded)
+
+ if blocknum < self.num_blocks-1:
+ thisblocksize = self.block_size
else:
- r.set_summary("Not Healthy")
- # TODO: more detail
- return r
-
-class VerifyingOutput:
- def __init__(self, total_length, results):
- self._crypttext_hasher = hashutil.crypttext_hasher()
- self.length = 0
- self.total_length = total_length
- self._segment_number = 0
- self._crypttext_hash_tree = None
- self._opened = False
- self._results = results
- results.set_healthy(False)
- results.set_recoverable(False)
- results.set_summary("Not Healthy")
-
- def got_crypttext_hash_tree(self, crypttext_hashtree):
- self._crypttext_hash_tree = crypttext_hashtree
-
- def write_segment(self, crypttext):
- self.length += len(crypttext)
-
- self._crypttext_hasher.update(crypttext)
- if self._crypttext_hash_tree:
- ch = hashutil.crypttext_segment_hasher()
- ch.update(crypttext)
- crypttext_leaves = {self._segment_number: ch.digest()}
- self._crypttext_hash_tree.set_hashes(leaves=crypttext_leaves)
-
- self._segment_number += 1
-
- def close(self):
- self.crypttext_hash = self._crypttext_hasher.digest()
-
- def finish(self):
- self._results.set_healthy(True)
- self._results.set_recoverable(True)
- self._results.set_summary("Healthy")
- # the return value of finish() is passed out of FileDownloader._done,
- # but SimpleCHKFileVerifier overrides this with the CheckerResults
- # instance instead.
-
-
-class SimpleCHKFileVerifier(download.FileDownloader):
- # this reconstructs the crypttext, which verifies that at least 'k' of
- # the shareholders are around and have valid data. It does not check the
- # remaining shareholders, and it cannot verify the plaintext.
- check_plaintext_hash = False
-
- def __init__(self, client, u, storage_index, k, N, size, ueb_hash):
- precondition(isinstance(u, CHKFileURI), u)
- self._client = client
-
- self._uri = u
- self._storage_index = storage_index
- self._uri_extension_hash = ueb_hash
- self._total_shares = N
- self._size = size
- self._num_needed_shares = k
-
- self._si_s = storage.si_b2a(self._storage_index)
- self.init_logging()
-
- self._check_results = r = CheckerResults(self._uri.to_string(), self._storage_index)
- r.set_data({"count-shares-needed": k,
- "count-shares-expected": N,
- })
- self._output = VerifyingOutput(self._size, r)
- self._paused = False
- self._stopped = False
-
- self._results = None
- self.active_buckets = {} # k: shnum, v: bucket
- self._share_buckets = [] # list of (sharenum, bucket) tuples
- self._share_vbuckets = {} # k: shnum, v: set of ValidatedBuckets
- self._uri_extension_sources = []
-
- self._uri_extension_data = None
-
- self._fetch_failures = {"uri_extension": 0,
- "plaintext_hashroot": 0,
- "plaintext_hashtree": 0,
- "crypttext_hashroot": 0,
- "crypttext_hashtree": 0,
- }
-
- def init_logging(self):
- self._log_prefix = prefix = storage.si_b2a(self._storage_index)[:5]
- num = self._client.log("SimpleCHKFileVerifier(%s): starting" % prefix)
- self._log_number = num
-
- def log(self, *args, **kwargs):
- if not "parent" in kwargs:
- kwargs['parent'] = self._log_number
- # add a prefix to the message, regardless of how it is expressed
- prefix = "SimpleCHKFileVerifier(%s): " % self._log_prefix
- if "format" in kwargs:
- kwargs["format"] = prefix + kwargs["format"]
- elif "message" in kwargs:
- kwargs["message"] = prefix + kwargs["message"]
- elif args:
- m = prefix + args[0]
- args = (m,) + args[1:]
- return self._client.log(*args, **kwargs)
+ thisblocksize = self.share_size % self.block_size
+ if thisblocksize == 0:
+ thisblocksize = self.block_size
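+        # e.g. (continuing the illustration above): with share_size=2500 and
+        # block_size=1024, blocks 0 and 1 are 1024 bytes each and the tail
+        # block is 2500 % 1024 = 452 bytes.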
+ d3 = self.bucket.get_block_data(blocknum,
+ self.block_size, thisblocksize)
+
+ dl = deferredutil.gatherResults([d1, d2, d3])
+ dl.addCallback(self._got_data, blocknum)
+ return dl
+
+ def _got_data(self, results, blocknum):
+ precondition(blocknum < self.num_blocks,
+ self, blocknum, self.num_blocks)
+ sharehashes, blockhashes, blockdata = results
+ try:
+ sharehashes = dict(sharehashes)
+ except ValueError, le:
+ le.args = tuple(le.args + (sharehashes,))
+ raise
+ blockhashes = dict(enumerate(blockhashes))
+
+ candidate_share_hash = None # in case we log it in the except block below
+ blockhash = None # in case we log it in the except block below
+
+ try:
+ if self.share_hash_tree.needed_hashes(self.sharenum):
+ # This will raise exception if the values being passed do not
+ # match the root node of self.share_hash_tree.
+ try:
+ self.share_hash_tree.set_hashes(sharehashes)
+ except IndexError, le:
+ # Weird -- sharehashes contained index numbers outside of
+ # the range that fit into this hash tree.
+ raise BadOrMissingHash(le)
+
+ # To validate a block we need the root of the block hash tree,
+ # which is also one of the leafs of the share hash tree, and is
+ # called "the share hash".
+ if not self.block_hash_tree[0]: # empty -- no root node yet
+ # Get the share hash from the share hash tree.
+ share_hash = self.share_hash_tree.get_leaf(self.sharenum)
+ if not share_hash:
+ # No root node in block_hash_tree and also the share hash
+ # wasn't sent by the server.
+ raise hashtree.NotEnoughHashesError
+ self.block_hash_tree.set_hashes({0: share_hash})
+
+ if self.block_hash_tree.needed_hashes(blocknum):
+ self.block_hash_tree.set_hashes(blockhashes)
+
+ blockhash = block_hash(blockdata)
+ self.block_hash_tree.set_hashes(leaves={blocknum: blockhash})
+ #self.log("checking block_hash(shareid=%d, blocknum=%d) len=%d "
+ # "%r .. %r: %s" %
+ # (self.sharenum, blocknum, len(blockdata),
+ # blockdata[:50], blockdata[-50:], base32.b2a(blockhash)))
+
+ except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
+ # log.WEIRD: indicates undetected disk/network error, or more
+ # likely a programming error
+ self.log("hash failure in block=%d, shnum=%d on %s" %
+ (blocknum, self.sharenum, self.bucket))
+ if self.block_hash_tree.needed_hashes(blocknum):
+ self.log(""" failure occurred when checking the block_hash_tree.
+ This suggests that either the block data was bad, or that the
+ block hashes we received along with it were bad.""")
+ else:
+ self.log(""" the failure probably occurred when checking the
+ share_hash_tree, which suggests that the share hashes we
+ received from the remote peer were bad.""")
+ self.log(" have candidate_share_hash: %s" % bool(candidate_share_hash))
+ self.log(" block length: %d" % len(blockdata))
+ self.log(" block hash: %s" % base32.b2a_or_none(blockhash))
+ if len(blockdata) < 100:
+ self.log(" block data: %r" % (blockdata,))
+ else:
+ self.log(" block data start/end: %r .. %r" %
+ (blockdata[:50], blockdata[-50:]))
+ self.log(" share hash tree:\n" + self.share_hash_tree.dump())
+ self.log(" block hash tree:\n" + self.block_hash_tree.dump())
+ lines = []
+ for i,h in sorted(sharehashes.items()):
+ lines.append("%3d: %s" % (i, base32.b2a_or_none(h)))
+ self.log(" sharehashes:\n" + "\n".join(lines) + "\n")
+ lines = []
+ for i,h in blockhashes.items():
+ lines.append("%3d: %s" % (i, base32.b2a_or_none(h)))
+ log.msg(" blockhashes:\n" + "\n".join(lines) + "\n")
+ raise BadOrMissingHash(le)
+
+ # If we made it here, the block is good. If the hash trees didn't
+ # like what they saw, they would have raised a BadHashError, causing
+ # our caller to see a Failure and thus ignore this block (as well as
+ # dropping this bucket).
+ return blockdata
+
+
+class Checker(log.PrefixingLogMixin):
+ """I query all servers to see if M uniquely-numbered shares are
+ available.
+
+ If the verify flag was passed to my constructor, then for each share I
+ download every data block and all metadata from each server and perform a
+ cryptographic integrity check on all of it. If not, I just ask each
+ server 'Which shares do you have?' and believe its answer.
+
+    In either case, I wait until I have gotten responses from all servers.
+    This fact -- that I wait -- means that an ill-behaved server which fails
+    to answer my questions can make me wait indefinitely. If it misbehaves in
+    a way that triggers the underlying foolscap timeouts, then I will wait
+    only that long; but if it placates the foolscap timeouts while still not
+    answering my question, then I will wait indefinitely.
+
+ Before I send any new request to a server, I always ask the 'monitor'
+ object that was passed into my constructor whether this task has been
+ cancelled (by invoking its raise_if_cancelled() method).
+ """
+ def __init__(self, verifycap, servers, verify, add_lease, secret_holder,
+ monitor):
+        precondition(isinstance(verifycap, CHKFileVerifierURI), verifycap, type(verifycap))
+
+ prefix = "%s" % base32.b2a_l(verifycap.get_storage_index()[:8], 60)
+ log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.checker", prefix=prefix)
+
+ self._verifycap = verifycap
+
+ self._monitor = monitor
+ self._servers = servers
+ self._verify = verify # bool: verify what the servers claim, or not?
+ self._add_lease = add_lease
+
+ frs = file_renewal_secret_hash(secret_holder.get_renewal_secret(),
+ self._verifycap.get_storage_index())
+ self.file_renewal_secret = frs
+ fcs = file_cancel_secret_hash(secret_holder.get_cancel_secret(),
+ self._verifycap.get_storage_index())
+ self.file_cancel_secret = fcs
+
+ def _get_renewal_secret(self, seed):
+ return bucket_renewal_secret_hash(self.file_renewal_secret, seed)
+ def _get_cancel_secret(self, seed):
+ return bucket_cancel_secret_hash(self.file_cancel_secret, seed)
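+
+    # A sketch of the lease-secret derivation chain used above (the real
+    # definitions live in allmydata.util.hashutil):
+    #
+    #   client-wide renewal secret
+    #     -> file_renewal_secret_hash(client_secret, storage_index)
+    #       -> bucket_renewal_secret_hash(file_renewal_secret, lease_seed)
+    #
+    # and likewise for the cancel secrets.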
+
+    def _get_buckets(self, s, storageindex):
+        """Return a deferred that eventually fires with ({sharenum: bucket},
+        success). In case the server is disconnected or returns a Failure,
+        it fires with ({}, False). (A server disconnecting or returning a
+        Failure when we ask it for buckets is the same, for our purposes, as
+        a server that says it has none, except that we want to track and
+        report whether or not each server responded.)"""
+
+ rref = s.get_rref()
+ lease_seed = s.get_lease_seed()
+ if self._add_lease:
+ renew_secret = self._get_renewal_secret(lease_seed)
+ cancel_secret = self._get_cancel_secret(lease_seed)
+ d2 = rref.callRemote("add_lease", storageindex,
+ renew_secret, cancel_secret)
+ d2.addErrback(self._add_lease_failed, s.get_name(), storageindex)
+
+ d = rref.callRemote("get_buckets", storageindex)
+ def _wrap_results(res):
+ return (res, True)
+
+ def _trap_errs(f):
+ level = log.WEIRD
+ if f.check(DeadReferenceError):
+ level = log.UNUSUAL
+ self.log("failure from server on 'get_buckets' the REMOTE failure was:",
+ facility="tahoe.immutable.checker",
+ failure=f, level=level, umid="AX7wZQ")
+ return ({}, False)
+
+ d.addCallbacks(_wrap_results, _trap_errs)
+ return d
+
+    def _add_lease_failed(self, f, server_name, storage_index):
+ # Older versions of Tahoe didn't handle the add-lease message very
+ # well: <=1.1.0 throws a NameError because it doesn't implement
+ # remote_add_lease(), 1.2.0/1.3.0 throw IndexError on unknown buckets
+ # (which is most of them, since we send add-lease to everybody,
+ # before we know whether or not they have any shares for us), and
+ # 1.2.0 throws KeyError even on known buckets due to an internal bug
+ # in the latency-measuring code.
+
+ # we want to ignore the known-harmless errors and log the others. In
+ # particular we want to log any local errors caused by coding
+ # problems.
+
+ if f.check(DeadReferenceError):
+ return
+ if f.check(RemoteException):
+ if f.value.failure.check(KeyError, IndexError, NameError):
+ # this may ignore a bit too much, but that only hurts us
+ # during debugging
+ return
+ self.log(format="error in add_lease from [%(name)s]: %(f_value)s",
+ name=server_name,
+ f_value=str(f.value),
+ failure=f,
+ level=log.WEIRD, umid="atbAxw")
+ return
+ # local errors are cause for alarm
+ log.err(f,
+ format="local error in add_lease to [%(name)s]: %(f_value)s",
+ name=server_name,
+ f_value=str(f.value),
+ level=log.WEIRD, umid="hEGuQg")
+
+
+ def _download_and_verify(self, server, sharenum, bucket):
+ """Start an attempt to download and verify every block in this bucket
+ and return a deferred that will eventually fire once the attempt
+ completes.
+
+        If every block is downloaded and verified, the deferred fires with
+        (True, sharenum, None). Otherwise it fires with (False, sharenum,
+        'incompatible') if the share data couldn't be parsed because it was
+        of an unknown version number, with (False, sharenum, 'corrupt') if
+        any of the blocks were invalid, with (False, sharenum, 'disconnect')
+        if the server disconnected, or with (False, sharenum, 'failure') if
+        the server returned a Failure during the process.
+
+ If there is an internal error such as an uncaught exception in this
+ code, then the deferred will errback, but if there is a remote error
+ such as the server failing or the returned data being incorrect then
+ it will not errback -- it will fire normally with the indicated
+ results."""
+
+ vcap = self._verifycap
+ b = layout.ReadBucketProxy(bucket, server, vcap.get_storage_index())
+ veup = ValidatedExtendedURIProxy(b, vcap)
+ d = veup.start()
+
+ def _got_ueb(vup):
+ share_hash_tree = IncompleteHashTree(vcap.total_shares)
+ share_hash_tree.set_hashes({0: vup.share_root_hash})
+
+ vrbp = ValidatedReadBucketProxy(sharenum, b,
+ share_hash_tree,
+ vup.num_segments,
+ vup.block_size,
+ vup.share_size)
+
+ # note: normal download doesn't use get_all_sharehashes(),
+ # because it gets more data than necessary. We've discussed the
+ # security properties of having verification and download look
+ # identical (so the server couldn't, say, provide good responses
+ # for one and not the other), but I think that full verification
+ # is more important than defending against inconsistent server
+ # behavior. Besides, they can't pass the verifier without storing
+ # all the data, so there's not so much to be gained by behaving
+ # inconsistently.
+ d = vrbp.get_all_sharehashes()
+ # we fill share_hash_tree before fetching any blocks, so the
+ # block fetches won't send redundant share-hash-tree requests, to
+ # speed things up. Then we fetch+validate all the blockhashes.
+ d.addCallback(lambda ign: vrbp.get_all_blockhashes())
+
+ cht = IncompleteHashTree(vup.num_segments)
+ cht.set_hashes({0: vup.crypttext_root_hash})
+ d.addCallback(lambda ign: vrbp.get_all_crypttext_hashes(cht))
+
+ d.addCallback(lambda ign: vrbp)
+ return d
+ d.addCallback(_got_ueb)
+
+ def _discard_result(r):
+ assert isinstance(r, str), r
+ # to free up the RAM
+ return None
+
+ def _get_blocks(vrbp):
+ def _get_block(ign, blocknum):
+ db = vrbp.get_block(blocknum)
+ db.addCallback(_discard_result)
+ return db
+
+ dbs = defer.succeed(None)
+ for blocknum in range(veup.num_segments):
+ dbs.addCallback(_get_block, blocknum)
+
+ # The Deferred we return will fire after every block of this
+ # share has been downloaded and verified successfully, or else it
+ # will errback as soon as the first error is observed.
+ return dbs
+
+ d.addCallback(_get_blocks)
+
+ # if none of those errbacked, the blocks (and the hashes above them)
+ # are good
+ def _all_good(ign):
+ return (True, sharenum, None)
+ d.addCallback(_all_good)
+
+ # but if anything fails, we'll land here
+ def _errb(f):
+ # We didn't succeed at fetching and verifying all the blocks of
+ # this share. Handle each reason for failure differently.
+
+ if f.check(DeadReferenceError):
+ return (False, sharenum, 'disconnect')
+ elif f.check(RemoteException):
+ return (False, sharenum, 'failure')
+ elif f.check(layout.ShareVersionIncompatible):
+ return (False, sharenum, 'incompatible')
+ elif f.check(layout.LayoutInvalid,
+ layout.RidiculouslyLargeURIExtensionBlock,
+ BadOrMissingHash,
+ BadURIExtensionHashValue):
+ return (False, sharenum, 'corrupt')
+
+ # if it wasn't one of those reasons, re-raise the error
+ return f
+ d.addErrback(_errb)
- def start(self):
- log.msg("starting download [%s]" % storage.si_b2a(self._storage_index)[:5])
-
- # first step: who should we download from?
- d = defer.maybeDeferred(self._get_all_shareholders)
- d.addCallback(self._got_all_shareholders)
- # now get the uri_extension block from somebody and validate it
- d.addCallback(self._obtain_uri_extension)
- d.addCallback(self._get_crypttext_hash_tree)
- # once we know that, we can download blocks from everybody
- d.addCallback(self._download_all_segments)
- d.addCallback(self._done)
- d.addCallbacks(self._verify_done, self._verify_failed)
return d
- def _verify_done(self, ignored):
- # TODO: The following results are just stubs, and need to be replaced
- # with actual values. These exist to make things like deep-check not
- # fail.
- self._check_results.set_needs_rebalancing(False)
- N = self._total_shares
- data = {
- "count-shares-good": N,
- "count-good-share-hosts": N,
- "count-corrupt-shares": 0,
- "list-corrupt-shares": [],
- "servers-responding": [],
- "sharemap": {},
- "count-wrong-shares": 0,
- "count-recoverable-versions": 1,
- "count-unrecoverable-versions": 0,
- }
- self._check_results.set_data(data)
- return self._check_results
-
- def _verify_failed(self, ignored):
- # TODO: The following results are just stubs, and need to be replaced
- # with actual values. These exist to make things like deep-check not
- # fail.
- self._check_results.set_needs_rebalancing(False)
- N = self._total_shares
- data = {
- "count-shares-good": 0,
- "count-good-share-hosts": 0,
- "count-corrupt-shares": 0,
- "list-corrupt-shares": [],
- "servers-responding": [],
- "sharemap": {},
- "count-wrong-shares": 0,
- "count-recoverable-versions": 0,
- "count-unrecoverable-versions": 1,
- }
- self._check_results.set_data(data)
- return self._check_results
+    def _verify_server_shares(self, s):
+        """ Return a deferred which eventually fires with a tuple of
+        (set(sharenum), server, set(corruptsharenum),
+        set(incompatiblesharenum), success) showing all the shares this
+        server was verified to hold, plus all the corrupt shares and all the
+        incompatible shares it served. In case the server is disconnected or
+        returns a Failure then it fires with the last element False.
+
+        A server disconnecting or returning a failure when we ask it for
+        shares is the same, for our purposes, as a server that says it has
+        none or offers invalid ones, except that we want to track and report
+        the server's behavior. Similarly, the presence of corrupt shares is
+        mainly of use for diagnostics -- you can typically treat a corrupt
+        share just like a missing one, by observing its absence from the
+        verified-shares set and ignoring its presence in the corrupt-shares
+        set.
+
+        The 'success' element reports whether the server responded to *any*
+        query during this process; a server that answered some queries and
+        then disconnected, stopped responding, or returned a failure is
+        still marked True for 'success'.
+        """
+ d = self._get_buckets(s, self._verifycap.get_storage_index())
+
+ def _got_buckets(result):
+ bucketdict, success = result
+
+ shareverds = []
+ for (sharenum, bucket) in bucketdict.items():
+ d = self._download_and_verify(s, sharenum, bucket)
+ shareverds.append(d)
+
+ dl = deferredutil.gatherResults(shareverds)
+
+ def collect(results):
+ verified = set()
+ corrupt = set()
+ incompatible = set()
+ for succ, sharenum, whynot in results:
+ if succ:
+ verified.add(sharenum)
+ else:
+ if whynot == 'corrupt':
+ corrupt.add(sharenum)
+ elif whynot == 'incompatible':
+ incompatible.add(sharenum)
+ return (verified, s, corrupt, incompatible, success)
+
+ dl.addCallback(collect)
+ return dl
+
+ def _err(f):
+ f.trap(RemoteException, DeadReferenceError)
+ return (set(), s, set(), set(), False)
+
+ d.addCallbacks(_got_buckets, _err)
+ return d
+
+ def _check_server_shares(self, s):
+ """Return a deferred which eventually fires with a tuple of
+ (set(sharenum), server, set(), set(), responded) showing all the
+ shares claimed to be served by this server. In case the server is
+ disconnected then it fires with (set(), server, set(), set(), False)
+ (a server disconnecting when we ask it for buckets is the same, for
+ our purposes, as a server that says it has none, except that we want
+ to track and report whether or not each server responded.)"""
+ def _curry_empty_corrupted(res):
+ buckets, responded = res
+ return (set(buckets), s, set(), set(), responded)
+ d = self._get_buckets(s, self._verifycap.get_storage_index())
+ d.addCallback(_curry_empty_corrupted)
+ return d
+
+ def _format_results(self, results):
+ SI = self._verifycap.get_storage_index()
+
+ verifiedshares = dictutil.DictOfSets() # {sharenum: set(server)}
+ servers = {} # {server: set(sharenums)}
+ corruptshare_locators = [] # (server, storageindex, sharenum)
+ incompatibleshare_locators = [] # (server, storageindex, sharenum)
+ servers_responding = set() # server
+
+ for verified, server, corrupt, incompatible, responded in results:
+ servers.setdefault(server, set()).update(verified)
+ for sharenum in verified:
+ verifiedshares.setdefault(sharenum, set()).add(server)
+ for sharenum in corrupt:
+ corruptshare_locators.append((server, SI, sharenum))
+ for sharenum in incompatible:
+ incompatibleshare_locators.append((server, SI, sharenum))
+ if responded:
+ servers_responding.add(server)
+
+ good_share_hosts = len([s for s in servers.keys() if servers[s]])
+
+ assert len(verifiedshares) <= self._verifycap.total_shares, (verifiedshares.keys(), self._verifycap.total_shares)
+ if len(verifiedshares) == self._verifycap.total_shares:
+ healthy = True
+ summary = "Healthy"
+ else:
+ healthy = False
+ summary = ("Not Healthy: %d shares (enc %d-of-%d)" %
+ (len(verifiedshares),
+ self._verifycap.needed_shares,
+ self._verifycap.total_shares))
+ if len(verifiedshares) >= self._verifycap.needed_shares:
+ recoverable = 1
+ unrecoverable = 0
+ else:
+ recoverable = 0
+ unrecoverable = 1
+
+ # The file needs rebalancing if the set of servers that have at least
+ # one share is less than the number of uniquely-numbered shares
+ # available.
+ # TODO: this may be wrong, see ticket #1115 comment:27 and ticket #1784.
+ needs_rebalancing = bool(good_share_hosts < len(verifiedshares))
+
+ cr = CheckResults(self._verifycap, SI,
+ healthy=healthy, recoverable=bool(recoverable),
+ needs_rebalancing=needs_rebalancing,
+ count_shares_needed=self._verifycap.needed_shares,
+ count_shares_expected=self._verifycap.total_shares,
+ count_shares_good=len(verifiedshares),
+ count_good_share_hosts=good_share_hosts,
+ count_recoverable_versions=recoverable,
+ count_unrecoverable_versions=unrecoverable,
+ servers_responding=list(servers_responding),
+ sharemap=verifiedshares,
+ count_wrong_shares=0, # no such thing, for immutable
+ list_corrupt_shares=corruptshare_locators,
+ count_corrupt_shares=len(corruptshare_locators),
+ list_incompatible_shares=incompatibleshare_locators,
+ count_incompatible_shares=len(incompatibleshare_locators),
+ summary=summary,
+ report=[],
+ share_problems=[],
+ servermap=None)
+
+ return cr
+
+ def start(self):
+ ds = []
+ if self._verify:
+ for s in self._servers:
+ ds.append(self._verify_server_shares(s))
+ else:
+ for s in self._servers:
+ ds.append(self._check_server_shares(s))
+
+ return deferredutil.gatherResults(ds).addCallback(self._format_results)