-from twisted.internet import defer
-from twisted.python import failure
-from allmydata import hashtree
from allmydata.uri import from_string
-from allmydata.util import hashutil, base32, idlib, log
+from allmydata.util import base32, idlib, log
from allmydata.check_results import CheckAndRepairResults, CheckResults
from allmydata.mutable.common import MODE_CHECK, CorruptShareError
from allmydata.mutable.servermap import ServerMap, ServermapUpdater
-from allmydata.mutable.layout import unpack_share, SIGNED_PREFIX_LENGTH
+from allmydata.mutable.retrieve import Retrieve # for verifying
class MutableChecker:
def check(self, verify=False, add_lease=False):
servermap = ServerMap()
+ # Updating the servermap in MODE_CHECK will stand a good chance
+ # of finding all of the shares, and getting a good idea of
+ # recoverability, etc, without verifying.
u = ServermapUpdater(self._node, self._storage_broker, self._monitor,
servermap, MODE_CHECK, add_lease=add_lease)
if self._history:
if num_recoverable:
self.best_version = servermap.best_recoverable_version()
+ # The file is unhealthy and needs to be repaired if:
+ # - There are unrecoverable versions.
if servermap.unrecoverable_versions():
self.need_repair = True
+ # - There isn't a recoverable version.
if num_recoverable != 1:
self.need_repair = True
+ # - The best recoverable version is missing some shares.
if self.best_version:
available_shares = servermap.shares_available()
(num_distinct_shares, k, N) = available_shares[self.best_version]
def _verify_all_shares(self, servermap):
# read every byte of each share
+ #
+ # This logic is going to be very nearly the same as the
+ # downloader. I bet we could pass the downloader a flag that
+ # makes it do this, and piggyback onto that instead of
+ # duplicating a bunch of code.
+ #
+ # Like:
+ # r = Retrieve(blah, blah, blah, verify=True)
+ # d = r.download()
+ # (wait, wait, wait, d.callback)
+ #
+ # Then, when it has finished, we can check the servermap (which
+ # we provided to Retrieve) to figure out which shares are bad,
+ # since the Retrieve process will have updated the servermap as
+ # it went along.
+ #
+ # By passing the verify=True flag to the constructor, we are
+ # telling the downloader a few things.
+ #
+ # 1. It needs to download all N shares, not just K shares.
+ # 2. It doesn't need to decrypt or decode the shares, only
+ # verify them.
if not self.best_version:
return
- versionmap = servermap.make_versionmap()
- shares = versionmap[self.best_version]
- (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
- offsets_tuple) = self.best_version
- offsets = dict(offsets_tuple)
- readv = [ (0, offsets["EOF"]) ]
- dl = []
- for (shnum, peerid, timestamp) in shares:
- ss = servermap.connections[peerid]
- d = self._do_read(ss, peerid, self._storage_index, [shnum], readv)
- d.addCallback(self._got_answer, peerid, servermap)
- dl.append(d)
- return defer.DeferredList(dl, fireOnOneErrback=True, consumeErrors=True)
- def _do_read(self, ss, peerid, storage_index, shnums, readv):
- # isolate the callRemote to a separate method, so tests can subclass
- # Publish and override it
- d = ss.callRemote("slot_readv", storage_index, shnums, readv)
+ r = Retrieve(self._node, servermap, self.best_version, verify=True)
+ d = r.download()
+ d.addCallback(self._process_bad_shares)
return d
- def _got_answer(self, datavs, peerid, servermap):
- for shnum,datav in datavs.items():
- data = datav[0]
- try:
- self._got_results_one_share(shnum, peerid, data)
- except CorruptShareError:
- f = failure.Failure()
- self.need_repair = True
- self.bad_shares.append( (peerid, shnum, f) )
- prefix = data[:SIGNED_PREFIX_LENGTH]
- servermap.mark_bad_share(peerid, shnum, prefix)
- ss = servermap.connections[peerid]
- self.notify_server_corruption(ss, shnum, str(f.value))
-
- def check_prefix(self, peerid, shnum, data):
- (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
- offsets_tuple) = self.best_version
- got_prefix = data[:SIGNED_PREFIX_LENGTH]
- if got_prefix != prefix:
- raise CorruptShareError(peerid, shnum,
- "prefix mismatch: share changed while we were reading it")
-
- def _got_results_one_share(self, shnum, peerid, data):
- self.check_prefix(peerid, shnum, data)
-
- # the [seqnum:signature] pieces are validated by _compare_prefix,
- # which checks their signature against the pubkey known to be
- # associated with this file.
- (seqnum, root_hash, IV, k, N, segsize, datalen, pubkey, signature,
- share_hash_chain, block_hash_tree, share_data,
- enc_privkey) = unpack_share(data)
-
- # validate [share_hash_chain,block_hash_tree,share_data]
-
- leaves = [hashutil.block_hash(share_data)]
- t = hashtree.HashTree(leaves)
- if list(t) != block_hash_tree:
- raise CorruptShareError(peerid, shnum, "block hash tree failure")
- share_hash_leaf = t[0]
- t2 = hashtree.IncompleteHashTree(N)
- # root_hash was checked by the signature
- t2.set_hashes({0: root_hash})
- try:
- t2.set_hashes(hashes=share_hash_chain,
- leaves={shnum: share_hash_leaf})
- except (hashtree.BadHashError, hashtree.NotEnoughHashesError,
- IndexError), e:
- msg = "corrupt hashes: %s" % (e,)
- raise CorruptShareError(peerid, shnum, msg)
-
- # validate enc_privkey: only possible if we have a write-cap
- if not self._node.is_readonly():
- alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
- alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
- if alleged_writekey != self._node.get_writekey():
- raise CorruptShareError(peerid, shnum, "invalid privkey")
+ def _process_bad_shares(self, bad_shares):
+ if bad_shares:
+ self.need_repair = True
+ self.bad_shares = bad_shares
- def notify_server_corruption(self, ss, shnum, reason):
- ss.callRemoteOnly("advise_corrupt_share",
- "mutable", self._storage_index, shnum, reason)
def _count_shares(self, smap, version):
available_shares = smap.shares_available()