--- /dev/null
+
+import struct
+from twisted.internet import defer
+from twisted.python import failure
+from allmydata import hashtree
+from allmydata.util import hashutil
+
+from common import MODE_CHECK, CorruptShareError
+from servermap import ServerMap, ServermapUpdater
+from layout import unpack_share, SIGNED_PREFIX
+
+class MutableChecker:
+    """Health-checker for a single mutable file.
+
+    check() runs a MODE_CHECK servermap update and declares the file
+    healthy only if there is exactly one recoverable version, it has at
+    least N distinct shares, and there are no unrecoverable versions.
+    With verify=True it additionally reads every byte of every share of
+    the best recoverable version and validates the hash structures.
+    """
+
+    def __init__(self, node):
+        # node: the mutable filenode to examine
+        self._node = node
+        self.healthy = True
+        # accumulates (peerid, storage_index, shnum, failure) tuples,
+        # appended by add_problem()
+        self.problems = []
+        self._storage_index = self._node.get_storage_index()
+
+    def check(self, verify=False, repair=False):
+        """Start the check.
+
+        verify: if True, also download and validate every share byte.
+        repair: if True, attempt repair of an unhealthy file (repair is
+                not implemented yet -- see _maybe_do_repair).
+
+        Returns a Deferred that fires with a results dict containing
+        'healthy' (bool) and 'problems' (list).
+        """
+        servermap = ServerMap()
+        self.do_verify = verify
+        self.do_repair = repair
+        u = ServermapUpdater(self._node, servermap, MODE_CHECK)
+        d = u.update()
+        d.addCallback(self._got_mapupdate_results)
+        if verify:
+            d.addCallback(self._verify_all_shares)
+        d.addCallback(self._maybe_do_repair)
+        d.addCallback(self._return_results)
+        return d
+
+    def _got_mapupdate_results(self, servermap):
+        # the file is healthy if there is exactly one recoverable version, it
+        # has at least N distinct shares, and there are no unrecoverable
+        # versions: all existing shares will be for the same version.
+        self.best_version = None
+        if servermap.unrecoverable_versions():
+            self.healthy = False
+        num_recoverable = len(servermap.recoverable_versions())
+        if num_recoverable == 0:
+            self.healthy = False
+        else:
+            if num_recoverable > 1:
+                self.healthy = False
+            self.best_version = servermap.best_recoverable_version()
+            # shares_available() maps version -> (num_distinct_shares, k, N)
+            available_shares = servermap.shares_available()
+            (num_distinct_shares, k, N) = available_shares[self.best_version]
+            if num_distinct_shares < N:
+                # recoverable (enough shares to read) but not fully
+                # replicated, so still unhealthy
+                self.healthy = False
+
+        return servermap
+
+    def _verify_all_shares(self, servermap):
+        # read every byte of each share of the best recoverable version
+        if not self.best_version:
+            return
+        versionmap = servermap.make_versionmap()
+        shares = versionmap[self.best_version]
+        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
+         offsets_tuple) = self.best_version
+        offsets = dict(offsets_tuple)
+        # a single readv covering the whole share, [0, EOF)
+        readv = [ (0, offsets["EOF"]) ]
+        dl = []
+        for (shnum, peerid, timestamp) in shares:
+            ss = servermap.connections[peerid]
+            d = self._do_read(ss, peerid, self._storage_index, [shnum], readv)
+            d.addCallback(self._got_answer, peerid)
+            dl.append(d)
+        return defer.DeferredList(dl, fireOnOneErrback=True)
+
+    def _do_read(self, ss, peerid, storage_index, shnums, readv):
+        # isolate the callRemote to a separate method, so tests can subclass
+        # and override it. (NOTE(review): the original comment said "subclass
+        # Publish", apparently copied from publish.py -- this is the checker.)
+        d = ss.callRemote("slot_readv", storage_index, shnums, readv)
+        return d
+
+    def _got_answer(self, datavs, peerid):
+        # datavs maps shnum -> list of read results, one per readv entry;
+        # our readv had a single entry, hence datav[0]
+        for shnum,datav in datavs.items():
+            data = datav[0]
+            try:
+                self._got_results_one_share(shnum, peerid, data)
+            except CorruptShareError:
+                # record the problem but keep checking the remaining shares
+                f = failure.Failure()
+                self.add_problem(shnum, peerid, f)
+
+    def check_prefix(self, peerid, shnum, data):
+        # confirm the share we just read still carries the same signed
+        # prefix that the servermap recorded for the best version
+        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
+         offsets_tuple) = self.best_version
+        got_prefix = data[:struct.calcsize(SIGNED_PREFIX)]
+        if got_prefix != prefix:
+            raise CorruptShareError(peerid, shnum,
+                "prefix mismatch: share changed while we were reading it")
+
+    def _got_results_one_share(self, shnum, peerid, data):
+        """Fully validate one share's data; raises CorruptShareError on
+        any mismatch."""
+        self.check_prefix(peerid, shnum, data)
+
+        # the [seqnum:signature] pieces are covered by the prefix check
+        # above, whose signature is checked against the pubkey known to be
+        # associated with this file. (NOTE(review): original comment named a
+        # "_compare_prefix" method that does not exist here; the prefix
+        # comparison lives in check_prefix, and the signature check
+        # presumably happens during the servermap update -- confirm.)
+
+        (seqnum, root_hash, IV, k, N, segsize, datalen, pubkey, signature,
+         share_hash_chain, block_hash_tree, share_data,
+         enc_privkey) = unpack_share(data)
+
+        # validate [share_hash_chain,block_hash_tree,share_data]
+
+        # builds a one-leaf block hash tree from the whole share_data
+        # (assumes a single segment -- TODO confirm)
+        leaves = [hashutil.block_hash(share_data)]
+        t = hashtree.HashTree(leaves)
+        if list(t) != block_hash_tree:
+            raise CorruptShareError(peerid, shnum, "block hash tree failure")
+        share_hash_leaf = t[0]
+        t2 = hashtree.IncompleteHashTree(N)
+        # root_hash was checked by the signature
+        t2.set_hashes({0: root_hash})
+        try:
+            t2.set_hashes(hashes=share_hash_chain,
+                          leaves={shnum: share_hash_leaf})
+        except (hashtree.BadHashError, hashtree.NotEnoughHashesError,
+                IndexError), e:
+            msg = "corrupt hashes: %s" % (e,)
+            raise CorruptShareError(peerid, shnum, msg)
+
+        # validate enc_privkey: only possible if we have a write-cap
+        if not self._node.is_readonly():
+            alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
+            alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
+            if alleged_writekey != self._node.get_writekey():
+                raise CorruptShareError(peerid, shnum, "invalid privkey")
+
+    def _maybe_do_repair(self, res):
+        # TODO: repair is not implemented yet; this is a placeholder that
+        # only decides whether a repair would be wanted
+        if self.healthy:
+            return
+        if not self.do_repair:
+            return
+        pass
+
+    def _return_results(self, res):
+        # final callback: package the accumulated state for the caller
+        r = {}
+        r['healthy'] = self.healthy
+        r['problems'] = self.problems
+        return r
+
+
+    def add_problem(self, shnum, peerid, what):
+        # any recorded problem makes the file unhealthy
+        self.healthy = False
+        self.problems.append( (peerid, self._storage_index, shnum, what) )
+
from allmydata.mutable.common import DictOfSets, ResponseCache, \
MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_READ, \
NeedMoreDataError, UnrecoverableFileError, UncoordinatedWriteError, \
- NotEnoughServersError
+ NotEnoughServersError, CorruptShareError
from allmydata.mutable.retrieve import Retrieve
from allmydata.mutable.publish import Publish
from allmydata.mutable.servermap import ServerMap, ServermapUpdater
chr(ord(original[byte_offset]) ^ 0x01) +
original[byte_offset+1:])
-def corrupt(res, s, offset, shnums_to_corrupt=None):
+def corrupt(res, s, offset, shnums_to_corrupt=None, offset_offset=0):
# if shnums_to_corrupt is None, corrupt all shares. Otherwise it is a
# list of shnums to corrupt.
for peerid in s._peers:
real_offset = o[offset1]
else:
real_offset = offset1
- real_offset = int(real_offset) + offset2
+ real_offset = int(real_offset) + offset2 + offset_offset
assert isinstance(real_offset, int), offset
shares[shnum] = flip_bit(data, real_offset)
return res
return d
+class CheckerMixin:
+    """Assertion helpers shared by checker-related test cases; expects the
+    results dict produced by MutableChecker ('healthy', 'problems')."""
+
+    def check_good(self, r, where):
+        # healthy and with an empty problems list
+        self.failUnless(r['healthy'], where)
+        self.failIf(r['problems'], where)
+        return r
+
+    def check_bad(self, r, where):
+        # only requires healthy to be false; problems may or may not exist
+        self.failIf(r['healthy'], where)
+        return r
+
+    def check_expected_failure(self, r, expected_exception, substring, where):
+        # scan r['problems'] for a Failure wrapping expected_exception whose
+        # string form contains substring; fail the test if none is found
+        for (peerid, storage_index, shnum, f) in r['problems']:
+            if f.check(expected_exception):
+                self.failUnless(substring in str(f),
+                                "%s: substring '%s' not in '%s'" %
+                                (where, substring, str(f)))
+                return
+        self.fail("%s: didn't see expected exception %s in problems %s" %
+                  (where, expected_exception, r['problems']))
+
+class Checker(unittest.TestCase, CheckerMixin):
+    """Exercise MutableChecker in both plain-check and verify modes against
+    a fake client whose stored shares can be deleted or corrupted."""
+
+    def setUp(self):
+        # publish a file and create shares, which can then be manipulated
+        # later.
+        self.CONTENTS = "New contents go here" * 1000
+        num_peers = 20
+        self._client = FakeClient(num_peers)
+        self._storage = self._client._storage
+        d = self._client.create_mutable_file(self.CONTENTS)
+        def _created(node):
+            self._fn = node
+        d.addCallback(_created)
+        return d
+
+
+    def test_check_good(self):
+        # an untouched file must check as healthy
+        d = self._fn.check()
+        d.addCallback(self.check_good, "test_check_good")
+        return d
+
+    def test_check_no_shares(self):
+        # with every share deleted, the file is unrecoverable: unhealthy
+        for shares in self._storage._peers.values():
+            shares.clear()
+        d = self._fn.check()
+        d.addCallback(self.check_bad, "test_check_no_shares")
+        return d
+
+    def test_check_not_enough_shares(self):
+        # keep only shnum 0 everywhere: fewer than N distinct shares
+        for shares in self._storage._peers.values():
+            for shnum in shares.keys():
+                if shnum > 0:
+                    del shares[shnum]
+        d = self._fn.check()
+        d.addCallback(self.check_bad, "test_check_not_enough_shares")
+        return d
+
+    def test_check_all_bad_sig(self):
+        corrupt(None, self._storage, 1) # bad sig
+        d = self._fn.check()
+        d.addCallback(self.check_bad, "test_check_all_bad_sig")
+        return d
+
+    def test_check_all_bad_blocks(self):
+        corrupt(None, self._storage, "share_data", [9]) # bad blocks
+        # the Checker won't notice this.. it doesn't look at actual data
+        d = self._fn.check()
+        d.addCallback(self.check_good, "test_check_all_bad_blocks")
+        return d
+
+    def test_verify_good(self):
+        # verify mode reads every byte of every share
+        d = self._fn.check(verify=True)
+        d.addCallback(self.check_good, "test_verify_good")
+        return d
+
+    def test_verify_all_bad_sig(self):
+        corrupt(None, self._storage, 1) # bad sig
+        d = self._fn.check(verify=True)
+        d.addCallback(self.check_bad, "test_verify_all_bad_sig")
+        return d
+
+    def test_verify_one_bad_sig(self):
+        corrupt(None, self._storage, 1, [9]) # bad sig
+        d = self._fn.check(verify=True)
+        d.addCallback(self.check_bad, "test_verify_one_bad_sig")
+        return d
+
+    def test_verify_one_bad_block(self):
+        corrupt(None, self._storage, "share_data", [9]) # bad blocks
+        # the Verifier *will* notice this, since it examines every byte
+        d = self._fn.check(verify=True)
+        d.addCallback(self.check_bad, "test_verify_one_bad_block")
+        d.addCallback(self.check_expected_failure,
+                      CorruptShareError, "block hash tree failure",
+                      "test_verify_one_bad_block")
+        return d
+
+    def test_verify_one_bad_sharehash(self):
+        # the extra offset (5) lands the bit-flip inside the hash chain
+        corrupt(None, self._storage, "share_hash_chain", [9], 5)
+        d = self._fn.check(verify=True)
+        d.addCallback(self.check_bad, "test_verify_one_bad_sharehash")
+        d.addCallback(self.check_expected_failure,
+                      CorruptShareError, "corrupt hashes",
+                      "test_verify_one_bad_sharehash")
+        return d
+
+    def test_verify_one_bad_encprivkey(self):
+        corrupt(None, self._storage, "enc_privkey", [9]) # bad privkey
+        d = self._fn.check(verify=True)
+        d.addCallback(self.check_bad, "test_verify_one_bad_encprivkey")
+        d.addCallback(self.check_expected_failure,
+                      CorruptShareError, "invalid privkey",
+                      "test_verify_one_bad_encprivkey")
+        return d
+
+    def test_verify_one_bad_encprivkey_uncheckable(self):
+        corrupt(None, self._storage, "enc_privkey", [9]) # bad privkey
+        readonly_fn = self._fn.get_readonly()
+        # a read-only node has no way to validate the privkey
+        d = readonly_fn.check(verify=True)
+        d.addCallback(self.check_good,
+                      "test_verify_one_bad_encprivkey_uncheckable")
+        return d
+
class MultipleEncodings(unittest.TestCase):
def setUp(self):
self.CONTENTS = "New contents go here"
d.addCallback(_retrieved)
return d
-class MultipleVersions(unittest.TestCase):
+class MultipleVersions(unittest.TestCase, CheckerMixin):
def setUp(self):
self.CONTENTS = ["Contents 0",
"Contents 1",
self._set_versions(dict([(i,2) for i in (0,2,4,6,8)]))
d = self._fn.download_best_version()
d.addCallback(lambda res: self.failUnlessEqual(res, self.CONTENTS[4]))
+ # and the checker should report problems
+ d.addCallback(lambda res: self._fn.check())
+ d.addCallback(self.check_bad, "test_multiple_versions")
+
# but if everything is at version 2, that's what we should download
d.addCallback(lambda res:
self._set_versions(dict([(i,2) for i in range(10)])))